/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Function;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Lists;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Ordering;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Sets;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.hash.HashCode;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class WatchTest
implements Serializable {
    @Rule
    public transient TestPipeline p = TestPipeline.create();

    @Test
    @Category(value={NeedsRunner.class})
    public void testSinglePollMultipleInputs() {
        PCollection res = (PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"a", (Object[])new String[]{"b"}))).apply((PTransform)Watch.growthOf((Watch.Growth.PollFn)new Watch.Growth.PollFn<String, String>(){

            public Watch.Growth.PollResult<String> apply(String element, Contextful.Fn.Context c) throws Exception {
                return Watch.Growth.PollResult.complete((Instant)Instant.now(), Arrays.asList(element + ".foo", element + ".bar"));
            }
        }).withPollInterval(Duration.ZERO));
        PAssert.that((PCollection)res).containsInAnyOrder(Arrays.asList(KV.of((Object)"a", (Object)"a.foo"), KV.of((Object)"a", (Object)"a.bar"), KV.of((Object)"b", (Object)"b.foo"), KV.of((Object)"b", (Object)"b.bar")));
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testSinglePollMultipleInputsWithSideInput() {
        final PCollectionView moo = (PCollectionView)((PCollection)this.p.apply("moo", (PTransform)Create.of((Object)"moo", (Object[])new String[0]))).apply("moo singleton", (PTransform)View.asSingleton());
        final PCollectionView zoo = (PCollectionView)((PCollection)this.p.apply("zoo", (PTransform)Create.of((Object)"zoo", (Object[])new String[0]))).apply("zoo singleton", (PTransform)View.asSingleton());
        PCollection res = (PCollection)((PCollection)this.p.apply("input", (PTransform)Create.of((Object)"a", (Object[])new String[]{"b"}))).apply((PTransform)Watch.growthOf((Watch.Growth.PollFn)new Watch.Growth.PollFn<String, String>(){

            public Watch.Growth.PollResult<String> apply(String element, Contextful.Fn.Context c) throws Exception {
                return Watch.Growth.PollResult.complete((Instant)Instant.now(), Arrays.asList(element + " " + (String)c.sideInput(moo) + " " + (String)c.sideInput(zoo)));
            }
        }, (Requirements)Requirements.requiresSideInputs((PCollectionView[])new PCollectionView[]{moo, zoo})).withPollInterval(Duration.ZERO));
        PAssert.that((PCollection)res).containsInAnyOrder(Arrays.asList(KV.of((Object)"a", (Object)"a moo zoo"), KV.of((Object)"b", (Object)"b moo zoo")));
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testMultiplePollsWithTerminationBecauseOutputIsFinal() {
        this.testMultiplePolls(false);
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testMultiplePollsWithTerminationDueToTerminationCondition() {
        this.testMultiplePolls(true);
    }

    private void testMultiplePolls(boolean terminationConditionElapsesBeforeOutputIsFinal) {
        List<Integer> all = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        PCollection res = (PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"a", (Object[])new String[0]))).apply((PTransform)Watch.growthOf(new TimedPollFn(all, Duration.standardSeconds((long)1L), Duration.standardSeconds((long)3L), Duration.standardSeconds((long)30L))).withTerminationPerInput((Watch.Growth.TerminationCondition)Watch.Growth.afterTotalOf((ReadableDuration)Duration.standardSeconds((long)(terminationConditionElapsesBeforeOutputIsFinal ? 2L : 100L)))).withPollInterval(Duration.millis((long)300L)).withOutputCoder((Coder)VarIntCoder.of()))).apply("Drop input", (PTransform)Values.create());
        PAssert.that((PCollection)res).containsInAnyOrder(all);
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testMultiplePollsWithKeyExtractor() {
        List<KV> polls = Arrays.asList(KV.of((Object)0, (Object)"0"), KV.of((Object)10, (Object)"10"), KV.of((Object)20, (Object)"20"), KV.of((Object)30, (Object)"30"), KV.of((Object)40, (Object)"40"), KV.of((Object)40, (Object)"40.1"), KV.of((Object)20, (Object)"20.1"), KV.of((Object)50, (Object)"50"), KV.of((Object)10, (Object)"10.1"), KV.of((Object)10, (Object)"10.2"), KV.of((Object)60, (Object)"60"), KV.of((Object)70, (Object)"70"), KV.of((Object)60, (Object)"60.1"), KV.of((Object)80, (Object)"80"), KV.of((Object)40, (Object)"40.2"), KV.of((Object)90, (Object)"90"), KV.of((Object)90, (Object)"90.1"));
        List<Integer> expected = Arrays.asList(0, 10, 20, 30, 40, 50, 60, 70, 80, 90);
        PCollection res = (PCollection)((PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"a", (Object[])new String[0]))).apply((PTransform)Watch.growthOf((Contextful)Contextful.of(new TimedPollFn(polls, Duration.standardSeconds((long)1L), Duration.standardSeconds((long)3L), Duration.standardSeconds((long)30L)), (Requirements)Requirements.empty()), KV::getKey).withTerminationPerInput((Watch.Growth.TerminationCondition)Watch.Growth.afterTotalOf((ReadableDuration)Duration.standardSeconds((long)5L))).withPollInterval(Duration.millis((long)100L)).withOutputCoder((Coder)KvCoder.of((Coder)VarIntCoder.of(), (Coder)StringUtf8Coder.of())).withOutputKeyCoder((Coder)VarIntCoder.of()))).apply("Drop input", (PTransform)Values.create())).apply("Drop auxiliary string", (PTransform)Keys.create());
        PAssert.that((PCollection)res).containsInAnyOrder(expected);
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testMultiplePollsStopAfterTimeSinceNewOutput() {
        List<Integer> all = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        PCollection res = (PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"a", (Object[])new String[0]))).apply((PTransform)Watch.growthOf(new TimedPollFn(all, Duration.standardSeconds((long)1L), Duration.standardSeconds((long)1000L), Duration.standardSeconds((long)30L))).withTerminationPerInput((Watch.Growth.TerminationCondition)Watch.Growth.afterTimeSinceNewOutput((ReadableDuration)Duration.standardSeconds((long)3L))).withPollInterval(Duration.millis((long)300L)).withOutputCoder((Coder)VarIntCoder.of()))).apply("Drop input", (PTransform)Values.create());
        PAssert.that((PCollection)res).containsInAnyOrder(all);
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testSinglePollWithManyResults() {
        long numResults = 3000L;
        PCollection res = (PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"a", (Object[])new String[0]))).apply((PTransform)Watch.growthOf((Watch.Growth.PollFn)new Watch.Growth.PollFn<String, KV<String, Integer>>(){

            public Watch.Growth.PollResult<KV<String, Integer>> apply(String element, Contextful.Fn.Context c) throws Exception {
                String pollId = UUID.randomUUID().toString();
                ArrayList<KV> output = Lists.newArrayList();
                int i = 0;
                while ((long)i < 3000L) {
                    output.add(KV.of((Object)pollId, (Object)i));
                    ++i;
                }
                return Watch.Growth.PollResult.complete((Instant)Instant.now(), output);
            }
        }).withTerminationPerInput((Watch.Growth.TerminationCondition)Watch.Growth.afterTotalOf((ReadableDuration)Duration.standardSeconds((long)1L))).withPollInterval(Duration.millis((long)1L)).withOutputCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())))).apply("Drop input", (PTransform)Values.create());
        PAssert.that((String)"Poll called only once", (PCollection)((PCollection)res.apply((PTransform)Keys.create()))).satisfies((SerializableFunction & Serializable)pollIds -> {
            Assert.assertEquals((long)1L, (long)Sets.newHashSet(pollIds).size());
            return null;
        });
        PAssert.that((String)"Yields all expected results", (PCollection)((PCollection)res.apply("Drop poll id", (PTransform)Values.create()))).satisfies((SerializableFunction & Serializable)input -> {
            Assert.assertEquals((String)"Total number of results mismatches", (long)3000L, (long)Lists.newArrayList(input).size());
            Assert.assertEquals((String)"Results are not unique", (long)3000L, (long)Sets.newHashSet(input).size());
            return null;
        });
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testMultiplePollsWithManyResults() {
        long numResults = 3000L;
        ArrayList<Integer> all = Lists.newArrayList();
        int i = 0;
        while ((long)i < 3000L) {
            all.add(i);
            ++i;
        }
        PCollection res = (PCollection)((PCollection)((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Object)"a", (Object[])new String[0]))).apply((PTransform)Watch.growthOf(new TimedPollFn(all, Duration.standardSeconds((long)1L), Duration.standardSeconds((long)3L), Duration.standardSeconds((long)30L))).withPollInterval(Duration.millis((long)500L)).withOutputCoder((Coder)VarIntCoder.of()))).apply(Reify.timestampsInValue())).apply("Drop timestamped input", (PTransform)Values.create());
        PAssert.that((PCollection)res).satisfies((SerializableFunction & Serializable)outputs -> {
            Function<TimestampedValue<Integer>, Integer> extractValueFn = new Function<TimestampedValue<Integer>, Integer>(){

                @Override
                @Nullable
                public Integer apply(@Nullable TimestampedValue<Integer> input) {
                    return (Integer)input.getValue();
                }
            };
            Function<TimestampedValue<Integer>, Instant> extractTimestampFn = new Function<TimestampedValue<Integer>, Instant>(){

                @Override
                @Nullable
                public Instant apply(@Nullable TimestampedValue<Integer> input) {
                    return input.getTimestamp();
                }
            };
            Ordering<TimestampedValue<Integer>> byValue = Ordering.natural().onResultOf(extractValueFn);
            Ordering<TimestampedValue<Integer>> byTimestamp = Ordering.natural().onResultOf(extractTimestampFn);
            Assert.assertTrue((String)"Outputs must be in timestamp order", (boolean)byTimestamp.isOrdered(byValue.sortedCopy(outputs)));
            Assert.assertEquals((String)"Yields all expected values", (long)3000L, (long)Sets.newHashSet(StreamSupport.stream(outputs.spliterator(), false).map(extractValueFn::apply).collect(Collectors.toList())).size());
            Assert.assertThat((String)"Poll called more than once", (Object)Sets.newHashSet(StreamSupport.stream(outputs.spliterator(), false).map(extractTimestampFn::apply).collect(Collectors.toList())).size(), (Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(1)));
            return null;
        });
        this.p.run();
    }

    @Test
    public void testTerminationConditionsNever() {
        Watch.Growth.Never c = Watch.Growth.never();
        Integer state = c.forNewInput(Instant.now(), null);
        Assert.assertFalse((boolean)c.canStopPolling(Instant.now(), state));
    }

    @Test
    public void testTerminationConditionsAfterTotalOf() {
        Instant now = Instant.now();
        Watch.Growth.AfterTotalOf c = Watch.Growth.afterTotalOf((ReadableDuration)Duration.standardSeconds((long)5L));
        KV state = c.forNewInput(now, null);
        Assert.assertFalse((boolean)c.canStopPolling(now, state));
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), state));
        Assert.assertTrue((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)6L)), state));
    }

    @Test
    public void testTerminationConditionsAfterTimeSinceNewOutput() {
        Instant now = Instant.now();
        Watch.Growth.AfterTimeSinceNewOutput c = Watch.Growth.afterTimeSinceNewOutput((ReadableDuration)Duration.standardSeconds((long)5L));
        KV state = c.forNewInput(now, null);
        Assert.assertFalse((boolean)c.canStopPolling(now, state));
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), state));
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)6L)), state));
        state = c.onSeenNewOutput(now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), state);
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), state));
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)6L)), state));
        Assert.assertTrue((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)9L)), state));
        state = c.onSeenNewOutput(now.plus((ReadableDuration)Duration.standardSeconds((long)5L)), state);
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), state));
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)6L)), state));
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)9L)), state));
        Assert.assertTrue((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)11L)), state));
    }

    @Test
    public void testTerminationConditionsEitherOf() {
        Instant now = Instant.now();
        Watch.Growth.AfterTotalOf a = Watch.Growth.afterTotalOf((ReadableDuration)Duration.standardSeconds((long)5L));
        Watch.Growth.AfterTotalOf b = Watch.Growth.afterTotalOf((ReadableDuration)Duration.standardSeconds((long)10L));
        Watch.Growth.BinaryCombined c = Watch.Growth.eitherOf((Watch.Growth.TerminationCondition)a, (Watch.Growth.TerminationCondition)b);
        KV state = c.forNewInput(now, null);
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), state));
        Assert.assertTrue((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)7L)), state));
        Assert.assertTrue((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)12L)), state));
    }

    @Test
    public void testTerminationConditionsAllOf() {
        Instant now = Instant.now();
        Watch.Growth.AfterTotalOf a = Watch.Growth.afterTotalOf((ReadableDuration)Duration.standardSeconds((long)5L));
        Watch.Growth.AfterTotalOf b = Watch.Growth.afterTotalOf((ReadableDuration)Duration.standardSeconds((long)10L));
        Watch.Growth.BinaryCombined c = Watch.Growth.allOf((Watch.Growth.TerminationCondition)a, (Watch.Growth.TerminationCondition)b);
        KV state = c.forNewInput(now, null);
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), state));
        Assert.assertFalse((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)7L)), state));
        Assert.assertTrue((boolean)c.canStopPolling(now.plus((ReadableDuration)Duration.standardSeconds((long)12L)), state));
    }

    private static Watch.GrowthTracker<String, String, Integer> newTracker(Watch.GrowthState<String, String, Integer> state) {
        return new Watch.GrowthTracker(SerializableFunctions.identity(), (Coder)StringUtf8Coder.of(), state, (Watch.Growth.TerminationCondition)Watch.Growth.never());
    }

    private static Watch.GrowthTracker<String, String, Integer> newTracker() {
        return WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)new Watch.GrowthState((Object)Watch.Growth.never().forNewInput(Instant.now(), null)));
    }

    private String tryClaimNextPending(Watch.GrowthTracker<String, ?, ?> tracker) {
        Assert.assertTrue((boolean)tracker.hasPending());
        Map.Entry entry = tracker.getNextPending();
        tracker.tryClaim((Object)((HashCode)entry.getKey()));
        return (String)((TimestampedValue)entry.getValue()).getValue();
    }

    @Test
    public void testGrowthTrackerCheckpointNonEmpty() {
        Instant now = Instant.now();
        Watch.GrowthTracker<String, String, Integer> tracker = WatchTest.newTracker();
        tracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))).withWatermark(now.plus((ReadableDuration)Duration.standardSeconds((long)7L))));
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)1L)), (Object)tracker.getWatermark());
        Assert.assertEquals((Object)"a", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"b", (Object)this.tryClaimNextPending(tracker));
        Assert.assertTrue((boolean)tracker.hasPending());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), (Object)tracker.getWatermark());
        Watch.GrowthTracker<String, String, Integer> residualTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)tracker.checkpoint());
        Watch.GrowthTracker<String, String, Integer> primaryTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)tracker.currentRestriction());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)1L)), (Object)primaryTracker.getWatermark());
        Assert.assertEquals((Object)"a", (Object)this.tryClaimNextPending(primaryTracker));
        Assert.assertEquals((Object)"b", (Object)this.tryClaimNextPending(primaryTracker));
        Assert.assertFalse((boolean)primaryTracker.hasPending());
        Assert.assertFalse((boolean)primaryTracker.shouldPollMore());
        primaryTracker.checkDone();
        Assert.assertEquals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE, (Object)primaryTracker.getWatermark());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)3L)), (Object)residualTracker.getWatermark());
        Assert.assertEquals((Object)"c", (Object)this.tryClaimNextPending(residualTracker));
        Assert.assertEquals((Object)"d", (Object)this.tryClaimNextPending(residualTracker));
        Assert.assertFalse((boolean)residualTracker.hasPending());
        Assert.assertTrue((boolean)residualTracker.shouldPollMore());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)7L)), (Object)residualTracker.getWatermark());
        Assert.assertFalse((boolean)tracker.hasPending());
        tracker.checkDone();
        Assert.assertFalse((boolean)tracker.shouldPollMore());
        Assert.assertEquals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE, (Object)tracker.getWatermark());
    }

    @Test
    public void testGrowthTrackerOutputFullyBeforeCheckpointIncomplete() {
        Instant now = Instant.now();
        Watch.GrowthTracker<String, String, Integer> tracker = WatchTest.newTracker();
        tracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))).withWatermark(now.plus((ReadableDuration)Duration.standardSeconds((long)7L))));
        Assert.assertEquals((Object)"a", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"b", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"c", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"d", (Object)this.tryClaimNextPending(tracker));
        Assert.assertFalse((boolean)tracker.hasPending());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)7L)), (Object)tracker.getWatermark());
        Watch.GrowthTracker<String, String, Integer> residualTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)tracker.checkpoint());
        Watch.GrowthTracker<String, String, Integer> primaryTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)tracker.currentRestriction());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)1L)), (Object)primaryTracker.getWatermark());
        Assert.assertEquals((Object)"a", (Object)this.tryClaimNextPending(primaryTracker));
        Assert.assertEquals((Object)"b", (Object)this.tryClaimNextPending(primaryTracker));
        Assert.assertEquals((Object)"c", (Object)this.tryClaimNextPending(primaryTracker));
        Assert.assertEquals((Object)"d", (Object)this.tryClaimNextPending(primaryTracker));
        Assert.assertFalse((boolean)primaryTracker.hasPending());
        Assert.assertFalse((boolean)primaryTracker.shouldPollMore());
        primaryTracker.checkDone();
        Assert.assertEquals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE, (Object)primaryTracker.getWatermark());
        Assert.assertFalse((boolean)residualTracker.hasPending());
        Assert.assertTrue((boolean)residualTracker.shouldPollMore());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)7L)), (Object)residualTracker.getWatermark());
        tracker.checkDone();
        Assert.assertFalse((boolean)tracker.hasPending());
        Assert.assertFalse((boolean)tracker.shouldPollMore());
        Assert.assertEquals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE, (Object)tracker.getWatermark());
    }

    @Test
    public void testGrowthTrackerPollAfterCheckpointIncompleteWithNewOutputs() {
        Instant now = Instant.now();
        Watch.GrowthTracker<String, String, Integer> tracker = WatchTest.newTracker();
        tracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))).withWatermark(now.plus((ReadableDuration)Duration.standardSeconds((long)7L))));
        Assert.assertEquals((Object)"a", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"b", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"c", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"d", (Object)this.tryClaimNextPending(tracker));
        Watch.GrowthState checkpoint = tracker.checkpoint();
        Watch.GrowthTracker<String, String, Integer> residualTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)checkpoint);
        residualTracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"e", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)5L))), TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))), TimestampedValue.of((Object)"f", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)8L))))).withWatermark(now.plus((ReadableDuration)Duration.standardSeconds((long)12L))));
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)5L)), (Object)residualTracker.getWatermark());
        Assert.assertEquals((Object)"e", (Object)this.tryClaimNextPending(residualTracker));
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)8L)), (Object)residualTracker.getWatermark());
        Assert.assertEquals((Object)"f", (Object)this.tryClaimNextPending(residualTracker));
        Assert.assertFalse((boolean)residualTracker.hasPending());
        Assert.assertTrue((boolean)residualTracker.shouldPollMore());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)12L)), (Object)residualTracker.getWatermark());
        residualTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)checkpoint);
        residualTracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"e", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)5L))), TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))), TimestampedValue.of((Object)"f", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)8L))))));
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)5L)), (Object)residualTracker.getWatermark());
        Assert.assertEquals((Object)"e", (Object)this.tryClaimNextPending(residualTracker));
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)5L)), (Object)residualTracker.getWatermark());
        Assert.assertEquals((Object)"f", (Object)this.tryClaimNextPending(residualTracker));
        Assert.assertFalse((boolean)residualTracker.hasPending());
        Assert.assertTrue((boolean)residualTracker.shouldPollMore());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)5L)), (Object)residualTracker.getWatermark());
    }

    @Test
    public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputs() {
        Instant now = Instant.now();
        Watch.GrowthTracker<String, String, Integer> tracker = WatchTest.newTracker();
        tracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))).withWatermark(now.plus((ReadableDuration)Duration.standardSeconds((long)7L))));
        Assert.assertEquals((Object)"a", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"b", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"c", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"d", (Object)this.tryClaimNextPending(tracker));
        Watch.GrowthState checkpoint = tracker.checkpoint();
        Watch.GrowthTracker<String, String, Integer> residualTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)checkpoint);
        residualTracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))).withWatermark(now.plus((ReadableDuration)Duration.standardSeconds((long)12L))));
        Assert.assertFalse((boolean)residualTracker.hasPending());
        Assert.assertTrue((boolean)residualTracker.shouldPollMore());
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)12L)), (Object)residualTracker.getWatermark());
        residualTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)checkpoint);
        residualTracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))));
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)7L)), (Object)residualTracker.getWatermark());
    }

    @Test
    public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputsNoWatermark() {
        Instant now = Instant.now();
        Watch.GrowthTracker<String, String, Integer> tracker = WatchTest.newTracker();
        tracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))));
        Assert.assertEquals((Object)"a", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"b", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"c", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"d", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)1L)), (Object)tracker.getWatermark());
        Watch.GrowthState checkpoint = tracker.checkpoint();
        Watch.GrowthTracker<String, String, Integer> residualTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)checkpoint);
        residualTracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Arrays.asList(TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))));
        Assert.assertEquals((Object)now.plus((ReadableDuration)Duration.standardSeconds((long)1L)), (Object)residualTracker.getWatermark());
    }

    @Test
    public void testGrowthTrackerRepeatedEmptyPollWatermark() {
        Watch.GrowthTracker<String, String, Integer> tracker = WatchTest.newTracker();
        tracker.addNewAsPending(Watch.Growth.PollResult.incomplete(Collections.emptyList()));
        Assert.assertEquals((Object)BoundedWindow.TIMESTAMP_MIN_VALUE, (Object)tracker.getWatermark());
        Instant now = Instant.now();
        Watch.GrowthTracker<String, String, Integer> tracker2 = WatchTest.newTracker();
        tracker2.addNewAsPending(Watch.Growth.PollResult.incomplete(Collections.emptyList()).withWatermark(now));
        Assert.assertEquals((Object)now, (Object)tracker2.getWatermark());
    }

    @Test
    public void testGrowthTrackerOutputFullyBeforeCheckpointComplete() {
        Instant now = Instant.now();
        Watch.GrowthTracker<String, String, Integer> tracker = WatchTest.newTracker();
        tracker.addNewAsPending(Watch.Growth.PollResult.complete(Arrays.asList(TimestampedValue.of((Object)"d", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)4L))), TimestampedValue.of((Object)"c", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)3L))), TimestampedValue.of((Object)"a", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)1L))), TimestampedValue.of((Object)"b", (Instant)now.plus((ReadableDuration)Duration.standardSeconds((long)2L))))));
        Assert.assertEquals((Object)"a", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"b", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"c", (Object)this.tryClaimNextPending(tracker));
        Assert.assertEquals((Object)"d", (Object)this.tryClaimNextPending(tracker));
        Assert.assertFalse((boolean)tracker.hasPending());
        Assert.assertEquals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE, (Object)tracker.getWatermark());
        Watch.GrowthTracker<String, String, Integer> residualTracker = WatchTest.newTracker((Watch.GrowthState<String, String, Integer>)tracker.checkpoint());
        residualTracker.checkDone();
        Assert.assertFalse((boolean)residualTracker.hasPending());
        Assert.assertFalse((boolean)residualTracker.shouldPollMore());
        Assert.assertEquals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE, (Object)residualTracker.getWatermark());
        tracker.checkDone();
        Assert.assertFalse((boolean)tracker.hasPending());
        Assert.assertFalse((boolean)tracker.shouldPollMore());
        Assert.assertEquals((Object)BoundedWindow.TIMESTAMP_MAX_VALUE, (Object)tracker.getWatermark());
    }

    private static class TimedPollFn<InputT, OutputT>
    extends Watch.Growth.PollFn<InputT, OutputT> {
        private final Instant baseTime = Instant.now();
        private final List<OutputT> outputs;
        private final Duration timeToOutputEverything;
        private final Duration timeToDeclareOutputFinal;
        private final Duration timeToFail;

        public TimedPollFn(List<OutputT> outputs, Duration timeToOutputEverything, Duration timeToDeclareOutputFinal, Duration timeToFail) {
            this.outputs = outputs;
            this.timeToOutputEverything = timeToOutputEverything;
            this.timeToDeclareOutputFinal = timeToDeclareOutputFinal;
            this.timeToFail = timeToFail;
        }

        public Watch.Growth.PollResult<OutputT> apply(InputT element, Contextful.Fn.Context c) throws Exception {
            Instant now = Instant.now();
            Duration elapsed = new Duration((ReadableInstant)this.baseTime, (ReadableInstant)Instant.now());
            if (elapsed.isLongerThan((ReadableDuration)this.timeToFail)) {
                Assert.fail((String)String.format("Poll called %s after base time, which is longer than the threshold of %s", elapsed, this.timeToFail));
            }
            double fractionElapsed = 1.0 * (double)elapsed.getMillis() / (double)this.timeToOutputEverything.getMillis();
            int numToEmit = (int)Math.min((double)this.outputs.size(), fractionElapsed * (double)this.outputs.size());
            ArrayList<TimestampedValue> toEmit = Lists.newArrayList();
            for (int i = 0; i < numToEmit; ++i) {
                toEmit.add(TimestampedValue.of(this.outputs.get(i), (Instant)now));
            }
            return elapsed.isLongerThan((ReadableDuration)this.timeToDeclareOutputFinal) ? Watch.Growth.PollResult.complete(toEmit) : Watch.Growth.PollResult.incomplete(toEmit).withWatermark(now);
        }
    }
}

