package org.apache.beam.sdk.io.kinesis;

import java.util.function.BooleanSupplier;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/KinesisWatermarkTest.class */
public class KinesisWatermarkTest {
    private static final BooleanSupplier SHARDS_UP_TO_DATE = () -> {
        return true;
    };
    private static final BooleanSupplier SHARDS_NOT_UP_TO_DATE = () -> {
        return false;
    };
    private static final BooleanSupplier SHARDS_IRRELEVANT = () -> {
        throw new AssertionError("Shard status should not be queried");
    };
    private final Instant now = Instant.now();
    private KinesisWatermark watermark;

    @Before
    public void setUp() {
        setCurrentTimeTo(this.now);
        this.watermark = new KinesisWatermark();
    }

    @After
    public void tearDown() {
        DateTimeUtils.setCurrentMillisSystem();
    }

    @Test
    public void watermarkStartsAtSamplePeriodBehindNowIfShardsUpToDate() {
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_UP_TO_DATE)).isEqualTo(this.now.minus(KinesisWatermark.SAMPLE_PERIOD));
    }

    @Test
    public void watermarkStartsWithMinIfShardsNotUpToDate() {
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_NOT_UP_TO_DATE)).isEqualTo(this.now.minus(KinesisWatermark.MAX_KINESIS_STREAM_RETENTION_PERIOD));
    }

    @Test
    public void watermarkIsUpdatedToFirstRecordTimestamp() {
        Instant minus = this.now.minus(Duration.standardHours(1L));
        this.watermark.update(minus);
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(minus);
    }

    @Test
    public void watermarkIsUpdatedToRecentRecordTimestampIfItIsOlderThanUpdateThreshold() {
        Instant minus = this.now.minus(Duration.standardHours(1L));
        this.watermark.update(minus);
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(minus);
        Instant plus = this.now.plus(KinesisWatermark.UPDATE_THRESHOLD.plus(Duration.millis(1L)));
        setCurrentTimeTo(plus);
        Instant plus2 = plus.plus(Duration.millis(1L));
        this.watermark.update(plus2);
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(plus2);
    }

    @Test
    public void watermarkDoesNotChangeWhenTooFewSampleRecordsInSamplePeriod() {
        Instant minus = this.now.minus(Duration.standardHours(1L));
        this.watermark.update(minus);
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(minus);
        setCurrentTimeTo(this.now.plus(KinesisWatermark.SAMPLE_PERIOD));
        this.watermark.update(minus);
        for (int i = 1; i <= 5; i++) {
            this.watermark.update(minus.plus(Duration.millis(i)));
        }
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(minus);
    }

    @Test
    public void watermarkAdvancesWhenEnoughRecordsReadRecently() {
        Instant minus = this.now.minus(Duration.standardHours(1L));
        this.watermark.update(minus);
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(minus);
        Instant plus = minus.plus(Duration.millis(1L));
        setCurrentTimeTo(this.now.plus(KinesisWatermark.SAMPLE_PERIOD));
        for (int i = 0; i < 9; i++) {
            this.watermark.update(plus.plus(Duration.millis(i)));
            Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(minus);
        }
        this.watermark.update(plus.plus(Duration.millis(9L)));
        Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(plus);
    }

    @Test
    public void watermarkDoesNotGoBackward() {
        this.watermark.update(this.now);
        for (int i = 0; i <= 20; i++) {
            this.watermark.update(this.now.minus(Duration.millis(i)));
            Assertions.assertThat(this.watermark.getCurrent(SHARDS_IRRELEVANT)).isEqualTo(this.now);
        }
    }

    private static void setCurrentTimeTo(Instant instant) {
        DateTimeUtils.setCurrentMillisFixed(instant.getMillis());
    }
}
