package org.apache.beam.sdk.transforms.splittabledofn;

import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.class */
public class WatermarkEstimatorsTest {

    @Rule
    public ResetDateTimeProvider resetDateTimeProvider = new ResetDateTimeProvider();

    @Test
    public void testManualWatermarkEstimator() {
        WatermarkEstimators.Manual manual = new WatermarkEstimators.Manual(BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, manual.currentWatermark());
        manual.setWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE);
        manual.setWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(2L)));
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(2L)), manual.currentWatermark());
        manual.setWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(1L)));
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(2L)), manual.currentWatermark());
        manual.setWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, manual.currentWatermark());
    }

    @Test
    public void testWallTimeWatermarkEstimator() {
        DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
        WatermarkEstimators.WallTime wallTime = new WatermarkEstimators.WallTime(new Instant());
        DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1L).getMillis());
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1L), wallTime.currentWatermark());
        DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(2L).getMillis());
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1L), wallTime.getState());
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(2L), wallTime.currentWatermark());
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(2L), wallTime.getState());
        DateTimeUtils.setCurrentMillisFixed(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1L).getMillis());
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(2L), wallTime.currentWatermark());
    }

    @Test
    public void testMonotonicallyIncreasingWatermarkEstimator() {
        WatermarkEstimators.MonotonicallyIncreasing monotonicallyIncreasing = new WatermarkEstimators.MonotonicallyIncreasing(BoundedWindow.TIMESTAMP_MIN_VALUE);
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, monotonicallyIncreasing.currentWatermark());
        monotonicallyIncreasing.observeTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE);
        monotonicallyIncreasing.observeTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(2L)));
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(2L)), monotonicallyIncreasing.currentWatermark());
        monotonicallyIncreasing.observeTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(1L)));
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(2L)), monotonicallyIncreasing.currentWatermark());
        monotonicallyIncreasing.observeTimestamp(BoundedWindow.TIMESTAMP_MAX_VALUE);
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, monotonicallyIncreasing.currentWatermark());
    }
}
