package org.apache.beam.sdk.transforms;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.beam.repackaged.core.org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Funnels;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/WatchTest.class */
public class WatchTest implements Serializable {

    // JUnit rule providing the pipeline that the runner-backed tests below build and run.
    // Declared transient so the field is left out of Java serialization of this Serializable test class.
    @Rule
    public transient TestPipeline p = TestPipeline.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/WatchTest$TimedPollFn.class */
    public static class TimedPollFn<InputT, OutputT> extends Watch.Growth.PollFn<InputT, OutputT> {
        private final Instant baseTime = Instant.now();
        private final List<OutputT> outputs;
        private final Duration timeToOutputEverything;
        private final Duration timeToDeclareOutputFinal;
        private final Duration timeToFail;

        public TimedPollFn(List<OutputT> list, Duration duration, Duration duration2, Duration duration3) {
            this.outputs = list;
            this.timeToOutputEverything = duration;
            this.timeToDeclareOutputFinal = duration2;
            this.timeToFail = duration3;
        }

        public Watch.Growth.PollResult<OutputT> apply(InputT inputt, Contextful.Fn.Context context) throws Exception {
            Instant now = Instant.now();
            Duration duration = new Duration(this.baseTime, Instant.now());
            if (duration.isLongerThan(this.timeToFail)) {
                Assert.fail(String.format("Poll called %s after base time, which is longer than the threshold of %s", duration, this.timeToFail));
            }
            int min = (int) Math.min(this.outputs.size(), ((1.0d * duration.getMillis()) / this.timeToOutputEverything.getMillis()) * this.outputs.size());
            ArrayList newArrayList = Lists.newArrayList();
            for (int i = 0; i < min; i++) {
                newArrayList.add(TimestampedValue.of(this.outputs.get(i), now));
            }
            return duration.isLongerThan(this.timeToDeclareOutputFinal) ? Watch.Growth.PollResult.complete(newArrayList) : Watch.Growth.PollResult.incomplete(newArrayList).withWatermark(now);
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* renamed from: apply, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m499apply(Object obj, Contextful.Fn.Context context) throws Exception {
            return apply((TimedPollFn<InputT, OutputT>) obj, context);
        }
    }

    /**
     * Polls each of the inputs "a" and "b" with a function that immediately returns a complete
     * result of two derived strings, and checks that all four (input, output) pairs are produced.
     */
    @Test
    @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
    public void testSinglePollMultipleInputs() {
        Watch.Growth.PollFn<String, String> pollOnce = new Watch.Growth.PollFn<String, String>() {
            @Override
            public Watch.Growth.PollResult<String> apply(String element, Contextful.Fn.Context c) throws Exception {
                return Watch.Growth.PollResult.complete(Instant.now(), Arrays.asList(element + ".foo", element + ".bar"));
            }
        };

        PCollection<KV<String, String>> polled = this.p
                .apply(Create.of("a", "b"))
                .apply(Watch.growthOf(pollOnce).withPollInterval(Duration.ZERO));

        PAssert.that(polled).containsInAnyOrder(Arrays.asList(
                KV.of("a", "a.foo"),
                KV.of("a", "a.bar"),
                KV.of("b", "b.foo"),
                KV.of("b", "b.bar")));
        this.p.run();
    }

    /**
     * Same as the single-poll test, but the poll function reads two singleton side inputs ("moo" and
     * "zoo") and appends them, space-separated, to the polled element.
     */
    @Test
    @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
    public void testSinglePollMultipleInputsWithSideInput() {
        final PCollectionView<String> mooView =
                this.p.apply("moo", Create.of("moo")).apply("moo singleton", View.asSingleton());
        final PCollectionView<String> zooView =
                this.p.apply("zoo", Create.of("zoo")).apply("zoo singleton", View.asSingleton());

        Watch.Growth.PollFn<String, String> pollWithSideInputs = new Watch.Growth.PollFn<String, String>() {
            @Override
            public Watch.Growth.PollResult<String> apply(String element, Contextful.Fn.Context c) throws Exception {
                // Typed views make the side-input reads return String directly; the separator is a
                // plain space literal rather than a constant borrowed from an unrelated library.
                return Watch.Growth.PollResult.complete(
                        Instant.now(),
                        Arrays.asList(element + " " + c.sideInput(mooView) + " " + c.sideInput(zooView)));
            }
        };

        PCollection<KV<String, String>> polled = this.p
                .apply("input", Create.of("a", "b"))
                .apply(Watch.growthOf(pollWithSideInputs, Requirements.requiresSideInputs(mooView, zooView))
                        .withPollInterval(Duration.ZERO));

        PAssert.that(polled).containsInAnyOrder(Arrays.asList(KV.of("a", "a moo zoo"), KV.of("b", "b moo zoo")));
        this.p.run();
    }

