package org.apache.beam.runners.direct;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.class */
public class WatermarkCallbackExecutorTest {
    private AppliedPTransform<?, ?, ?> create;
    private AppliedPTransform<?, ?, ?> sum;
    private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor());

    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkCallbackExecutorTest$CountDownLatchCallback.class */
    private static class CountDownLatchCallback implements Runnable {
        private final CountDownLatch latch;

        public CountDownLatchCallback(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.latch.countDown();
        }
    }

    @Before
    public void setup() {
        PCollection apply = this.p.apply(Create.of(new Integer[]{1, 2, 3}));
        PCollection apply2 = apply.apply(Sum.integersGlobally());
        DirectGraph graph = DirectGraphs.getGraph(this.p);
        this.create = graph.getProducer(apply);
        this.sum = graph.getProducer(apply2);
    }

    @Test
    public void onGuaranteedFiringFiresAfterTrigger() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.executor.callOnGuaranteedFiring(this.create, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), new CountDownLatchCallback(countDownLatch));
        this.executor.fireForWatermark(this.create, BoundedWindow.TIMESTAMP_MAX_VALUE);
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(500L, TimeUnit.MILLISECONDS)), Matchers.equalTo(true));
    }

    @Test
    public void multipleCallbacksShouldFireFires() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        FixedWindows of = FixedWindows.of(Duration.standardMinutes(10L));
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10L)));
        this.executor.callOnGuaranteedFiring(this.create, intervalWindow, WindowingStrategy.of(of), new CountDownLatchCallback(countDownLatch));
        this.executor.callOnGuaranteedFiring(this.create, intervalWindow, WindowingStrategy.of(of), new CountDownLatchCallback(countDownLatch));
        this.executor.fireForWatermark(this.create, new Instant(0L).plus(Duration.standardMinutes(10L)));
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(500L, TimeUnit.MILLISECONDS)), Matchers.equalTo(true));
    }

    @Test
    public void noCallbacksShouldFire() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        FixedWindows of = FixedWindows.of(Duration.standardMinutes(10L));
        this.executor.callOnGuaranteedFiring(this.create, new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10L))), WindowingStrategy.of(of), new CountDownLatchCallback(countDownLatch));
        this.executor.fireForWatermark(this.create, new Instant(0L).plus(Duration.standardMinutes(5L)));
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(500L, TimeUnit.MILLISECONDS)), Matchers.equalTo(false));
    }

    @Test
    public void unrelatedStepShouldNotFire() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        FixedWindows of = FixedWindows.of(Duration.standardMinutes(10L));
        this.executor.callOnGuaranteedFiring(this.sum, new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10L))), WindowingStrategy.of(of), new CountDownLatchCallback(countDownLatch));
        this.executor.fireForWatermark(this.create, new Instant(0L).plus(Duration.standardMinutes(20L)));
        Assert.assertThat(Boolean.valueOf(countDownLatch.await(500L, TimeUnit.MILLISECONDS)), Matchers.equalTo(false));
    }
}
