package org.apache.beam.runners.direct;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.local.Bundle;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManagerTest.class */
public class WatermarkManagerTest implements Serializable {
    private transient MockClock clock;
    private transient PCollection<Integer> createdInts;
    private transient PCollection<Integer> filtered;
    private transient PCollection<Integer> filteredTimesTwo;
    private transient PCollection<KV<String, Integer>> keyed;
    private transient PCollection<Integer> intsToFlatten;
    private transient PCollection<Integer> flattened;
    private transient WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> manager;
    private transient BundleFactory bundleFactory;
    private DirectGraph graph;

    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    @Rule
    public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    @Before
    public void setup() {
        this.createdInts = this.p.apply("createdInts", Create.of(1, new Integer[]{2, 3}));
        this.filtered = this.createdInts.apply("filtered", Filter.greaterThan(1));
        this.filteredTimesTwo = this.filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, Integer>() { // from class: org.apache.beam.runners.direct.WatermarkManagerTest.1
            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) throws Exception {
                processContext.output(Integer.valueOf(((Integer) processContext.element()).intValue() * 2));
            }
        }));
        this.keyed = this.createdInts.apply("keyed", WithKeys.of("MyKey"));
        this.intsToFlatten = this.p.apply("intsToFlatten", Create.of(-1, new Integer[]{256, 65535}));
        this.flattened = PCollectionList.of(this.createdInts).and(this.intsToFlatten).apply("flattened", Flatten.pCollections());
        this.clock = MockClock.fromInstant(new Instant(1000L));
        DirectGraphs.performDirectOverrides(this.p);
        this.graph = DirectGraphs.getGraph(this.p);
        this.manager = WatermarkManager.create(this.clock, this.graph, (v0) -> {
            return v0.getFullName();
        });
        this.bundleFactory = ImmutableListBundleFactory.create();
    }

    @Test
    public void getWatermarkForUntouchedTransform() {
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.createdInts));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
    }

    @Test
    public void getWatermarkForUpdatedSourceTransform() {
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle(this.createdInts, 1)), new Instant(8000L));
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.equalTo(new Instant(8000L)));
    }

    @Test
    public void getWatermarkForMultiInputTransform() {
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.intsToFlatten, -1);
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.intsToFlatten), (Bundle) null, Collections.singleton(multiWindowedBundle), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.intsToFlatten)).getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.flattened));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        CommittedBundle multiWindowedBundle2 = multiWindowedBundle(this.flattened, -1);
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.flattened), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle2), BoundedWindow.TIMESTAMP_MAX_VALUE);
        WatermarkManager.TransformWatermarks watermarks2 = this.manager.getWatermarks(this.graph.getProducer(this.flattened));
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.flattened), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle2), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(watermarks2.getInputWatermark(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        Assert.assertThat(watermarks2.getOutputWatermark(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        Instant instant = new Instant(10000L);
        CommittedBundle timestampedBundle = timestampedBundle(this.createdInts, TimestampedValue.of(5, instant));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(timestampedBundle), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        WatermarkManager.TransformWatermarks watermarks3 = this.manager.getWatermarks(this.graph.getProducer(this.flattened));
        Assert.assertThat(watermarks3.getInputWatermark(), Matchers.not(Matchers.greaterThan(instant)));
        Assert.assertThat(watermarks3.getOutputWatermark(), Matchers.not(Matchers.greaterThan(instant)));
        WatermarkManager.TransformWatermarks watermarks4 = this.manager.getWatermarks(this.graph.getProducer(this.flattened));
        Assert.assertThat(watermarks4.getInputWatermark(), Matchers.equalTo(instant));
        Assert.assertThat(watermarks4.getOutputWatermark(), Matchers.equalTo(instant));
        this.manager.updateWatermarks(timestampedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.flattened), timestampedBundle.withElements(Collections.emptyList()), Collections.singleton(this.bundleFactory.createBundle(this.flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE)), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks5 = this.manager.getWatermarks(this.graph.getProducer(this.flattened));
        Assert.assertThat(watermarks5.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks5.getOutputWatermark(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
    }

    @Test
    public void getWatermarkMultiIdenticalInput() {
        PCollection apply = this.p.apply(Create.of(1, new Integer[]{2, 3}));
        PCollection apply2 = PCollectionList.of(apply).and(apply).apply(Flatten.pCollections());
        DirectGraphVisitor directGraphVisitor = new DirectGraphVisitor();
        this.p.traverseTopologically(directGraphVisitor);
        DirectGraph graph = directGraphVisitor.getGraph();
        AppliedPTransform producer = graph.getProducer(apply2);
        WatermarkManager create = WatermarkManager.create(this.clock, graph, (v0) -> {
            return v0.getFullName();
        });
        CommittedBundle commit = this.bundleFactory.createRootBundle().add(WindowedValue.valueInGlobalWindow((Object) null)).commit(this.clock.now());
        CommittedBundle commit2 = this.bundleFactory.createBundle(apply).add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(33536L))).commit(this.clock.now());
        create.initialize(ImmutableMap.builder().put(graph.getProducer(apply), Collections.singleton(commit)).build());
        create.updateWatermarks(commit, WatermarkManager.TimerUpdate.empty(), graph.getProducer(apply), (Bundle) null, Collections.singleton(commit2), BoundedWindow.TIMESTAMP_MAX_VALUE);
        create.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = create.getWatermarks(producer);
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.equalTo(new Instant(33536L)));
        create.updateWatermarks(commit2, WatermarkManager.TimerUpdate.empty(), producer, (Bundle) null, Collections.emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
        create.refreshAll();
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.equalTo(new Instant(33536L)));
        create.updateWatermarks(commit2, WatermarkManager.TimerUpdate.empty(), producer, (Bundle) null, Collections.emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
        create.refreshAll();
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
    }

    @Test
    public void getWatermarkForMultiConsumedCollection() {
        CommittedBundle timestampedBundle = timestampedBundle(this.createdInts, TimestampedValue.of(1, new Instant(1000000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L)));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(timestampedBundle), new Instant(Long.MAX_VALUE));
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        this.manager.updateWatermarks(timestampedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.keyed), timestampedBundle.withElements(Collections.emptyList()), Collections.singleton(timestampedBundle(this.keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1000000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)))), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.keyed));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        WatermarkManager.TransformWatermarks watermarks2 = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks2.getInputWatermark(), Matchers.not(Matchers.greaterThan(new Instant(-1000L))));
        Assert.assertThat(watermarks2.getOutputWatermark(), Matchers.not(Matchers.greaterThan(new Instant(-1000L))));
        this.manager.updateWatermarks(timestampedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), timestampedBundle.withElements(Collections.emptyList()), Collections.singleton(timestampedBundle(this.filtered, TimestampedValue.of(2, new Instant(1234L)))), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks3 = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks3.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks3.getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
    }

    @Test
    public void updateWatermarkWithWatermarkHolds() {
        CommittedBundle timestampedBundle = timestampedBundle(this.createdInts, TimestampedValue.of(1, new Instant(1000000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L)));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(timestampedBundle), new Instant(Long.MAX_VALUE));
        this.manager.updateWatermarks(timestampedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.keyed), timestampedBundle.withElements(Collections.emptyList()), Collections.singleton(timestampedBundle(this.keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1000000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)))), new Instant(500L));
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.keyed));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.not(Matchers.greaterThan(new Instant(500L))));
    }

    @Test
    public void updateWatermarkWithKeyedWatermarkHolds() {
        CommittedBundle commit = this.bundleFactory.createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), this.createdInts).add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1000000L))).add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L))).commit(this.clock.now());
        CommittedBundle commit2 = this.bundleFactory.createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), this.createdInts).add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L))).commit(this.clock.now());
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, ImmutableList.of(commit, commit2), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.updateWatermarks(commit, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), commit.withElements(Collections.emptyList()), Collections.emptyList(), new Instant(-1000L));
        this.manager.updateWatermarks(commit2, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), commit2.withElements(Collections.emptyList()), Collections.emptyList(), new Instant(1234L));
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.not(Matchers.greaterThan(new Instant(-1000L))));
        CommittedBundle commit3 = this.bundleFactory.createKeyedBundle(StructuralKey.of("Odd", StringUtf8Coder.of()), this.createdInts).commit(this.clock.now());
        this.manager.updateWatermarks(commit3, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), commit3.withElements(Collections.emptyList()), Collections.emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.equalTo(new Instant(1234L)));
        CommittedBundle commit4 = this.bundleFactory.createKeyedBundle(StructuralKey.of("Even", StringUtf8Coder.of()), this.createdInts).commit(this.clock.now());
        this.manager.updateWatermarks(commit4, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), commit4.withElements(Collections.emptyList()), Collections.emptyList(), new Instant(5678L));
        this.manager.refreshAll();
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.equalTo(new Instant(5678L)));
        this.manager.updateWatermarks(commit4, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), commit4.withElements(Collections.emptyList()), Collections.emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
    }

    @Test
    public void updateOutputWatermarkShouldBeMonotonic() {
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(this.bundleFactory.createBundle(this.createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE)), new Instant(0L));
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.equalTo(new Instant(0L)));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(this.bundleFactory.createBundle(this.createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE)), new Instant(-250L));
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.not(Matchers.lessThan(new Instant(0L))));
    }

    @Test
    public void updateWatermarkWithHoldsShouldBeMonotonic() {
        CommittedBundle timestampedBundle = timestampedBundle(this.createdInts, TimestampedValue.of(1, new Instant(1000000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L)));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(timestampedBundle), new Instant(Long.MAX_VALUE));
        this.manager.updateWatermarks(timestampedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.keyed), timestampedBundle.withElements(Collections.emptyList()), Collections.singleton(timestampedBundle(this.keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1000000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)))), new Instant(500L));
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.keyed));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.not(Matchers.greaterThan(new Instant(500L))));
        Instant outputWatermark = watermarks.getOutputWatermark();
        WatermarkManager.TransformWatermarks watermarks2 = this.manager.getWatermarks(this.graph.getProducer(this.keyed));
        Assert.assertThat(watermarks2.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks2.getOutputWatermark(), Matchers.equalTo(outputWatermark));
    }

    @Test
    public void updateWatermarkWithUnprocessedElements() {
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(1);
        WindowedValue timestampedValueInGlobalWindow = WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-1000L));
        WindowedValue timestampedValueInGlobalWindow2 = WindowedValue.timestampedValueInGlobalWindow(3, new Instant(1234L));
        CommittedBundle commit = this.bundleFactory.createBundle(this.createdInts).add(valueInGlobalWindow).add(timestampedValueInGlobalWindow).add(timestampedValueInGlobalWindow2).commit(this.clock.now());
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(commit), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.updateWatermarks(commit, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.keyed), commit.withElements(ImmutableList.of(timestampedValueInGlobalWindow, timestampedValueInGlobalWindow2)), Collections.singleton(timestampedBundle(this.keyed, TimestampedValue.of(KV.of("MyKey", 1), BoundedWindow.TIMESTAMP_MIN_VALUE))), BoundedWindow.TIMESTAMP_MAX_VALUE);
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.keyed)).getInputWatermark(), Matchers.not(Matchers.greaterThan(new Instant(-1000L))));
    }

    @Test
    public void updateWatermarkWithCompletedElementsNotPending() {
        CommittedBundle commit = this.bundleFactory.createBundle(this.createdInts).add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(22L))).commit(this.clock.now());
        CommittedBundle commit2 = this.bundleFactory.createBundle(this.createdInts).add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(22L))).commit(this.clock.now());
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(commit), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.updateWatermarks(commit2, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), commit2.withElements(Collections.emptyList()), Collections.emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.filtered)).getInputWatermark(), Matchers.equalTo(new Instant(22L)));
    }

    @Test
    public void updateWatermarkWithLateData() {
        Instant instant = new Instant(1000000L);
        CommittedBundle timestampedBundle = timestampedBundle(this.createdInts, TimestampedValue.of(1, instant), TimestampedValue.of(2, new Instant(1234L)));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(timestampedBundle), instant);
        this.manager.updateWatermarks(timestampedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.keyed), timestampedBundle.withElements(Collections.emptyList()), Collections.singleton(timestampedBundle(this.keyed, TimestampedValue.of(KV.of("MyKey", 1), instant), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)))), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.keyed));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.equalTo(instant));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.equalTo(instant));
        CommittedBundle timestampedBundle2 = timestampedBundle(this.createdInts, TimestampedValue.of(3, new Instant(-1000L)));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), timestampedBundle.withElements(Collections.emptyList()), Collections.singleton(timestampedBundle2), new Instant(2000000L));
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.equalTo(new Instant(2000000L)));
        WatermarkManager.TransformWatermarks watermarks2 = this.manager.getWatermarks(this.graph.getProducer(this.keyed));
        Assert.assertThat(watermarks2.getInputWatermark(), Matchers.not(Matchers.lessThan(instant)));
        Assert.assertThat(watermarks2.getOutputWatermark(), Matchers.not(Matchers.lessThan(instant)));
        this.manager.updateWatermarks(timestampedBundle2, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.keyed), timestampedBundle2.withElements(Collections.emptyList()), Collections.singleton(timestampedBundle(this.keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)))), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
    }

    @Test
    @Ignore("https://issues.apache.org/jira/browse/BEAM-4191")
    public void updateWatermarkWithDifferentWindowedValueInstances() {
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(this.bundleFactory.createBundle(this.createdInts).add(WindowedValue.valueInGlobalWindow(1)).commit(Instant.now())), BoundedWindow.TIMESTAMP_MAX_VALUE);
        CommittedBundle commit = this.bundleFactory.createBundle(this.createdInts).add(WindowedValue.valueInGlobalWindow(1)).commit(Instant.now());
        this.manager.updateWatermarks(commit, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.keyed), commit.withElements(Collections.emptyList()), Collections.emptyList(), (Instant) null);
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.keyed)).getInputWatermark(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
    }

    @Test
    public void getWatermarksAfterOnlyEmptyOutput() {
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle(this.createdInts, new Integer[0])), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
    }

    @Test
    public void getWatermarksAfterHoldAndEmptyOutput() {
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.createdInts, 1, 2);
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle), new Instant(12000L));
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle(this.filtered, new Integer[0])), new Instant(10000L));
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks.getInputWatermark(), Matchers.not(Matchers.lessThan(new Instant(12000L))));
        Assert.assertThat(watermarks.getOutputWatermark(), Matchers.not(Matchers.greaterThan(new Instant(10000L))));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle(this.createdInts, new Integer[0])), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getOutputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        WatermarkManager.TransformWatermarks watermarks2 = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks2.getInputWatermark(), Matchers.not(Matchers.lessThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        Assert.assertThat(watermarks2.getOutputWatermark(), Matchers.not(Matchers.greaterThan(new Instant(10000L))));
    }

    @Test
    public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.createdInts));
        Assert.assertThat(watermarks.getSynchronizedProcessingInputTime(), Matchers.equalTo(this.clock.now()));
        Assert.assertThat(watermarks.getSynchronizedProcessingOutputTime(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
        WatermarkManager.TransformWatermarks watermarks2 = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks2.getSynchronizedProcessingInputTime(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        Assert.assertThat(watermarks2.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        CommittedBundle commit = this.bundleFactory.createBundle(this.createdInts).commit(new Instant(1250L));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(commit), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks3 = this.manager.getWatermarks(this.graph.getProducer(this.createdInts));
        Assert.assertThat(watermarks3.getSynchronizedProcessingInputTime(), Matchers.equalTo(this.clock.now()));
        Assert.assertThat(watermarks3.getSynchronizedProcessingOutputTime(), Matchers.equalTo(this.clock.now()));
        WatermarkManager.TransformWatermarks watermarks4 = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks4.getSynchronizedProcessingInputTime(), Matchers.not(Matchers.greaterThan(this.clock.now())));
        Assert.assertThat(watermarks4.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(this.clock.now())));
        this.clock.set(new Instant(1500L));
        Assert.assertThat(watermarks3.getSynchronizedProcessingInputTime(), Matchers.equalTo(this.clock.now()));
        Assert.assertThat(watermarks3.getSynchronizedProcessingOutputTime(), Matchers.equalTo(this.clock.now()));
        Assert.assertThat(watermarks4.getSynchronizedProcessingInputTime(), Matchers.not(Matchers.greaterThan(new Instant(1250L))));
        Assert.assertThat(watermarks4.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(new Instant(1250L))));
        this.manager.updateWatermarks(commit, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), commit.withElements(Collections.emptyList()), Collections.singleton(this.bundleFactory.createBundle(this.intsToFlatten).commit(new Instant(1250L))), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks5 = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks5.getSynchronizedProcessingInputTime(), Matchers.not(Matchers.greaterThan(watermarks3.getSynchronizedProcessingOutputTime())));
        Assert.assertThat(watermarks5.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(watermarks5.getSynchronizedProcessingInputTime())));
    }

    @Test
    public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.createdInts, 1, 2, 4, 8);
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle), new Instant(1248L));
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        WatermarkManager.TransformWatermarks watermarks2 = this.manager.getWatermarks(this.graph.getProducer(this.filteredTimesTwo));
        Instant synchronizedProcessingOutputTime = watermarks.getSynchronizedProcessingOutputTime();
        Instant synchronizedProcessingOutputTime2 = watermarks2.getSynchronizedProcessingOutputTime();
        StructuralKey of = StructuralKey.of("key", StringUtf8Coder.of());
        CommittedBundle multiWindowedBundle2 = multiWindowedBundle(this.filtered, 2, 8);
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(250L), new Instant(250L), TimeDomain.PROCESSING_TIME);
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.builder(of).setTimer(of2).setTimer(TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(4096L), new Instant(4096L), TimeDomain.PROCESSING_TIME)).build(), this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle2), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Instant now = this.clock.now();
        this.clock.set(now.plus(250L));
        Assert.assertThat(watermarks.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(now)));
        Assert.assertThat(watermarks2.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(now)));
        Assert.assertThat(watermarks.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.lessThan(synchronizedProcessingOutputTime)));
        Assert.assertThat(watermarks2.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.lessThan(synchronizedProcessingOutputTime2)));
        Assert.assertThat(((WatermarkManager.FiredTimers) Iterables.getOnlyElement(this.manager.extractFiredTimers())).getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of2}));
        Assert.assertThat(watermarks.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(now)));
        Assert.assertThat(watermarks2.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(now)));
        CommittedBundle commit = this.bundleFactory.createKeyedBundle(of, this.filtered).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
        CommittedBundle commit2 = this.bundleFactory.createKeyedBundle(of, this.filteredTimesTwo).commit(watermarks.getSynchronizedProcessingOutputTime());
        this.manager.updateWatermarks(commit, WatermarkManager.TimerUpdate.builder(of).withCompletedTimers(Collections.singleton(of2)).build(), this.graph.getProducer(this.filtered), commit.withElements(Collections.emptyList()), Collections.singleton(commit2), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        this.clock.set(now.plus(500L));
        Assert.assertThat(watermarks.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(this.clock.now())));
        Assert.assertThat(watermarks2.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.lessThan(commit2.getSynchronizedProcessingOutputWatermark())));
        this.manager.updateWatermarks(commit2, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filteredTimesTwo), commit2.withElements(Collections.emptyList()), Collections.emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        this.clock.set(new Instant(Long.MAX_VALUE));
        Assert.assertThat(watermarks2.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(new Instant(4096L))));
    }

    @Test
    public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
        Assert.assertThat(this.manager.getWatermarks(this.graph.getProducer(this.createdInts)).getSynchronizedProcessingInputTime(), Matchers.equalTo(this.clock.now()));
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.filtered));
        Assert.assertThat(watermarks.getSynchronizedProcessingInputTime(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        Assert.assertThat(watermarks.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(this.bundleFactory.createBundle(this.createdInts).commit(new Instant(1250L))), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks2 = this.manager.getWatermarks(this.graph.getProducer(this.createdInts));
        Assert.assertThat(watermarks2.getSynchronizedProcessingInputTime(), Matchers.not(Matchers.greaterThan(this.clock.now())));
        Assert.assertThat(watermarks2.getSynchronizedProcessingOutputTime(), Matchers.not(Matchers.greaterThan(this.clock.now())));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(this.bundleFactory.createBundle(this.createdInts).commit(new Instant(750L))), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(watermarks2.getSynchronizedProcessingOutputTime(), Matchers.equalTo(this.clock.now()));
    }

    @Test
    public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.createdInts, 1, 2, 3);
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle), new Instant(40900L));
        this.manager.refreshAll();
        CommittedBundle multiWindowedBundle2 = multiWindowedBundle(this.filtered, 2, 4);
        Instant instant = new Instant(2048L);
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), instant, instant, TimeDomain.PROCESSING_TIME);
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of())).setTimer(of).build(), this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle2), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.filteredTimesTwo));
        Assert.assertThat(watermarks.getSynchronizedProcessingInputTime(), Matchers.equalTo(this.clock.now()));
        this.clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
        Assert.assertThat(watermarks.getSynchronizedProcessingInputTime(), Matchers.equalTo(instant));
        this.manager.extractFiredTimers();
        Assert.assertThat(watermarks.getSynchronizedProcessingInputTime(), Matchers.equalTo(instant));
        CommittedBundle multiWindowedBundle3 = multiWindowedBundle(this.createdInts, 4, 8, 12);
        this.manager.updateWatermarks(multiWindowedBundle3, WatermarkManager.TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of())).withCompletedTimers(Collections.singleton(of)).build(), this.graph.getProducer(this.filtered), multiWindowedBundle3.withElements(Collections.emptyList()), Collections.emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        Assert.assertThat(watermarks.getSynchronizedProcessingInputTime(), Matchers.not(Matchers.greaterThan(this.clock.now())));
    }

    @Test
    public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.createdInts, 1, 2, 3);
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle), new Instant(29919235L));
        Instant instant = new Instant(2048L);
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(this.bundleFactory.createKeyedBundle(StructuralKey.of("key", StringUtf8Coder.of()), this.filtered).commit(instant)), BoundedWindow.TIMESTAMP_MAX_VALUE);
        this.manager.refreshAll();
        WatermarkManager.TransformWatermarks watermarks = this.manager.getWatermarks(this.graph.getProducer(this.filteredTimesTwo));
        Assert.assertThat(watermarks.getSynchronizedProcessingInputTime(), Matchers.equalTo(this.clock.now()));
        this.clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
        Assert.assertThat(watermarks.getSynchronizedProcessingInputTime(), Matchers.equalTo(instant));
    }

    @Test
    public void extractFiredTimersReturnsFiredEventTimeTimers() {
        Assert.assertThat(this.manager.extractFiredTimers(), Matchers.emptyIterable());
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.filtered, new Integer[0]);
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle), new Instant(1500L));
        this.manager.refreshAll();
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(1000L), new Instant(1000L), TimeDomain.EVENT_TIME);
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.EVENT_TIME);
        TimerInternals.TimerData of3 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.EVENT_TIME);
        StructuralKey of4 = StructuralKey.of(new byte[]{1, 4, 9}, ByteArrayCoder.of());
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.builder(of4).setTimer(of).setTimer(of2).setTimer(of3).build(), this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle(this.intsToFlatten, new Integer[0])), new Instant(1000L));
        this.manager.refreshAll();
        Collection extractFiredTimers = this.manager.extractFiredTimers();
        Assert.assertThat(extractFiredTimers, Matchers.not(Matchers.emptyIterable()));
        WatermarkManager.FiredTimers firedTimers = (WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers);
        Assert.assertThat(firedTimers.getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of}));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.emptyList(), new Instant(50000L));
        this.manager.refreshAll();
        Assert.assertTrue(this.manager.extractFiredTimers().isEmpty());
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.builder(of4).withCompletedTimers(firedTimers.getTimers()).build(), this.graph.getProducer(this.filtered), (Bundle) null, Collections.emptyList(), new Instant(1000L));
        this.manager.refreshAll();
        Collection extractFiredTimers2 = this.manager.extractFiredTimers();
        Assert.assertThat(extractFiredTimers2, Matchers.not(Matchers.emptyIterable()));
        Assert.assertThat(((WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers2)).getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of2, of3}));
    }

    @Test
    public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
        Assert.assertThat(this.manager.extractFiredTimers(), Matchers.emptyIterable());
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.filtered, new Integer[0]);
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle), new Instant(1500L));
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(999L), new Instant(999L), TimeDomain.PROCESSING_TIME);
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.PROCESSING_TIME);
        TimerInternals.TimerData of3 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.PROCESSING_TIME);
        StructuralKey of4 = StructuralKey.of(-12L, VarLongCoder.of());
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.builder(of4).setTimer(of3).setTimer(of).setTimer(of2).build(), this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle(this.intsToFlatten, new Integer[0])), new Instant(1000L));
        this.manager.refreshAll();
        Collection extractFiredTimers = this.manager.extractFiredTimers();
        Assert.assertThat(extractFiredTimers, Matchers.not(Matchers.emptyIterable()));
        WatermarkManager.FiredTimers firedTimers = (WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers);
        Assert.assertThat(firedTimers.getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of}));
        this.clock.set(new Instant(50000L));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.emptyList(), new Instant(50000L));
        this.manager.refreshAll();
        Assert.assertTrue(this.manager.extractFiredTimers().isEmpty());
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.builder(of4).withCompletedTimers(firedTimers.getTimers()).build(), this.graph.getProducer(this.filtered), (Bundle) null, Collections.emptyList(), new Instant(1000L));
        this.manager.refreshAll();
        Collection extractFiredTimers2 = this.manager.extractFiredTimers();
        Assert.assertThat(extractFiredTimers2, Matchers.not(Matchers.emptyIterable()));
        Assert.assertThat(((WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers2)).getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of2, of3}));
    }

    @Test
    public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
        Assert.assertThat(this.manager.extractFiredTimers(), Matchers.emptyIterable());
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.filtered, new Integer[0]);
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle), new Instant(1500L));
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(999L), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
        TimerInternals.TimerData of3 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
        StructuralKey of4 = StructuralKey.of(new byte[]{2, -2, 22}, ByteArrayCoder.of());
        this.manager.updateWatermarks(multiWindowedBundle, WatermarkManager.TimerUpdate.builder(of4).setTimer(of3).setTimer(of).setTimer(of2).build(), this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle(this.intsToFlatten, new Integer[0])), new Instant(1000L));
        this.manager.refreshAll();
        Collection extractFiredTimers = this.manager.extractFiredTimers();
        Assert.assertThat(extractFiredTimers, Matchers.not(Matchers.emptyIterable()));
        WatermarkManager.FiredTimers firedTimers = (WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers);
        Assert.assertThat(firedTimers.getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of}));
        this.clock.set(new Instant(50000L));
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.emptyList(), new Instant(50000L));
        this.manager.refreshAll();
        Assert.assertTrue(this.manager.extractFiredTimers().isEmpty());
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.builder(of4).withCompletedTimers(firedTimers.getTimers()).build(), this.graph.getProducer(this.filtered), (Bundle) null, Collections.emptyList(), new Instant(1000L));
        Collection extractFiredTimers2 = this.manager.extractFiredTimers();
        Assert.assertThat(extractFiredTimers2, Matchers.not(Matchers.emptyIterable()));
        Assert.assertThat(((WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers2)).getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of2, of3}));
    }

    @Test
    public void processingTimeTimersCanBeReset() {
        Assert.assertThat(this.manager.extractFiredTimers(), Matchers.emptyIterable());
        StructuralKey of = StructuralKey.of(-12L, VarLongCoder.of());
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of("myTimer", StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.PROCESSING_TIME);
        TimerInternals.TimerData of3 = TimerInternals.TimerData.of("myTimer", StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.PROCESSING_TIME);
        WatermarkManager.TimerUpdate build = WatermarkManager.TimerUpdate.builder(of).setTimer(of2).build();
        WatermarkManager.TimerUpdate build2 = WatermarkManager.TimerUpdate.builder(of).setTimer(of3).build();
        this.manager.updateWatermarks((Bundle) null, build, this.graph.getProducer(this.createdInts), (Bundle) null, Collections.emptyList(), new Instant(5000L));
        this.manager.refreshAll();
        this.manager.updateWatermarks((Bundle) null, build2, this.graph.getProducer(this.createdInts), (Bundle) null, Collections.emptyList(), new Instant(10000L));
        this.clock.set(new Instant(50000L));
        this.manager.refreshAll();
        Collection extractFiredTimers = this.manager.extractFiredTimers();
        Assert.assertThat(extractFiredTimers, Matchers.not(Matchers.emptyIterable()));
        Assert.assertThat(((WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers)).getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of3}));
    }

    @Test
    public void eventTimeTimersCanBeReset() {
        Assert.assertThat(this.manager.extractFiredTimers(), Matchers.emptyIterable());
        StructuralKey of = StructuralKey.of(-12L, VarLongCoder.of());
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of("myTimer", StateNamespaces.global(), new Instant(1000L), new Instant(1000L), TimeDomain.EVENT_TIME);
        TimerInternals.TimerData of3 = TimerInternals.TimerData.of("myTimer", StateNamespaces.global(), new Instant(2000L), new Instant(2000L), TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate build = WatermarkManager.TimerUpdate.builder(of).setTimer(of2).build();
        WatermarkManager.TimerUpdate build2 = WatermarkManager.TimerUpdate.builder(of).setTimer(of3).build();
        CommittedBundle multiWindowedBundle = multiWindowedBundle(this.filtered, new Integer[0]);
        this.manager.updateWatermarks(multiWindowedBundle, build, this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle(this.intsToFlatten, new Integer[0])), new Instant(1000L));
        this.manager.refreshAll();
        this.manager.updateWatermarks(multiWindowedBundle, build2, this.graph.getProducer(this.filtered), multiWindowedBundle.withElements(Collections.emptyList()), Collections.singleton(multiWindowedBundle(this.intsToFlatten, new Integer[0])), new Instant(1000L));
        this.manager.refreshAll();
        this.manager.updateWatermarks((Bundle) null, WatermarkManager.TimerUpdate.empty(), this.graph.getProducer(this.createdInts), (Bundle) null, Collections.singleton(multiWindowedBundle), new Instant(3000L));
        this.manager.refreshAll();
        Collection extractFiredTimers = this.manager.extractFiredTimers();
        Assert.assertThat(extractFiredTimers, Matchers.not(Matchers.emptyIterable()));
        Assert.assertThat(((WatermarkManager.FiredTimers) Iterables.getOnlyElement(extractFiredTimers)).getTimers(), Matchers.contains(new TimerInternals.TimerData[]{of3}));
    }

    @Test
    public void inputWatermarkDuplicates() {
        WatermarkManager.Watermark watermark = (WatermarkManager.Watermark) Mockito.mock(WatermarkManager.Watermark.class);
        WatermarkManager.AppliedPTransformInputWatermark appliedPTransformInputWatermark = new WatermarkManager.AppliedPTransformInputWatermark("underTest", ImmutableList.of(watermark), timerData -> {
        });
        Mockito.when(watermark.get()).thenReturn(new Instant(0L));
        appliedPTransformInputWatermark.refresh();
        Assert.assertEquals(new Instant(0L), appliedPTransformInputWatermark.get());
        StructuralKey of = StructuralKey.of("key", StringUtf8Coder.of());
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of("a", StateNamespaces.global(), new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME);
        TimerInternals.TimerData of3 = TimerInternals.TimerData.of("a", StateNamespaces.global(), new Instant(200L), new Instant(200L), TimeDomain.EVENT_TIME);
        appliedPTransformInputWatermark.updateTimers(WatermarkManager.TimerUpdate.builder(of).setTimer(of2).setTimer(of3).build());
        Assert.assertEquals(of3.getTimestamp(), appliedPTransformInputWatermark.getEarliestTimerTimestamp());
        Mockito.when(watermark.get()).thenReturn(new Instant(1000L));
        appliedPTransformInputWatermark.refresh();
        Assert.assertEquals(new Instant(1000L), appliedPTransformInputWatermark.get());
        List list = (List) appliedPTransformInputWatermark.extractFiredEventTimeTimers().get(of);
        Assert.assertNotNull(list);
        Assert.assertThat(list, Matchers.contains(new TimerInternals.TimerData[]{of3}));
        appliedPTransformInputWatermark.updateTimers(WatermarkManager.TimerUpdate.builder(of).withCompletedTimers(list).build());
        Assert.assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, appliedPTransformInputWatermark.getEarliestTimerTimestamp());
        Assert.assertThat(appliedPTransformInputWatermark.extractFiredEventTimeTimers().entrySet(), Matchers.empty());
    }

    @Test
    public void timerUpdateBuilderBuildAddsAllAddedTimers() {
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME);
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(24L), new Instant(24L), TimeDomain.PROCESSING_TIME);
        TimerInternals.TimerData of3 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(1024L), new Instant(1024L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
        TimerInternals.TimerData of4 = TimerInternals.TimerData.of(StateNamespaces.global(), new Instant(2048L), new Instant(2048L), TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate build = WatermarkManager.TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of())).withCompletedTimers(ImmutableList.of(of3, of4)).setTimer(of).deletedTimer(of2).build();
        Assert.assertThat(build.getCompletedTimers(), Matchers.containsInAnyOrder(new TimerInternals.TimerData[]{of3, of4}));
        Assert.assertThat(build.getSetTimers(), Matchers.contains(new TimerInternals.TimerData[]{of}));
        Assert.assertThat(build.getDeletedTimers(), Matchers.contains(new TimerInternals.TimerData[]{of2}));
    }

    @Test
    public void timerUpdateBuilderWithSetAtEndOfTime() {
        Instant instant = BoundedWindow.TIMESTAMP_MAX_VALUE;
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), instant, instant, TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate.TimerUpdateBuilder builder = WatermarkManager.TimerUpdate.builder(StructuralKey.empty());
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage(instant.toString());
        builder.setTimer(of);
    }

    @Test
    public void timerUpdateBuilderWithSetPastEndOfTime() {
        Instant plus = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.standardMinutes(2L));
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), plus, plus, TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate.TimerUpdateBuilder builder = WatermarkManager.TimerUpdate.builder(StructuralKey.empty());
        this.thrown.expect(IllegalArgumentException.class);
        this.thrown.expectMessage(plus.toString());
        builder.setTimer(of);
    }

    @Test
    public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() {
        WatermarkManager.TimerUpdate.TimerUpdateBuilder builder = WatermarkManager.TimerUpdate.builder((StructuralKey) null);
        Instant now = Instant.now();
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate build = builder.setTimer(of).deletedTimer(of).build();
        Assert.assertThat(build.getSetTimers(), Matchers.emptyIterable());
        Assert.assertThat(build.getDeletedTimers(), Matchers.contains(new TimerInternals.TimerData[]{of}));
    }

    @Test
    public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() {
        WatermarkManager.TimerUpdate.TimerUpdateBuilder builder = WatermarkManager.TimerUpdate.builder((StructuralKey) null);
        Instant now = Instant.now();
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate build = builder.deletedTimer(of).setTimer(of).build();
        Assert.assertThat(build.getSetTimers(), Matchers.contains(new TimerInternals.TimerData[]{of}));
        Assert.assertThat(build.getDeletedTimers(), Matchers.emptyIterable());
    }

    @Test
    public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() {
        WatermarkManager.TimerUpdate.TimerUpdateBuilder builder = WatermarkManager.TimerUpdate.builder((StructuralKey) null);
        Instant now = Instant.now();
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate build = builder.build();
        builder.setTimer(of);
        Assert.assertThat(build.getSetTimers(), Matchers.emptyIterable());
        builder.build();
        Assert.assertThat(build.getSetTimers(), Matchers.emptyIterable());
    }

    @Test
    public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() {
        WatermarkManager.TimerUpdate.TimerUpdateBuilder builder = WatermarkManager.TimerUpdate.builder((StructuralKey) null);
        Instant now = Instant.now();
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate build = builder.build();
        builder.deletedTimer(of);
        Assert.assertThat(build.getDeletedTimers(), Matchers.emptyIterable());
        builder.build();
        Assert.assertThat(build.getDeletedTimers(), Matchers.emptyIterable());
    }

    @Test
    public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() {
        WatermarkManager.TimerUpdate.TimerUpdateBuilder builder = WatermarkManager.TimerUpdate.builder((StructuralKey) null);
        Instant now = Instant.now();
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate build = builder.build();
        builder.withCompletedTimers(ImmutableList.of(of));
        Assert.assertThat(build.getCompletedTimers(), Matchers.emptyIterable());
        builder.build();
        Assert.assertThat(build.getCompletedTimers(), Matchers.emptyIterable());
    }

    @Test
    public void timerUpdateWithCompletedTimersNotAddedToExisting() {
        WatermarkManager.TimerUpdate.TimerUpdateBuilder builder = WatermarkManager.TimerUpdate.builder((StructuralKey) null);
        Instant now = Instant.now();
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
        WatermarkManager.TimerUpdate build = builder.build();
        Assert.assertThat(build.getCompletedTimers(), Matchers.emptyIterable());
        Assert.assertThat(build.withCompletedTimers(ImmutableList.of(of)).getCompletedTimers(), Matchers.contains(new TimerInternals.TimerData[]{of}));
        Assert.assertThat(build.getCompletedTimers(), Matchers.emptyIterable());
    }

    @SafeVarargs
    private final <T> CommittedBundle<T> timestampedBundle(PCollection<T> pCollection, TimestampedValue<T>... timestampedValueArr) {
        UncommittedBundle createBundle = this.bundleFactory.createBundle(pCollection);
        for (TimestampedValue<T> timestampedValue : timestampedValueArr) {
            createBundle.add(WindowedValue.timestampedValueInGlobalWindow(timestampedValue.getValue(), timestampedValue.getTimestamp()));
        }
        return createBundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    @SafeVarargs
    private final <T> CommittedBundle<T> multiWindowedBundle(PCollection<T> pCollection, T... tArr) {
        UncommittedBundle createBundle = this.bundleFactory.createBundle(pCollection);
        ImmutableList of = ImmutableList.of(GlobalWindow.INSTANCE, new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0L)));
        for (T t : tArr) {
            createBundle.add(WindowedValue.of(t, BoundedWindow.TIMESTAMP_MIN_VALUE, of, PaneInfo.NO_FIRING));
        }
        return createBundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }
}