    /**
     * Runs the shared multi-poll scenario with the long (100 s) total-time termination condition, so
     * the poll function declares its output final (after 3 s) well before that condition can fire.
     */
    @Test
    @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
    public void testMultiplePollsWithTerminationBecauseOutputIsFinal() {
        final boolean useShortTerminationTimeout = false;
        testMultiplePolls(useShortTerminationTimeout);
    }

    /**
     * Runs the shared multi-poll scenario with the short (2 s) total-time termination condition,
     * which elapses before the poll function would declare its output final (after 3 s).
     */
    @Test
    @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
    public void testMultiplePollsWithTerminationDueToTerminationCondition() {
        final boolean useShortTerminationTimeout = true;
        testMultiplePolls(useShortTerminationTimeout);
    }

    /**
     * Shared body of the multi-poll tests: polls a single input every 300 ms with a
     * {@link TimedPollFn} that emits the integers 0..9 within one second and declares the output
     * final after three seconds, then asserts that exactly those integers are produced.
     *
     * @param terminateBeforeOutputIsFinal when {@code true} the per-input termination condition is a
     *     total of 2 seconds (before the output is declared final); otherwise it is 100 seconds
     */
    private void testMultiplePolls(boolean terminateBeforeOutputIsFinal) {
        List<Integer> expected = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        Duration totalPollingTime = Duration.standardSeconds(terminateBeforeOutputIsFinal ? 2L : 100L);

        Watch.Growth.PollFn<String, Integer> pollFn = new TimedPollFn<>(
                expected,
                Duration.standardSeconds(1L) /* timeToOutputEverything */,
                Duration.standardSeconds(3L) /* timeToDeclareOutputFinal */,
                Duration.standardSeconds(30L) /* timeToFail */);

        PCollection<Integer> polled = this.p
                .apply(Create.of("a"))
                .apply(Watch.growthOf(pollFn)
                        .withTerminationPerInput(Watch.Growth.afterTotalOf(totalPollingTime))
                        .withPollInterval(Duration.millis(300L))
                        .withOutputCoder(VarIntCoder.of()))
                .apply("Drop input", Values.create());

        PAssert.that(polled).containsInAnyOrder(expected);
        this.p.run();
    }

    /**
     * Polls with a {@link TimedPollFn} that emits key/value pairs containing repeated keys and
     * supplies {@code KV::getKey} as the output key extractor; after dropping the input and the
     * auxiliary string, each of the ten distinct keys must appear exactly once.
     */
    @Test
    @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
    public void testMultiplePollsWithKeyExtractor() {
        List<KV<Integer, String>> polls = Arrays.asList(
                KV.of(0, "0"),
                KV.of(10, "10"),
                KV.of(20, "20"),
                KV.of(30, "30"),
                KV.of(40, "40"),
                KV.of(40, "40.1"),
                KV.of(20, "20.1"),
                KV.of(50, "50"),
                KV.of(10, "10.1"),
                KV.of(10, "10.2"),
                KV.of(60, "60"),
                KV.of(70, "70"),
                KV.of(60, "60.1"),
                KV.of(80, "80"),
                KV.of(40, "40.2"),
                KV.of(90, "90"),
                KV.of(90, "90.1"));

        // Declared with explicit type arguments so the key extractor below is typed against
        // KV<Integer, String> rather than a raw poll function.
        Watch.Growth.PollFn<String, KV<Integer, String>> pollFn = new TimedPollFn<>(
                polls,
                Duration.standardSeconds(1L) /* timeToOutputEverything */,
                Duration.standardSeconds(3L) /* timeToDeclareOutputFinal */,
                Duration.standardSeconds(30L) /* timeToFail */);

        PCollection<Integer> keys = this.p
                .apply(Create.of("a"))
                .apply(Watch.growthOf(Contextful.of(pollFn, Requirements.empty()), KV::getKey)
                        .withTerminationPerInput(Watch.Growth.afterTotalOf(Duration.standardSeconds(5L)))
                        .withPollInterval(Duration.millis(100L))
                        .withOutputCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
                        .withOutputKeyCoder(VarIntCoder.of()))
                .apply("Drop input", Values.create())
                .apply("Drop auxiliary string", Keys.create());

        PAssert.that(keys).containsInAnyOrder(Arrays.asList(0, 10, 20, 30, 40, 50, 60, 70, 80, 90));
        this.p.run();
    }

