package org.apache.beam.sdk.fn.splittabledofn;

import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/fn/splittabledofn/WatermarkEstimatorsTest.class */
public class WatermarkEstimatorsTest {
    @Test
    public void testThreadSafeWatermarkEstimator() throws Exception {
        final Instant[] instantArr = {GlobalWindow.TIMESTAMP_MIN_VALUE};
        testWatermarkEstimatorSnapshotsStateWithCompetingThread(WatermarkEstimators.threadSafe(new WatermarkEstimator<Instant>() { // from class: org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimatorsTest.1
            private Instant currentWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE;

            public Instant currentWatermark() {
                this.currentWatermark = instantArr[0];
                return this.currentWatermark;
            }

            /* renamed from: getState, reason: merged with bridge method [inline-methods] */
            public Instant m345getState() {
                return this.currentWatermark;
            }
        }), instant -> {
            instantArr[0] = instant;
        });
    }

    @Test
    public void testThreadSafeTimestampObservingWatermarkEstimator() throws Exception {
        TimestampObservingWatermarkEstimator threadSafe = WatermarkEstimators.threadSafe(new WatermarkEstimators.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE));
        TimestampObservingWatermarkEstimator timestampObservingWatermarkEstimator = threadSafe;
        Objects.requireNonNull(timestampObservingWatermarkEstimator);
        testWatermarkEstimatorSnapshotsStateWithCompetingThread(threadSafe, timestampObservingWatermarkEstimator::observeTimestamp);
    }

    public <WatermarkEstimatorT extends WatermarkEstimators.WatermarkAndStateObserver<Instant>> void testWatermarkEstimatorSnapshotsStateWithCompetingThread(WatermarkEstimatorT watermarkestimatort, Consumer<Instant> consumer) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            countDownLatch.countDown();
            for (int i = 0; i < 1000; i++) {
                consumer.accept(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(i)));
            }
        });
        thread.start();
        countDownLatch.await();
        Instant instant = GlobalWindow.TIMESTAMP_MIN_VALUE;
        for (int i = 0; i < 100; i++) {
            KV watermarkAndState = watermarkestimatort.getWatermarkAndState();
            Assert.assertEquals(watermarkAndState.getKey(), watermarkAndState.getValue());
            Assert.assertFalse(instant.isAfter((ReadableInstant) watermarkAndState.getKey()));
        }
        thread.join(10000L);
    }
}
