/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.Serializable;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkParameters;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicy;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicyFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.After;
import org.junit.Test;
import org.mockito.Mockito;

public class WatermarkPolicyTest {
    private static final Instant NOW = Instant.now();

    @After
    public void resetJodaTime() {
        DateTimeUtils.setCurrentMillisSystem();
    }

    @Test
    public void shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
        KinesisRecord a = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        KinesisRecord b = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)30L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)20L));
        Mockito.when((Object)a.getApproximateArrivalTimestamp()).thenReturn((Object)time1);
        Mockito.when((Object)b.getApproximateArrivalTimestamp()).thenReturn((Object)time2);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)BoundedWindow.TIMESTAMP_MIN_VALUE);
        policy.update(a);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time1);
        policy.update(b);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2);
    }

    @Test
    public void shouldOnlyAdvanceTheWatermark() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
        KinesisRecord a = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        KinesisRecord b = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        KinesisRecord c = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)30L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)20L));
        Instant time3 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)40L));
        Mockito.when((Object)a.getApproximateArrivalTimestamp()).thenReturn((Object)time1);
        Mockito.when((Object)b.getApproximateArrivalTimestamp()).thenReturn((Object)time2);
        Mockito.when((Object)c.getApproximateArrivalTimestamp()).thenReturn((Object)time3);
        policy.update(a);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time1);
        policy.update(b);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2);
        policy.update(c);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2);
    }

    @Test
    public void shouldAdvanceWatermarkWhenThereAreNoIncomingRecords() {
        WatermarkParameters standardWatermarkParams = WatermarkParameters.create();
        WatermarkPolicy policy = WatermarkPolicyFactory.withCustomWatermarkPolicy((WatermarkParameters)standardWatermarkParams).createWatermarkPolicy();
        Duration watermarkIdleTimeThreshold = standardWatermarkParams.getWatermarkIdleDurationThreshold();
        KinesisRecord a = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        Instant arrivalTime = NOW.minus((ReadableDuration)Duration.standardSeconds((long)510L));
        Mockito.when((Object)a.getApproximateArrivalTimestamp()).thenReturn((Object)arrivalTime);
        DateTimeUtils.setCurrentMillisFixed((long)NOW.minus((ReadableDuration)Duration.standardSeconds((long)500L)).getMillis());
        policy.update(a);
        DateTimeUtils.setCurrentMillisFixed((long)NOW.minus((ReadableDuration)Duration.standardSeconds((long)498L)).getMillis());
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)arrivalTime);
        DateTimeUtils.setCurrentMillisFixed((long)NOW.getMillis());
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)NOW.minus((ReadableDuration)watermarkIdleTimeThreshold));
    }

    @Test
    public void shouldAdvanceWatermarkToNowWithProcessingTimePolicy() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withProcessingTimePolicy().createWatermarkPolicy();
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)5L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)4L));
        DateTimeUtils.setCurrentMillisFixed((long)time1.getMillis());
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time1);
        DateTimeUtils.setCurrentMillisFixed((long)time2.getMillis());
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2);
    }

    @Test
    public void shouldAdvanceWatermarkWithCustomTimePolicy() {
        SerializableFunction & Serializable timestampFn = (SerializableFunction & Serializable)record -> record.getApproximateArrivalTimestamp().plus((ReadableDuration)Duration.standardMinutes((long)1L));
        WatermarkPolicy policy = WatermarkPolicyFactory.withCustomWatermarkPolicy((WatermarkParameters)WatermarkParameters.create().withTimestampFn((SerializableFunction)timestampFn)).createWatermarkPolicy();
        KinesisRecord a = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        KinesisRecord b = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)30L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)20L));
        Mockito.when((Object)a.getApproximateArrivalTimestamp()).thenReturn((Object)time1);
        Mockito.when((Object)b.getApproximateArrivalTimestamp()).thenReturn((Object)time2);
        policy.update(a);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time1.plus((ReadableDuration)Duration.standardMinutes((long)1L)));
        policy.update(b);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2.plus((ReadableDuration)Duration.standardMinutes((long)1L)));
    }

    @Test
    public void shouldUpdateWatermarkParameters() {
        SerializableFunction & Serializable fn = (SerializableFunction & Serializable)input -> Instant.now();
        Duration idleDurationThreshold = Duration.standardSeconds((long)30L);
        WatermarkParameters parameters = WatermarkParameters.create().withTimestampFn((SerializableFunction)fn).withWatermarkIdleDurationThreshold(idleDurationThreshold);
        Assertions.assertThat((Object)parameters.getTimestampFn()).isEqualTo((Object)fn);
        Assertions.assertThat((Comparable)parameters.getWatermarkIdleDurationThreshold()).isEqualTo((Object)idleDurationThreshold);
    }
}