    /**
     * Polls with a {@link TimedPollFn} that emits 0..9 within one second but would only declare its
     * output final after 1000 seconds; polling is expected to stop through the
     * "3 seconds since new output" termination condition, with all ten values produced.
     */
    @Test
    @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
    public void testMultiplePollsStopAfterTimeSinceNewOutput() {
        List<Integer> expected = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

        Watch.Growth.PollFn<String, Integer> pollFn = new TimedPollFn<>(
                expected,
                Duration.standardSeconds(1L) /* timeToOutputEverything */,
                Duration.standardSeconds(1000L) /* timeToDeclareOutputFinal */,
                Duration.standardSeconds(30L) /* timeToFail */);

        PCollection<Integer> polled = this.p
                .apply(Create.of("a"))
                .apply(Watch.growthOf(pollFn)
                        .withTerminationPerInput(Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3L)))
                        .withPollInterval(Duration.millis(300L))
                        .withOutputCoder(VarIntCoder.of()))
                .apply("Drop input", Values.create());

        PAssert.that(polled).containsInAnyOrder(expected);
        this.p.run();
    }

    /**
     * Uses a poll function that returns a complete result of 3000 values, each tagged with a random
     * id generated per poll invocation. Asserts that only one distinct poll id is observed and that
     * all 3000 values arrive and are unique.
     */
    @Test
    @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
    public void testSinglePollWithManyResults() {
        // Single source of truth for the result count used by the poll function and the assertions.
        final long numResults = 3000L;

        PCollection<KV<String, Integer>> results = this.p
                .apply(Create.of("a"))
                .apply(Watch.growthOf(new Watch.Growth.PollFn<String, KV<String, Integer>>() {
                    @Override
                    public Watch.Growth.PollResult<KV<String, Integer>> apply(String element, Contextful.Fn.Context c) throws Exception {
                        String pollId = UUID.randomUUID().toString();
                        List<KV<String, Integer>> output = new ArrayList<>();
                        for (int i = 0; i < numResults; i++) {
                            output.add(KV.of(pollId, i));
                        }
                        return Watch.Growth.PollResult.complete(Instant.now(), output);
                    }
                })
                        .withTerminationPerInput(Watch.Growth.afterTotalOf(Duration.standardSeconds(1L)))
                        .withPollInterval(Duration.millis(1L))
                        .withOutputCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
                .apply("Drop input", Values.create());

        PAssert.that("Poll called only once", results.apply(Keys.create())).satisfies(pollIds -> {
            Assert.assertEquals(1L, Sets.newHashSet(pollIds).size());
            return null;
        });
        PAssert.that("Yields all expected results", results.apply("Drop poll id", Values.create())).satisfies(values -> {
            Assert.assertEquals("Total number of results mismatches", numResults, Lists.newArrayList(values).size());
            Assert.assertEquals("Results are not unique", numResults, Sets.newHashSet(values).size());
            return null;
        });
        this.p.run();
    }

    /**
     * Checks that {@code Watch.GrowthStateCoder} round-trips (decode of encode equals the original)
     * both a polling state and a non-polling state.
     */
    @Test
    public void testCoder() throws Exception {
        Watch.PollingGrowthState pollingState = Watch.PollingGrowthState.of(
                ImmutableMap.of(
                        HashCode.fromString("0123456789abcdef0123456789abcdef"), Instant.now(),
                        HashCode.fromString("01230123012301230123012301230123"), Instant.now()),
                Instant.now(),
                "STATE");
        Watch.NonPollingGrowthState nonPollingState = Watch.NonPollingGrowthState.of(
                Watch.Growth.PollResult.incomplete(Instant.now(), Arrays.asList("A", "B")));

        Watch.GrowthStateCoder coder = Watch.GrowthStateCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

        CoderProperties.coderDecodeEncodeEqual(coder, pollingState);
        CoderProperties.coderDecodeEncodeEqual(coder, nonPollingState);
    }

    /** The {@code never()} condition must not permit stopping for a freshly created state. */
    @Test
    public void testTerminationConditionsNever() {
        Watch.Growth.Never condition = Watch.Growth.never();
        boolean mayStop = condition.canStopPolling(Instant.now(), condition.forNewInput(Instant.now(), (Object) null));
        Assert.assertFalse(mayStop);
    }

    /**
     * With a 5-second total budget, polling may not stop at 0 s or 3 s after the input was first
     * seen, and may stop at 6 s.
     */
    @Test
    public void testTerminationConditionsAfterTotalOf() {
        Instant start = Instant.now();
        Instant threeSecondsIn = start.plus(Duration.standardSeconds(3L));
        Instant sixSecondsIn = start.plus(Duration.standardSeconds(6L));

        Watch.Growth.AfterTotalOf condition = Watch.Growth.afterTotalOf(Duration.standardSeconds(5L));
        KV state = condition.forNewInput(start, (Object) null);

        Assert.assertFalse(condition.canStopPolling(start, state));
        Assert.assertFalse(condition.canStopPolling(threeSecondsIn, state));
        Assert.assertTrue(condition.canStopPolling(sixSecondsIn, state));
    }

    /**
     * Exercises the 5-second "time since new output" condition in three phases: before any output
     * has been seen (never stops), after output seen at +3 s (stops at +9 s but not +6 s), and after
     * further output seen at +5 s (stops at +11 s but not +9 s).
     */
    @Test
    public void testTerminationConditionsAfterTimeSinceNewOutput() {
        Instant start = Instant.now();
        Instant at3 = start.plus(Duration.standardSeconds(3L));
        Instant at5 = start.plus(Duration.standardSeconds(5L));
        Instant at6 = start.plus(Duration.standardSeconds(6L));
        Instant at9 = start.plus(Duration.standardSeconds(9L));
        Instant at11 = start.plus(Duration.standardSeconds(11L));

        Watch.Growth.AfterTimeSinceNewOutput condition = Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5L));

        // Phase 1: no output seen yet.
        KV initialState = condition.forNewInput(start, (Object) null);
        Assert.assertFalse(condition.canStopPolling(start, initialState));
        Assert.assertFalse(condition.canStopPolling(at3, initialState));
        Assert.assertFalse(condition.canStopPolling(at6, initialState));

        // Phase 2: new output observed at +3 s.
        KV afterFirstOutput = condition.onSeenNewOutput(at3, initialState);
        Assert.assertFalse(condition.canStopPolling(at3, afterFirstOutput));
        Assert.assertFalse(condition.canStopPolling(at6, afterFirstOutput));
        Assert.assertTrue(condition.canStopPolling(at9, afterFirstOutput));

        // Phase 3: more output observed at +5 s.
        KV afterSecondOutput = condition.onSeenNewOutput(at5, afterFirstOutput);
        Assert.assertFalse(condition.canStopPolling(at3, afterSecondOutput));
        Assert.assertFalse(condition.canStopPolling(at6, afterSecondOutput));
        Assert.assertFalse(condition.canStopPolling(at9, afterSecondOutput));
        Assert.assertTrue(condition.canStopPolling(at11, afterSecondOutput));
    }

    /**
     * Combining a 5-second and a 10-second total budget with {@code eitherOf}: polling may not stop
     * at 3 s, and may stop at 7 s and at 12 s.
     */
    @Test
    public void testTerminationConditionsEitherOf() {
        Instant start = Instant.now();
        Instant at3 = start.plus(Duration.standardSeconds(3L));
        Instant at7 = start.plus(Duration.standardSeconds(7L));
        Instant at12 = start.plus(Duration.standardSeconds(12L));

        Watch.Growth.BinaryCombined condition = Watch.Growth.eitherOf(
                Watch.Growth.afterTotalOf(Duration.standardSeconds(5L)),
                Watch.Growth.afterTotalOf(Duration.standardSeconds(10L)));
        KV state = condition.forNewInput(start, (Object) null);

        Assert.assertFalse(condition.canStopPolling(at3, state));
        Assert.assertTrue(condition.canStopPolling(at7, state));
        Assert.assertTrue(condition.canStopPolling(at12, state));
    }

    /**
     * Combining a 5-second and a 10-second total budget with {@code allOf}: polling may not stop at
     * 3 s or 7 s, and may stop at 12 s.
     */
    @Test
    public void testTerminationConditionsAllOf() {
        Instant start = Instant.now();
        Instant at3 = start.plus(Duration.standardSeconds(3L));
        Instant at7 = start.plus(Duration.standardSeconds(7L));
        Instant at12 = start.plus(Duration.standardSeconds(12L));

        Watch.Growth.BinaryCombined condition = Watch.Growth.allOf(
                Watch.Growth.afterTotalOf(Duration.standardSeconds(5L)),
                Watch.Growth.afterTotalOf(Duration.standardSeconds(10L)));
        KV state = condition.forNewInput(start, (Object) null);

        Assert.assertFalse(condition.canStopPolling(at3, state));
        Assert.assertFalse(condition.canStopPolling(at7, state));
        Assert.assertTrue(condition.canStopPolling(at12, state));
    }

    /**
     * Creates a growth tracker over the given state whose funnel feeds each string, encoded with
     * {@code StringUtf8Coder}, into the hash sink.
     *
     * @param growthState the restriction the tracker starts from
     * @return a new tracker for {@code growthState}
     */
    private static Watch.GrowthTracker<String, Integer> newTracker(Watch.GrowthState growthState) {
        return new Watch.GrowthTracker<>(
                growthState,
                (value, sink) -> {
                    try {
                        StringUtf8Coder.of().encode(value, Funnels.asOutputStream(sink));
                    } catch (IOException e) {
                        // The funnel signature has no checked exceptions, so rethrow unchecked.
                        throw new RuntimeException(e);
                    }
                });
    }

    /**
     * Computes the 128-bit Murmur3 hash of the {@code StringUtf8Coder} encoding of {@code str},
     * using the same funnel logic as {@code newTracker}.
     *
     * @param str the string to hash
     * @return the resulting hash code
     */
    private static HashCode hash128(String str) {
        HashCode hash = Hashing.murmur3_128().hashObject(
                str,
                (value, sink) -> {
                    try {
                        StringUtf8Coder.of().encode(value, Funnels.asOutputStream(sink));
                    } catch (IOException e) {
                        // The funnel signature has no checked exceptions, so rethrow unchecked.
                        throw new RuntimeException(e);
                    }
                });
        return hash;
    }

    /**
     * Creates a tracker that starts from a polling state whose termination state is the initial
     * state of the {@code never()} termination condition.
     */
    private static Watch.GrowthTracker<String, Integer> newPollingGrowthTracker() {
        Watch.GrowthState initialState =
                Watch.PollingGrowthState.of(Watch.Growth.never().forNewInput(Instant.now(), (Object) null));
        return newTracker(initialState);
    }

    /**
     * Invokes {@code WatchGrowthFn.process} directly with a poll function that returns an incomplete
     * result without a watermark, whose outputs are listed out of timestamp order. The manual
     * watermark estimator is expected to end up at the earliest output timestamp (now + 1 s), and the
     * continuation is expected to request resumption.
     */
    @Test
    public void testPollingGrowthTrackerUsesElementTimestampIfNoWatermarkProvided() throws Exception {
        final Instant now = Instant.now();

        Watch.Growth.PollFn<String, String> unsortedPoll = new Watch.Growth.PollFn<String, String>() {
            @Override
            public Watch.Growth.PollResult<String> apply(String element, Contextful.Fn.Context c) throws Exception {
                return Watch.Growth.PollResult.incomplete(Arrays.asList(
                        TimestampedValue.of("d", now.plus(Duration.standardSeconds(4L))),
                        TimestampedValue.of("c", now.plus(Duration.standardSeconds(3L))),
                        TimestampedValue.of("a", now.plus(Duration.standardSeconds(1L))),
                        TimestampedValue.of("b", now.plus(Duration.standardSeconds(2L)))));
            }
        };

        Watch.WatchGrowthFn growthFn = new Watch.WatchGrowthFn(
                Watch.growthOf(unsortedPoll).withPollInterval(Duration.standardSeconds(10L)),
                StringUtf8Coder.of(),
                SerializableFunctions.identity(),
                StringUtf8Coder.of());
        Watch.GrowthTracker<String, Integer> tracker = newPollingGrowthTracker();
        DoFn.ProcessContext context = (DoFn.ProcessContext) Mockito.mock(DoFn.ProcessContext.class);
        WatermarkEstimators.Manual watermarkEstimator = new WatermarkEstimators.Manual(BoundedWindow.TIMESTAMP_MIN_VALUE);

        DoFn.ProcessContinuation continuation = growthFn.process(context, tracker, watermarkEstimator);

        Assert.assertEquals(now.plus(Duration.standardSeconds(1L)), watermarkEstimator.currentWatermark());
        Assert.assertTrue(continuation.shouldResume());
    }

    /**
     * Claims a poll result on a polling tracker and then splits at fraction 0. The primary
     * restriction must hold the claimed result as pending; the residual polling state must carry the
     * claimed watermark (now + 7 s), the hashes of all four outputs as completed, and the claimed
     * termination state (1).
     */
    @Test
    public void testPollingGrowthTrackerCheckpointNonEmpty() {
        Instant now = Instant.now();
        Watch.GrowthTracker<String, Integer> tracker = newPollingGrowthTracker();

        List<TimestampedValue<String>> outputs = Arrays.asList(
                TimestampedValue.of("d", now.plus(Duration.standardSeconds(4L))),
                TimestampedValue.of("c", now.plus(Duration.standardSeconds(3L))),
                TimestampedValue.of("a", now.plus(Duration.standardSeconds(1L))),
                TimestampedValue.of("b", now.plus(Duration.standardSeconds(2L))));
        Watch.Growth.PollResult<String> claim =
                Watch.Growth.PollResult.incomplete(outputs).withWatermark(now.plus(Duration.standardSeconds(7L)));

        Assert.assertTrue(tracker.tryClaim(KV.of(claim, 1 /* termination state */)));

        Watch.PollingGrowthState residual = (Watch.PollingGrowthState) tracker.trySplit(0.0d).getResidual();
        // Explicit narrowing cast: other tests in this class assign currentRestriction() to the
        // general Watch.GrowthState type, so the subtype must be requested explicitly here.
        Watch.NonPollingGrowthState primary = (Watch.NonPollingGrowthState) tracker.currentRestriction();
        tracker.checkDone();

        Assert.assertEquals(claim, primary.getPending());
        Assert.assertEquals(now.plus(Duration.standardSeconds(7L)), residual.getPollWatermark());
        MatcherAssert.assertThat(residual.getCompleted().keySet(), Matchers.containsInAnyOrder(new HashCode[]{hash128("a"), hash128("b"), hash128("c"), hash128("d")}));
        Assert.assertEquals(1L, ((Integer) residual.getTerminationState()).intValue());
    }

    /**
     * Splits a polling tracker at fraction 0 without claiming anything. The primary restriction must
     * be the tracker's empty state, and the residual polling state must have no watermark, no
     * completed hashes and termination state 0.
     */
    @Test
    public void testPollingGrowthTrackerCheckpointEmpty() {
        Watch.GrowthTracker<String, Integer> tracker = newPollingGrowthTracker();

        Watch.PollingGrowthState residual = (Watch.PollingGrowthState) tracker.trySplit(0.0d).getResidual();
        Watch.GrowthState primary = tracker.currentRestriction();
        tracker.checkDone();

        Assert.assertEquals(Watch.GrowthTracker.EMPTY_STATE, primary);
        Assert.assertNull(residual.getPollWatermark());
        Assert.assertEquals(0L, residual.getCompleted().size());
        Assert.assertEquals(0L, ((Integer) residual.getTerminationState()).intValue());
    }

    /**
     * After a poll result has been claimed and the tracker split, a new tracker built from the
     * residual polling state must refuse to claim the same poll result again.
     */
    @Test
    public void testPollingGrowthTrackerHashAlreadyClaimed() {
        Instant now = Instant.now();
        Watch.GrowthTracker<String, Integer> firstTracker = newPollingGrowthTracker();

        List<TimestampedValue<String>> outputs = Arrays.asList(
                TimestampedValue.of("d", now.plus(Duration.standardSeconds(4L))),
                TimestampedValue.of("c", now.plus(Duration.standardSeconds(3L))),
                TimestampedValue.of("a", now.plus(Duration.standardSeconds(1L))),
                TimestampedValue.of("b", now.plus(Duration.standardSeconds(2L))));
        Watch.Growth.PollResult<String> claim =
                Watch.Growth.PollResult.incomplete(outputs).withWatermark(now.plus(Duration.standardSeconds(7L)));

        Assert.assertTrue(firstTracker.tryClaim(KV.of(claim, 1)));

        Watch.PollingGrowthState residual = (Watch.PollingGrowthState) firstTracker.trySplit(0.0d).getResidual();
        Watch.GrowthTracker<String, Integer> residualTracker = newTracker(residual);
        Assert.assertFalse(residualTracker.tryClaim(KV.of(claim, 2)));
    }

    /**
     * Invokes {@code WatchGrowthFn.process} on a tracker that starts from a non-polling state whose
     * pending result carries a watermark. The poll function must not be called, the manual watermark
     * estimator must stay at {@code TIMESTAMP_MIN_VALUE}, and the continuation must not resume.
     */
    @Test
    public void testNonPollingGrowthTrackerIgnoresWatermark() throws Exception {
        Instant now = Instant.now();

        List<TimestampedValue<String>> outputs = Arrays.asList(
                TimestampedValue.of("d", now.plus(Duration.standardSeconds(4L))),
                TimestampedValue.of("c", now.plus(Duration.standardSeconds(3L))),
                TimestampedValue.of("a", now.plus(Duration.standardSeconds(1L))),
                TimestampedValue.of("b", now.plus(Duration.standardSeconds(2L))));
        Watch.Growth.PollResult<String> pending =
                Watch.Growth.PollResult.incomplete(outputs).withWatermark(now.plus(Duration.standardSeconds(7L)));

        Watch.Growth.PollFn<String, String> mustNotBeCalled = new Watch.Growth.PollFn<String, String>() {
            @Override
            public Watch.Growth.PollResult<String> apply(String element, Contextful.Fn.Context c) throws Exception {
                Assert.fail("Never expected to be invoked for NonPollingGrowthState.");
                return null;
            }
        };
        Watch.Growth growth = Watch.growthOf(mustNotBeCalled).withPollInterval(Duration.standardSeconds(10L));

        Watch.GrowthTracker<String, Integer> tracker = newTracker(Watch.NonPollingGrowthState.of(pending));
        Watch.WatchGrowthFn growthFn = new Watch.WatchGrowthFn(growth, StringUtf8Coder.of(), SerializableFunctions.identity(), StringUtf8Coder.of());
        DoFn.ProcessContext context = (DoFn.ProcessContext) Mockito.mock(DoFn.ProcessContext.class);
        WatermarkEstimators.Manual watermarkEstimator = new WatermarkEstimators.Manual(BoundedWindow.TIMESTAMP_MIN_VALUE);

        DoFn.ProcessContinuation continuation = growthFn.process(context, tracker, watermarkEstimator);

        Assert.assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, watermarkEstimator.currentWatermark());
        Assert.assertFalse(continuation.shouldResume());
    }

    /**
     * Claims the pending poll result on a non-polling tracker and then splits at fraction 0. The
     * primary restriction must still hold the claimed result as pending and the residual must be the
     * tracker's empty state.
     */
    @Test
    public void testNonPollingGrowthTrackerCheckpointNonEmpty() {
        Instant now = Instant.now();

        List<TimestampedValue<String>> outputs = Arrays.asList(
                TimestampedValue.of("d", now.plus(Duration.standardSeconds(4L))),
                TimestampedValue.of("c", now.plus(Duration.standardSeconds(3L))),
                TimestampedValue.of("a", now.plus(Duration.standardSeconds(1L))),
                TimestampedValue.of("b", now.plus(Duration.standardSeconds(2L))));
        Watch.Growth.PollResult<String> claim =
                Watch.Growth.PollResult.incomplete(outputs).withWatermark(now.plus(Duration.standardSeconds(7L)));

        Watch.GrowthTracker<String, Integer> tracker = newTracker(Watch.NonPollingGrowthState.of(claim));
        Assert.assertTrue(tracker.tryClaim(KV.of(claim, 1 /* termination state */)));

        Watch.GrowthState residual = (Watch.GrowthState) tracker.trySplit(0.0d).getResidual();
        // Explicit narrowing cast: other tests in this class assign currentRestriction() to the
        // general Watch.GrowthState type, so the subtype must be requested explicitly here.
        Watch.NonPollingGrowthState primary = (Watch.NonPollingGrowthState) tracker.currentRestriction();
        tracker.checkDone();

        Assert.assertEquals(claim, primary.getPending());
        Assert.assertEquals(Watch.GrowthTracker.EMPTY_STATE, residual);
    }

    /**
     * Splits a non-polling tracker at fraction 0 without claiming anything. The primary restriction
     * must be the tracker's empty state and the residual must still hold the original pending result.
     */
    @Test
    public void testNonPollingGrowthTrackerCheckpointEmpty() {
        Instant now = Instant.now();

        List<TimestampedValue<String>> outputs = Arrays.asList(
                TimestampedValue.of("d", now.plus(Duration.standardSeconds(4L))),
                TimestampedValue.of("c", now.plus(Duration.standardSeconds(3L))),
                TimestampedValue.of("a", now.plus(Duration.standardSeconds(1L))),
                TimestampedValue.of("b", now.plus(Duration.standardSeconds(2L))));
        Watch.Growth.PollResult<String> pending =
                Watch.Growth.PollResult.incomplete(outputs).withWatermark(now.plus(Duration.standardSeconds(7L)));

        Watch.GrowthTracker<String, Integer> tracker = newTracker(Watch.NonPollingGrowthState.of(pending));

        Watch.NonPollingGrowthState residual = (Watch.NonPollingGrowthState) tracker.trySplit(0.0d).getResidual();
        Watch.GrowthState primary = tracker.currentRestriction();
        tracker.checkDone();

        Assert.assertEquals(Watch.GrowthTracker.EMPTY_STATE, primary);
        Assert.assertEquals(pending, residual.getPending());
    }

    /**
     * A non-polling tracker whose pending result holds outputs "a".."d" must refuse to claim a
     * different poll result (outputs "x", "y", "z" with a different watermark).
     */
    @Test
    public void testNonPollingGrowthTrackerFailedToClaimOtherPollResult() {
        Instant now = Instant.now();

        List<TimestampedValue<String>> pendingOutputs = Arrays.asList(
                TimestampedValue.of("d", now.plus(Duration.standardSeconds(4L))),
                TimestampedValue.of("c", now.plus(Duration.standardSeconds(3L))),
                TimestampedValue.of("a", now.plus(Duration.standardSeconds(1L))),
                TimestampedValue.of("b", now.plus(Duration.standardSeconds(2L))));
        Watch.Growth.PollResult<String> pending =
                Watch.Growth.PollResult.incomplete(pendingOutputs).withWatermark(now.plus(Duration.standardSeconds(7L)));

        // "z" is written as a plain literal, like its siblings, instead of a same-valued constant
        // borrowed from an unrelated compression library.
        List<TimestampedValue<String>> otherOutputs = Arrays.asList(
                TimestampedValue.of("x", now.plus(Duration.standardSeconds(14L))),
                TimestampedValue.of("y", now.plus(Duration.standardSeconds(13L))),
                TimestampedValue.of("z", now.plus(Duration.standardSeconds(12L))));
        Watch.Growth.PollResult<String> otherClaim =
                Watch.Growth.PollResult.incomplete(otherOutputs).withWatermark(now.plus(Duration.standardSeconds(17L)));

        Watch.GrowthTracker<String, Integer> tracker = newTracker(Watch.NonPollingGrowthState.of(pending));
        Assert.assertFalse(tracker.tryClaim(KV.of(otherClaim, 1)));
    }

    /**
     * Recreates one of this class's serializable lambdas from its serialized form.
     *
     * <p>Dispatches on the implementation method name recorded in {@code serializedLambda}, then
     * checks the recorded method kind, functional interface, interface method name and signature,
     * implementing class and implementation signature before returning the matching lambda.
     *
     * <p>NOTE(review): the {@code synthetic} marker suggests this method was emitted by the compiler
     * and recovered by a decompiler rather than hand-written; confirm whether it should be kept in
     * source at all. The dispatch is written as a string switch in place of the recovered
     * hash-code switch feeding a boolean-typed selector, which was not valid Java.
     *
     * @param serializedLambda the serialized description of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the description matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        switch (serializedLambda.getImplMethodName()) {
            case "getKey":
                if (serializedLambda.getImplMethodKind() == 5
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV")
                        && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case "lambda$testSinglePollWithManyResults$43268ee4$2":
                if (serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/WatchTest")
                        && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable2 -> {
                        Assert.assertEquals("Total number of results mismatches", 3000L, Lists.newArrayList(iterable2).size());
                        Assert.assertEquals("Results are not unique", 3000L, Sets.newHashSet(iterable2).size());
                        return null;
                    };
                }
                break;
            case "lambda$testSinglePollWithManyResults$43268ee4$1":
                if (serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                        && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/WatchTest")
                        && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable -> {
                        Assert.assertEquals(1L, Sets.newHashSet(iterable).size());
                        return null;
                    };
                }
                break;
            case "lambda$newTracker$45d5fea5$1":
                if (serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/hash/Funnel")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("funnel")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/hash/PrimitiveSink;)V")
                        && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/WatchTest")
                        && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/hash/PrimitiveSink;)V")) {
                    return (str, primitiveSink) -> {
                        try {
                            StringUtf8Coder.of().encode(str, Funnels.asOutputStream(primitiveSink));
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    };
                }
                break;
            case "lambda$hash128$b0a6753b$1":
                if (serializedLambda.getImplMethodKind() == 6
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/hash/Funnel")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("funnel")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/hash/PrimitiveSink;)V")
                        && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/WatchTest")
                        && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/beam/vendor/guava/v32_1_2_jre/com/google/common/hash/PrimitiveSink;)V")) {
                    return (str2, primitiveSink2) -> {
                        try {
                            StringUtf8Coder.of().encode(str2, Funnels.asOutputStream(primitiveSink2));
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    };
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
