package org.apache.beam.sdk.io.aws2.kinesis;

import java.lang.invoke.SerializedLambda;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest.class */
public class WatermarkPolicyTest {
    private static final Instant NOW = Instant.now();

    @After
    public void resetJodaTime() {
        DateTimeUtils.setCurrentMillisSystem();
    }

    @Test
    public void shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords() {
        WatermarkPolicy createWatermarkPolicy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
        KinesisRecord kinesisRecord = (KinesisRecord) Mockito.mock(KinesisRecord.class);
        KinesisRecord kinesisRecord2 = (KinesisRecord) Mockito.mock(KinesisRecord.class);
        Instant minus = NOW.minus(Duration.standardSeconds(30L));
        Instant minus2 = NOW.minus(Duration.standardSeconds(20L));
        Mockito.when(kinesisRecord.getApproximateArrivalTimestamp()).thenReturn(minus);
        Mockito.when(kinesisRecord2.getApproximateArrivalTimestamp()).thenReturn(minus2);
        createWatermarkPolicy.update(kinesisRecord);
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus);
        createWatermarkPolicy.update(kinesisRecord2);
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus2);
    }

    @Test
    public void shouldOnlyAdvanceTheWatermark() {
        WatermarkPolicy createWatermarkPolicy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
        KinesisRecord kinesisRecord = (KinesisRecord) Mockito.mock(KinesisRecord.class);
        KinesisRecord kinesisRecord2 = (KinesisRecord) Mockito.mock(KinesisRecord.class);
        KinesisRecord kinesisRecord3 = (KinesisRecord) Mockito.mock(KinesisRecord.class);
        Instant minus = NOW.minus(Duration.standardSeconds(30L));
        Instant minus2 = NOW.minus(Duration.standardSeconds(20L));
        Instant minus3 = NOW.minus(Duration.standardSeconds(40L));
        Mockito.when(kinesisRecord.getApproximateArrivalTimestamp()).thenReturn(minus);
        Mockito.when(kinesisRecord2.getApproximateArrivalTimestamp()).thenReturn(minus2);
        Mockito.when(kinesisRecord3.getApproximateArrivalTimestamp()).thenReturn(minus3);
        createWatermarkPolicy.update(kinesisRecord);
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus);
        createWatermarkPolicy.update(kinesisRecord2);
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus2);
        createWatermarkPolicy.update(kinesisRecord3);
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus2);
    }

    @Test
    public void shouldAdvanceWatermarkWhenThereAreNoIncomingRecords() {
        WatermarkParameters create = WatermarkParameters.create();
        WatermarkPolicy createWatermarkPolicy = WatermarkPolicyFactory.withCustomWatermarkPolicy(create).createWatermarkPolicy();
        Duration watermarkIdleDurationThreshold = create.getWatermarkIdleDurationThreshold();
        KinesisRecord kinesisRecord = (KinesisRecord) Mockito.mock(KinesisRecord.class);
        Instant minus = NOW.minus(Duration.standardSeconds(510L));
        Mockito.when(kinesisRecord.getApproximateArrivalTimestamp()).thenReturn(minus);
        DateTimeUtils.setCurrentMillisFixed(NOW.minus(Duration.standardSeconds(500L)).getMillis());
        createWatermarkPolicy.update(kinesisRecord);
        DateTimeUtils.setCurrentMillisFixed(NOW.minus(Duration.standardSeconds(498L)).getMillis());
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus);
        DateTimeUtils.setCurrentMillisFixed(NOW.getMillis());
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(NOW.minus(watermarkIdleDurationThreshold));
    }

    @Test
    public void shouldAdvanceWatermarkToNowWithProcessingTimePolicy() {
        WatermarkPolicy createWatermarkPolicy = WatermarkPolicyFactory.withProcessingTimePolicy().createWatermarkPolicy();
        Instant minus = NOW.minus(Duration.standardSeconds(5L));
        Instant minus2 = NOW.minus(Duration.standardSeconds(4L));
        DateTimeUtils.setCurrentMillisFixed(minus.getMillis());
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus);
        DateTimeUtils.setCurrentMillisFixed(minus2.getMillis());
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus2);
    }

    @Test
    public void shouldAdvanceWatermarkWithCustomTimePolicy() {
        WatermarkPolicy createWatermarkPolicy = WatermarkPolicyFactory.withCustomWatermarkPolicy(WatermarkParameters.create().withTimestampFn(kinesisRecord -> {
            return kinesisRecord.getApproximateArrivalTimestamp().plus(Duration.standardMinutes(1L));
        })).createWatermarkPolicy();
        KinesisRecord kinesisRecord2 = (KinesisRecord) Mockito.mock(KinesisRecord.class);
        KinesisRecord kinesisRecord3 = (KinesisRecord) Mockito.mock(KinesisRecord.class);
        Instant minus = NOW.minus(Duration.standardSeconds(30L));
        Instant minus2 = NOW.minus(Duration.standardSeconds(20L));
        Mockito.when(kinesisRecord2.getApproximateArrivalTimestamp()).thenReturn(minus);
        Mockito.when(kinesisRecord3.getApproximateArrivalTimestamp()).thenReturn(minus2);
        createWatermarkPolicy.update(kinesisRecord2);
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus.plus(Duration.standardMinutes(1L)));
        createWatermarkPolicy.update(kinesisRecord3);
        Assertions.assertThat(createWatermarkPolicy.getWatermark()).isEqualTo(minus2.plus(Duration.standardMinutes(1L)));
    }

    @Test
    public void shouldUpdateWatermarkParameters() {
        SerializableFunction serializableFunction = kinesisRecord -> {
            return Instant.now();
        };
        Duration standardSeconds = Duration.standardSeconds(30L);
        WatermarkParameters withWatermarkIdleDurationThreshold = WatermarkParameters.create().withTimestampFn(serializableFunction).withWatermarkIdleDurationThreshold(standardSeconds);
        Assertions.assertThat(withWatermarkIdleDurationThreshold.getTimestampFn()).isEqualTo(serializableFunction);
        Assertions.assertThat(withWatermarkIdleDurationThreshold.getWatermarkIdleDurationThreshold()).isEqualTo(standardSeconds);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 264373246:
                if (implMethodName.equals("lambda$shouldUpdateWatermarkParameters$640fe311$1")) {
                    z = true;
                    break;
                }
                break;
            case 1561022878:
                if (implMethodName.equals("lambda$shouldAdvanceWatermarkWithCustomTimePolicy$3338a463$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/aws2/kinesis/KinesisRecord;)Lorg/joda/time/Instant;")) {
                    return kinesisRecord -> {
                        return kinesisRecord.getApproximateArrivalTimestamp().plus(Duration.standardMinutes(1L));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/WatermarkPolicyTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/aws2/kinesis/KinesisRecord;)Lorg/joda/time/Instant;")) {
                    return kinesisRecord2 -> {
                        return Instant.now();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
