/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.splittabledofn;

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class WatermarkEstimatorsTest {
    @Test
    public void testThreadSafeWatermarkEstimator() throws @UnknownKeyFor @NonNull @Initialized Exception {
        final Instant[] reference = new Instant[]{GlobalWindow.TIMESTAMP_MIN_VALUE};
        WatermarkEstimator<Instant> watermarkEstimator = new WatermarkEstimator<Instant>(){
            private @UnknownKeyFor @NonNull @Initialized Instant currentWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE;

            public @UnknownKeyFor @NonNull @Initialized Instant currentWatermark() {
                this.currentWatermark = reference[0];
                return this.currentWatermark;
            }

            public @UnknownKeyFor @NonNull @Initialized Instant getState() {
                return this.currentWatermark;
            }
        };
        this.testWatermarkEstimatorSnapshotsStateWithCompetingThread(WatermarkEstimators.threadSafe((WatermarkEstimator)watermarkEstimator), instant -> {
            reference[0] = instant;
        });
    }

    @Test
    public void testThreadSafeTimestampObservingWatermarkEstimator() throws @UnknownKeyFor @NonNull @Initialized Exception {
        WatermarkEstimators.WatermarkAndStateObserver threadsafeWatermarkEstimator = WatermarkEstimators.threadSafe((WatermarkEstimator)new WatermarkEstimators.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE));
        this.testWatermarkEstimatorSnapshotsStateWithCompetingThread(threadsafeWatermarkEstimator, arg_0 -> ((TimestampObservingWatermarkEstimator)((TimestampObservingWatermarkEstimator)threadsafeWatermarkEstimator)).observeTimestamp(arg_0));
    }

    public <WatermarkEstimatorT extends WatermarkEstimators.WatermarkAndStateObserver<Instant>> void testWatermarkEstimatorSnapshotsStateWithCompetingThread(WatermarkEstimatorT watermarkEstimator, @UnknownKeyFor @NonNull @Initialized Consumer<@UnknownKeyFor @NonNull @Initialized Instant> watermarkUpdater) throws @UnknownKeyFor @NonNull @Initialized Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            countDownLatch.countDown();
            for (int i = 0; i < 1000; ++i) {
                watermarkUpdater.accept(GlobalWindow.TIMESTAMP_MIN_VALUE.plus((long)i));
            }
        });
        t.start();
        countDownLatch.await();
        Instant currentMinimum = GlobalWindow.TIMESTAMP_MIN_VALUE;
        for (int i = 0; i < 100; ++i) {
            KV value = watermarkEstimator.getWatermarkAndState();
            Assert.assertEquals((Object)value.getKey(), (Object)value.getValue());
            Assert.assertFalse((boolean)currentMinimum.isAfter((ReadableInstant)value.getKey()));
        }
        t.join(10000L);
    }
}

