package org.apache.beam.sdk.io.kinesis;

import java.lang.invoke.SerializedLambda;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Tests for the {@link WatermarkPolicy} instances produced by {@link WatermarkPolicyFactory}:
 * the arrival-time policy, the processing-time policy and the custom policy configured through
 * {@link WatermarkParameters}.
 *
 * <p>{@link Instant} is prepared for PowerMock so that individual tests can stub the static
 * {@code Instant.now()} call.
 */
@PrepareForTest({Instant.class})
@RunWith(PowerMockRunner.class)
public class WatermarkPolicyTest {
    /** Reference point captured once at class load; all test timestamps are offsets from it. */
    private static final Instant NOW = Instant.now();

    /** Checks that each update with a later arrival time moves the watermark to that time. */
    @Test
    public void shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
        Instant firstArrival = NOW.minus(Duration.standardSeconds(30L));
        Instant secondArrival = NOW.minus(Duration.standardSeconds(20L));
        KinesisRecord firstRecord = recordArrivingAt(firstArrival);
        KinesisRecord secondRecord = recordArrivingAt(secondArrival);

        policy.update(firstRecord);
        Assertions.assertThat(policy.getWatermark()).isEqualTo(firstArrival);

        policy.update(secondRecord);
        Assertions.assertThat(policy.getWatermark()).isEqualTo(secondArrival);
    }

    /** Checks that a record with an older arrival time does not move the watermark backwards. */
    @Test
    public void shouldOnlyAdvanceTheWatermark() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
        Instant firstArrival = NOW.minus(Duration.standardSeconds(30L));
        Instant laterArrival = NOW.minus(Duration.standardSeconds(20L));
        Instant staleArrival = NOW.minus(Duration.standardSeconds(40L));
        KinesisRecord firstRecord = recordArrivingAt(firstArrival);
        KinesisRecord laterRecord = recordArrivingAt(laterArrival);
        KinesisRecord staleRecord = recordArrivingAt(staleArrival);

        policy.update(firstRecord);
        Assertions.assertThat(policy.getWatermark()).isEqualTo(firstArrival);

        policy.update(laterRecord);
        Assertions.assertThat(policy.getWatermark()).isEqualTo(laterArrival);

        // The stale record is older than the current watermark, which must stay where it is.
        policy.update(staleRecord);
        Assertions.assertThat(policy.getWatermark()).isEqualTo(laterArrival);
    }

    /**
     * Checks that, with default {@link WatermarkParameters}, the watermark first reports the
     * record's arrival time and then {@code NOW} minus the configured idle duration threshold.
     */
    @Test
    public void shouldAdvanceWatermarkWhenThereAreNoIncomingRecords() {
        WatermarkParameters parameters = WatermarkParameters.create();
        WatermarkPolicy policy =
                WatermarkPolicyFactory.withCustomWatermarkPolicy(parameters).createWatermarkPolicy();

        PowerMockito.mockStatic(Instant.class);

        // Successive values handed out by the stubbed Instant.now().
        // NOTE(review): presumably consumed by update() and then by the two getWatermark()
        // calls, in that order - confirm against the policy implementation.
        Instant firstNow = NOW.minus(Duration.standardSeconds(500L));
        Instant secondNow = NOW.minus(Duration.standardSeconds(498L));
        Instant thirdNow = NOW;
        Instant arrivalTime = NOW.minus(Duration.standardSeconds(510L));
        Duration idleThreshold = parameters.getWatermarkIdleDurationThreshold();

        Mockito.when(Instant.now()).thenReturn(firstNow).thenReturn(secondNow).thenReturn(thirdNow);

        KinesisRecord record = recordArrivingAt(arrivalTime);

        policy.update(record);

        // First read: the watermark is the arrival time of the only record seen.
        Assertions.assertThat(policy.getWatermark()).isEqualTo(arrivalTime);
        // Second read: the watermark has moved to the last stubbed "now" minus the idle threshold.
        Assertions.assertThat(policy.getWatermark()).isEqualTo(thirdNow.minus(idleThreshold));
    }

    /** Checks that the processing-time policy reports whatever {@code Instant.now()} returns. */
    @Test
    public void shouldAdvanceWatermarkToNowWithProcessingTimePolicy() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withProcessingTimePolicy().createWatermarkPolicy();

        PowerMockito.mockStatic(Instant.class);

        Instant firstNow = NOW.minus(Duration.standardSeconds(5L));
        Instant secondNow = NOW.minus(Duration.standardSeconds(4L));
        Mockito.when(Instant.now()).thenReturn(firstNow).thenReturn(secondNow);

        Assertions.assertThat(policy.getWatermark()).isEqualTo(firstNow);
        Assertions.assertThat(policy.getWatermark()).isEqualTo(secondNow);
    }

    /**
     * Checks that a custom timestamp function (arrival time plus one minute) determines the
     * watermark reported after each update.
     */
    @Test
    public void shouldAdvanceWatermarkWithCustomTimePolicy() {
        WatermarkParameters parameters =
                WatermarkParameters.create()
                        .withTimestampFn(
                                record ->
                                        record.getApproximateArrivalTimestamp()
                                                .plus(Duration.standardMinutes(1L)));
        WatermarkPolicy policy =
                WatermarkPolicyFactory.withCustomWatermarkPolicy(parameters).createWatermarkPolicy();

        Instant firstArrival = NOW.minus(Duration.standardSeconds(30L));
        Instant secondArrival = NOW.minus(Duration.standardSeconds(20L));
        KinesisRecord firstRecord = recordArrivingAt(firstArrival);
        KinesisRecord secondRecord = recordArrivingAt(secondArrival);

        policy.update(firstRecord);
        Assertions.assertThat(policy.getWatermark())
                .isEqualTo(firstArrival.plus(Duration.standardMinutes(1L)));

        policy.update(secondRecord);
        Assertions.assertThat(policy.getWatermark())
                .isEqualTo(secondArrival.plus(Duration.standardMinutes(1L)));
    }

    /**
     * Creates a mocked {@link KinesisRecord} whose approximate arrival timestamp is stubbed.
     *
     * @param arrivalTime value returned by {@code getApproximateArrivalTimestamp()}
     * @return the stubbed mock
     */
    private static KinesisRecord recordArrivingAt(Instant arrivalTime) {
        KinesisRecord record = Mockito.mock(KinesisRecord.class);
        Mockito.when(record.getApproximateArrivalTimestamp()).thenReturn(arrivalTime);
        return record;
    }
}
