/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.io.Serializable;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkParameters;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicy;
import org.apache.beam.sdk.io.aws2.kinesis.WatermarkPolicyFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={Instant.class})
public class WatermarkPolicyTest {
    private static final Instant NOW = Instant.now();

    @Test
    public void shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
        KinesisRecord a = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        KinesisRecord b = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)30L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)20L));
        Mockito.when((Object)a.getApproximateArrivalTimestamp()).thenReturn((Object)time1);
        Mockito.when((Object)b.getApproximateArrivalTimestamp()).thenReturn((Object)time2);
        policy.update(a);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time1);
        policy.update(b);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2);
    }

    @Test
    public void shouldOnlyAdvanceTheWatermark() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
        KinesisRecord a = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        KinesisRecord b = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        KinesisRecord c = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)30L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)20L));
        Instant time3 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)40L));
        Mockito.when((Object)a.getApproximateArrivalTimestamp()).thenReturn((Object)time1);
        Mockito.when((Object)b.getApproximateArrivalTimestamp()).thenReturn((Object)time2);
        Mockito.when((Object)c.getApproximateArrivalTimestamp()).thenReturn((Object)time3);
        policy.update(a);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time1);
        policy.update(b);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2);
        policy.update(c);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2);
    }

    @Test
    public void shouldAdvanceWatermarkWhenThereAreNoIncomingRecords() {
        WatermarkParameters standardWatermarkParams = WatermarkParameters.create();
        WatermarkPolicy policy = WatermarkPolicyFactory.withCustomWatermarkPolicy((WatermarkParameters)standardWatermarkParams).createWatermarkPolicy();
        PowerMockito.mockStatic(Instant.class, (Class[])new Class[0]);
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)500L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)498L));
        Instant time3 = NOW;
        Instant arrivalTime = NOW.minus((ReadableDuration)Duration.standardSeconds((long)510L));
        Duration watermarkIdleTimeThreshold = standardWatermarkParams.getWatermarkIdleDurationThreshold();
        Mockito.when((Object)Instant.now()).thenReturn((Object)time1).thenReturn((Object)time2).thenReturn((Object)time3);
        KinesisRecord a = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        Mockito.when((Object)a.getApproximateArrivalTimestamp()).thenReturn((Object)arrivalTime);
        policy.update(a);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)arrivalTime);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time3.minus((ReadableDuration)watermarkIdleTimeThreshold));
    }

    @Test
    public void shouldAdvanceWatermarkToNowWithProcessingTimePolicy() {
        WatermarkPolicy policy = WatermarkPolicyFactory.withProcessingTimePolicy().createWatermarkPolicy();
        PowerMockito.mockStatic(Instant.class, (Class[])new Class[0]);
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)5L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)4L));
        Mockito.when((Object)Instant.now()).thenReturn((Object)time1).thenReturn((Object)time2);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time1);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2);
    }

    @Test
    public void shouldAdvanceWatermarkWithCustomTimePolicy() {
        SerializableFunction & Serializable timestampFn = (SerializableFunction & Serializable)record -> record.getApproximateArrivalTimestamp().plus((ReadableDuration)Duration.standardMinutes((long)1L));
        WatermarkPolicy policy = WatermarkPolicyFactory.withCustomWatermarkPolicy((WatermarkParameters)WatermarkParameters.create().withTimestampFn((SerializableFunction)timestampFn)).createWatermarkPolicy();
        KinesisRecord a = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        KinesisRecord b = (KinesisRecord)Mockito.mock(KinesisRecord.class);
        Instant time1 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)30L));
        Instant time2 = NOW.minus((ReadableDuration)Duration.standardSeconds((long)20L));
        Mockito.when((Object)a.getApproximateArrivalTimestamp()).thenReturn((Object)time1);
        Mockito.when((Object)b.getApproximateArrivalTimestamp()).thenReturn((Object)time2);
        policy.update(a);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time1.plus((ReadableDuration)Duration.standardMinutes((long)1L)));
        policy.update(b);
        Assertions.assertThat((Comparable)policy.getWatermark()).isEqualTo((Object)time2.plus((ReadableDuration)Duration.standardMinutes((long)1L)));
    }
}

