package org.apache.beam.sdk.transforms;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DurationCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SnappyCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.AutoValue_Watch_Growth;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Funnel;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Funnels;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/transforms/Watch.class */
public class Watch {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Watch.class);

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth.class */
    public static abstract class Growth<InputT, OutputT, KeyT> extends PTransform<PCollection<InputT>, PCollection<KV<InputT, OutputT>>> {

        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$AfterIterations.class */
        /**
         * Termination condition that lets polling stop after a fixed number of completed polls.
         *
         * <p>The state is the number of polls completed so far: it starts at {@code 0} for each new
         * input and is incremented by one in {@link #onPollComplete}.
         */
        static class AfterIterations<InputT> implements TerminationCondition<InputT, Integer> {
            private final int maxIterations;

            private AfterIterations(int i) {
                this.maxIterations = i;
            }

            /** Returns the coder for the poll counter. */
            @Override
            public Coder<Integer> getStateCoder() {
                return VarIntCoder.of();
            }

            /** Starts every input with a poll count of zero; both arguments are unused. */
            @Override
            public Integer forNewInput(Instant instant, InputT inputt) {
                return 0;
            }

            /** Returns the poll count increased by one. */
            @Override
            public Integer onPollComplete(Integer num) {
                return num + 1;
            }

            /** Returns {@code true} once the poll count has reached {@code maxIterations}. */
            @Override
            public boolean canStopPolling(Instant instant, Integer num) {
                return num >= maxIterations;
            }

            /** Renders the given state together with the configured limit. */
            @Override
            public String toString(Integer num) {
                return "AfterIterations{iterations=" + num + ", maxIterations=" + maxIterations + '}';
            }

            // The decompiler-emitted synthetic bridge forNewInput(Instant, Object) was removed: it has the
            // same erasure as forNewInput(Instant, InputT) above (a name clash) and cast the input to
            // Instant. The compiler generates the real bridge method on its own.
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$AfterTimeSinceNewOutput.class */
        /**
         * Termination condition that lets polling stop once more than a given duration has elapsed
         * since the last time new output was seen.
         *
         * <p>The state is a pair of (time of last new output, maximum allowed duration). The time is
         * {@code null} until {@link #onSeenNewOutput} has been called at least once, and polling is never
         * allowed to stop while it is {@code null}.
         */
        public static class AfterTimeSinceNewOutput<InputT> implements TerminationCondition<InputT, KV<Instant, ReadableDuration>> {
            private final SerializableFunction<InputT, ReadableDuration> maxTimeSinceNewOutput;

            private AfterTimeSinceNewOutput(SerializableFunction<InputT, ReadableDuration> serializableFunction) {
                this.maxTimeSinceNewOutput = serializableFunction;
            }

            /** Returns a coder for the state pair; the instant half is nullable. */
            @Override
            public Coder<KV<Instant, ReadableDuration>> getStateCoder() {
                return KvCoder.of(NullableCoder.of(InstantCoder.of()), DurationCoder.of());
            }

            /**
             * Creates the initial state: no output seen yet ({@code null} key) and the duration limit
             * computed from the input element.
             */
            @Override
            public KV<Instant, ReadableDuration> forNewInput(Instant instant, InputT inputt) {
                return KV.of(null, maxTimeSinceNewOutput.apply(inputt));
            }

            /** Records {@code instant} as the time of the last new output, keeping the duration limit. */
            @Override
            public KV<Instant, ReadableDuration> onSeenNewOutput(Instant instant, KV<Instant, ReadableDuration> kv) {
                return KV.of(instant, kv.getValue());
            }

            /**
             * Returns {@code true} when some output has been seen and the time from that output to
             * {@code instant} is strictly longer than the configured limit.
             */
            @Override
            public boolean canStopPolling(Instant instant, KV<Instant, ReadableDuration> kv) {
                Instant lastNewOutput = kv.getKey();
                if (lastNewOutput == null) {
                    return false;
                }
                return new Duration(lastNewOutput, instant).isLongerThan(kv.getValue());
            }

            /** Renders the given state. */
            @Override
            public String toString(KV<Instant, ReadableDuration> kv) {
                return "AfterTimeSinceNewOutput{timeOfLastNewOutput=" + kv.getKey() + ", maxTimeSinceNewOutput=" + kv.getValue() + '}';
            }

            // The decompiler-emitted synthetic bridge forNewInput(Instant, Object) was removed: it has the
            // same erasure as forNewInput(Instant, InputT) above (a name clash) and cast the input to
            // Instant. The compiler generates the real bridge method on its own.
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$AfterTotalOf.class */
        public static class AfterTotalOf<InputT> implements TerminationCondition<InputT, KV<Instant, ReadableDuration>> {
            private final SerializableFunction<InputT, ReadableDuration> maxTimeSinceInput;

            private AfterTotalOf(SerializableFunction<InputT, ReadableDuration> serializableFunction) {
                this.maxTimeSinceInput = serializableFunction;
            }

            @Override // org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition
            public Coder<KV<Instant, ReadableDuration>> getStateCoder() {
                return KvCoder.of(InstantCoder.of(), DurationCoder.of());
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition
            public KV<Instant, ReadableDuration> forNewInput(Instant instant, InputT inputt) {
                return KV.of(instant, this.maxTimeSinceInput.apply(inputt));
            }

            @Override // org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition
            public boolean canStopPolling(Instant instant, KV<Instant, ReadableDuration> kv) {
                return new Duration(kv.getKey(), instant).isLongerThan(kv.getValue());
            }

            @Override // org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition
            public String toString(KV<Instant, ReadableDuration> kv) {
                return "AfterTotalOf{timeStarted=" + kv.getKey() + ", maxTimeSinceInput=" + kv.getValue() + '}';
            }

            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition
            public /* bridge */ /* synthetic */ KV<Instant, ReadableDuration> forNewInput(Instant instant, Object obj) {
                return forNewInput(instant, (Instant) obj);
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$BinaryCombined.class */
        /**
         * Termination condition that combines two other conditions with a logical OR or AND.
         *
         * <p>The state is the pair of the two component states; every callback is forwarded to both
         * components with their respective half of the pair.
         */
        static class BinaryCombined<InputT, FirstStateT, SecondStateT> implements TerminationCondition<InputT, KV<FirstStateT, SecondStateT>> {
            private final Operation operation;
            private final TerminationCondition<InputT, FirstStateT> first;
            private final TerminationCondition<InputT, SecondStateT> second;

            /** How the two component verdicts are combined in {@link #canStopPolling}. */
            /* JADX INFO: Access modifiers changed from: private */
            public enum Operation {
                OR,
                AND
            }

            /**
             * @param operation how to combine the two verdicts
             * @param terminationCondition the first component condition
             * @param terminationCondition2 the second component condition
             */
            public BinaryCombined(Operation operation, TerminationCondition<InputT, FirstStateT> terminationCondition, TerminationCondition<InputT, SecondStateT> terminationCondition2) {
                this.operation = operation;
                this.first = terminationCondition;
                this.second = terminationCondition2;
            }

            /** Returns a pair coder built from the two component state coders. */
            @Override
            public Coder<KV<FirstStateT, SecondStateT>> getStateCoder() {
                return KvCoder.of(first.getStateCoder(), second.getStateCoder());
            }

            /** Creates the initial state of both components. */
            @Override
            public KV<FirstStateT, SecondStateT> forNewInput(Instant instant, InputT inputt) {
                return KV.of(first.forNewInput(instant, inputt), second.forNewInput(instant, inputt));
            }

            /** Notifies both components that new output was seen. */
            @Override
            public KV<FirstStateT, SecondStateT> onSeenNewOutput(Instant instant, KV<FirstStateT, SecondStateT> kv) {
                return KV.of(first.onSeenNewOutput(instant, kv.getKey()), second.onSeenNewOutput(instant, kv.getValue()));
            }

            /** Notifies both components that a poll completed. */
            @Override
            public KV<FirstStateT, SecondStateT> onPollComplete(KV<FirstStateT, SecondStateT> kv) {
                return KV.of(first.onPollComplete(kv.getKey()), second.onPollComplete(kv.getValue()));
            }

            /**
             * Combines the component verdicts according to the configured operation. The second
             * component is only consulted when the first does not already decide the result.
             *
             * @throws UnsupportedOperationException if the operation is neither OR nor AND
             */
            @Override
            public boolean canStopPolling(Instant instant, KV<FirstStateT, SecondStateT> kv) {
                switch (operation) {
                    case OR:
                        return first.canStopPolling(instant, kv.getKey()) || second.canStopPolling(instant, kv.getValue());
                    case AND:
                        return first.canStopPolling(instant, kv.getKey()) && second.canStopPolling(instant, kv.getValue());
                    default:
                        throw new UnsupportedOperationException("Unexpected operation " + operation);
                }
            }

            /** Renders the operation and both component states. */
            @Override
            public String toString(KV<FirstStateT, SecondStateT> kv) {
                return operation + "{first=" + first.toString(kv.getKey()) + ", second=" + second.toString(kv.getValue()) + '}';
            }

            // The decompiler-emitted synthetic bridge forNewInput(Instant, Object) was removed: it has the
            // same erasure as forNewInput(Instant, InputT) above (a name clash) and cast the input to
            // Instant. The compiler generates the real bridge method on its own.
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * AutoValue builder for {@link Growth}. Each setter corresponds to the abstract property
         * accessor of the same name on {@link Growth}; {@link #build()} produces the configured
         * transform.
         */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$Builder.class */
        public static abstract class Builder<InputT, OutputT, KeyT> {
            /** Sets the polling function together with its requirements. */
            abstract Builder<InputT, OutputT, KeyT> setPollFn(Contextful<PollFn<InputT, OutputT>> contextful);

            /** Sets the function that maps an output to its key. */
            abstract Builder<InputT, OutputT, KeyT> setOutputKeyFn(SerializableFunction<OutputT, KeyT> serializableFunction);

            /** Sets the coder for output keys. */
            abstract Builder<InputT, OutputT, KeyT> setOutputKeyCoder(Coder<KeyT> coder);

            /** Sets the per-input termination condition. */
            abstract Builder<InputT, OutputT, KeyT> setTerminationPerInput(TerminationCondition<InputT, ?> terminationCondition);

            /** Sets the poll interval. */
            abstract Builder<InputT, OutputT, KeyT> setPollInterval(Duration duration);

            /** Sets the coder for outputs. */
            abstract Builder<InputT, OutputT, KeyT> setOutputCoder(Coder<OutputT> coder);

            /** Builds the {@link Growth} transform from the values set so far. */
            abstract Growth<InputT, OutputT, KeyT> build();
        }

        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$IgnoreInput.class */
        /**
         * Adapter that lets a termination condition written for any input type be used with
         * {@code InputT}. Every call is forwarded to the wrapped condition unchanged, except that the
         * wrapped condition's {@code forNewInput} always receives {@code null} as its input.
         */
        static class IgnoreInput<InputT, StateT> implements TerminationCondition<InputT, StateT> {
            private final TerminationCondition<?, StateT> wrapped;

            IgnoreInput(TerminationCondition<?, StateT> delegate) {
                wrapped = delegate;
            }

            /** Delegates to the wrapped condition. */
            @Override
            public Coder<StateT> getStateCoder() {
                return wrapped.getStateCoder();
            }

            /** Creates the wrapped condition's initial state, hiding the real input behind {@code null}. */
            @Override
            public StateT forNewInput(Instant timestamp, InputT unusedInput) {
                return wrapped.forNewInput(timestamp, null);
            }

            /** Delegates to the wrapped condition. */
            @Override
            public StateT onSeenNewOutput(Instant timestamp, StateT state) {
                return wrapped.onSeenNewOutput(timestamp, state);
            }

            /** Delegates to the wrapped condition. */
            @Override
            public StateT onPollComplete(StateT state) {
                return wrapped.onPollComplete(state);
            }

            /** Delegates to the wrapped condition. */
            @Override
            public boolean canStopPolling(Instant timestamp, StateT state) {
                return wrapped.canStopPolling(timestamp, state);
            }

            /** Delegates to the wrapped condition. */
            @Override
            public String toString(StateT state) {
                return wrapped.toString(state);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$Never.class */
        /**
         * Termination condition that never allows polling to stop. The {@code Integer} state is a
         * placeholder that is always {@code 0} and is never inspected.
         */
        public static class Never<InputT> implements TerminationCondition<InputT, Integer> {
            Never() {
            }

            /** Returns the coder for the placeholder state. */
            @Override
            public Coder<Integer> getStateCoder() {
                return VarIntCoder.of();
            }

            /** Returns the placeholder state {@code 0}; both arguments are unused. */
            @Override
            public Integer forNewInput(Instant instant, InputT inputt) {
                return 0;
            }

            /** Always returns {@code false}. */
            @Override
            public boolean canStopPolling(Instant instant, Integer num) {
                return false;
            }

            /** Returns the fixed string {@code "Never"}; the state is not rendered. */
            @Override
            public String toString(Integer num) {
                return "Never";
            }

            // The decompiler-emitted synthetic bridge forNewInput(Instant, Object) was removed: it has the
            // same erasure as forNewInput(Instant, InputT) above (a name clash) and cast the input to
            // Instant. The compiler generates the real bridge method on its own.
        }

        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$PollFn.class */
        /**
         * Base class for polling functions: a {@link Contextful.Fn} from an input element to a
         * {@link PollResult} of outputs. It declares no members of its own.
         */
        public static abstract class PollFn<InputT, OutputT> implements Contextful.Fn<InputT, PollResult<OutputT>> {
        }

        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$PollResult.class */
        /**
         * The result of a single poll: a list of timestamped outputs plus an optional watermark.
         *
         * <p>Results created through the {@code complete} factories carry
         * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} as their watermark; results created through the
         * {@code incomplete} factories carry a {@code null} watermark.
         */
        public static final class PollResult<OutputT> {
            private final List<TimestampedValue<OutputT>> outputs;
            private final Instant watermark;

            private PollResult(List<TimestampedValue<OutputT>> polledOutputs, Instant pollWatermark) {
                outputs = polledOutputs;
                watermark = pollWatermark;
            }

            /** Returns the outputs exactly as supplied at construction. */
            List<TimestampedValue<OutputT>> getOutputs() {
                return outputs;
            }

            /** Returns the watermark, which may be {@code null}. */
            Instant getWatermark() {
                return watermark;
            }

            /**
             * Returns a copy of this result with the same outputs and the given watermark.
             *
             * @throws NullPointerException if {@code instant} is {@code null}
             */
            public PollResult<OutputT> withWatermark(Instant instant) {
                Preconditions.checkNotNull(instant, "watermark");
                return new PollResult<>(outputs, instant);
            }

            /**
             * Returns a copy of this result with the same watermark and the given outputs.
             *
             * @throws NullPointerException if {@code list} is {@code null}
             */
            public PollResult<OutputT> withOutputs(List<TimestampedValue<OutputT>> list) {
                Preconditions.checkNotNull(list);
                return new PollResult<>(list, watermark);
            }

            @Override
            public String toString() {
                return MoreObjects.toStringHelper(this)
                        .add("watermark", watermark)
                        .add("outputs", outputs)
                        .toString();
            }

            /** Creates a result with the given outputs and the maximum timestamp as its watermark. */
            public static <OutputT> PollResult<OutputT> complete(List<TimestampedValue<OutputT>> list) {
                return new PollResult<>(list, BoundedWindow.TIMESTAMP_MAX_VALUE);
            }

            /**
             * Creates a result whose outputs all carry the timestamp {@code instant} and whose watermark
             * is the maximum timestamp.
             */
            public static <OutputT> PollResult<OutputT> complete(Instant instant, List<OutputT> list) {
                List<TimestampedValue<OutputT>> stamped = addTimestamp(instant, list);
                return new PollResult<>(stamped, BoundedWindow.TIMESTAMP_MAX_VALUE);
            }

            /** Creates a result with the given outputs and no watermark. */
            public static <OutputT> PollResult<OutputT> incomplete(List<TimestampedValue<OutputT>> list) {
                return new PollResult<>(list, null);
            }

            /** Creates a result whose outputs all carry the timestamp {@code instant}, with no watermark. */
            public static <OutputT> PollResult<OutputT> incomplete(Instant instant, List<OutputT> list) {
                List<TimestampedValue<OutputT>> stamped = addTimestamp(instant, list);
                return new PollResult<>(stamped, null);
            }

            /** Wraps every element of {@code list}, in order, with the timestamp {@code instant}. */
            private static <OutputT> List<TimestampedValue<OutputT>> addTimestamp(Instant instant, List<OutputT> list) {
                List<TimestampedValue<OutputT>> stamped = Lists.newArrayListWithExpectedSize(list.size());
                for (OutputT value : list) {
                    stamped.add(TimestampedValue.of(value, instant));
                }
                return stamped;
            }

            /** Two results are equal when they have the same class, equal outputs and equal watermarks. */
            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null) {
                    return false;
                }
                if (getClass() != obj.getClass()) {
                    return false;
                }
                PollResult<?> other = (PollResult<?>) obj;
                if (!Objects.equals(outputs, other.outputs)) {
                    return false;
                }
                return Objects.equals(watermark, other.watermark);
            }

            @Override
            public int hashCode() {
                return Objects.hash(outputs, watermark);
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$Growth$TerminationCondition.class */
        /**
         * A serializable strategy that decides, per input, when polling may stop. The condition itself
         * holds no per-input data; it creates and transforms an explicit state value of type
         * {@code StateT}.
         */
        public interface TerminationCondition<InputT, StateT> extends Serializable {
            /** Returns the coder for this condition's state values. */
            Coder<StateT> getStateCoder();

            /**
             * Creates the initial state for the given input.
             *
             * @param instant a timestamp supplied by the caller (presumably the current time; confirm
             *     against the call site, which is not visible here)
             * @param inputt the input element
             */
            StateT forNewInput(Instant instant, InputT inputt);

            /**
             * Returns the state to use after new output was seen. The default implementation returns the
             * given state unchanged.
             */
            default StateT onSeenNewOutput(Instant instant, StateT statet) {
                return statet;
            }

            /**
             * Returns the state to use after a poll completed. The default implementation returns the
             * given state unchanged.
             */
            default StateT onPollComplete(StateT statet) {
                return statet;
            }

            /** Returns whether polling may stop, given a timestamp and the current state. */
            boolean canStopPolling(Instant instant, StateT statet);

            /** Returns a human-readable rendering of this condition together with the given state. */
            String toString(StateT statet);
        }

        /** Creates a termination condition under which polling is never allowed to stop. */
        public static <InputT> Never<InputT> never() {
            return new Never<InputT>();
        }

        /**
         * Wraps a termination condition written for any input type so that it can be used with
         * {@code InputT}; the wrapped condition is handed {@code null} instead of the real input.
         */
        public static <InputT, StateT> TerminationCondition<InputT, StateT> ignoreInput(TerminationCondition<?, StateT> terminationCondition) {
            TerminationCondition<InputT, StateT> inputAgnostic = new IgnoreInput<InputT, StateT>(terminationCondition);
            return inputAgnostic;
        }

        /**
         * Creates an {@link AfterTotalOf} condition that uses the same fixed duration limit for every
         * input.
         */
        public static <InputT> AfterTotalOf<InputT> afterTotalOf(ReadableDuration readableDuration) {
            SerializableFunction<InputT, ReadableDuration> fixedLimit = SerializableFunctions.constant(readableDuration);
            return afterTotalOf(fixedLimit);
        }

        /**
         * Creates an {@link AfterTotalOf} condition whose duration limit is computed from each input by
         * the given function.
         */
        public static <InputT> AfterTotalOf<InputT> afterTotalOf(SerializableFunction<InputT, ReadableDuration> serializableFunction) {
            return new AfterTotalOf<InputT>(serializableFunction);
        }

        /**
         * Creates an {@link AfterTimeSinceNewOutput} condition that uses the same fixed duration limit
         * for every input.
         */
        public static <InputT> AfterTimeSinceNewOutput<InputT> afterTimeSinceNewOutput(ReadableDuration readableDuration) {
            SerializableFunction<InputT, ReadableDuration> fixedLimit = SerializableFunctions.constant(readableDuration);
            return afterTimeSinceNewOutput(fixedLimit);
        }

        /**
         * Creates an {@link AfterTimeSinceNewOutput} condition whose duration limit is computed from
         * each input by the given function.
         */
        public static <InputT> AfterTimeSinceNewOutput<InputT> afterTimeSinceNewOutput(SerializableFunction<InputT, ReadableDuration> serializableFunction) {
            return new AfterTimeSinceNewOutput<InputT>(serializableFunction);
        }

        /**
         * Creates an {@link AfterIterations} condition that lets polling stop once {@code i} polls have
         * completed.
         */
        public static <InputT> AfterIterations<InputT> afterIterations(int i) {
            return new AfterIterations<InputT>(i);
        }

        /** Creates a condition that lets polling stop when at least one of the two conditions does. */
        public static <InputT, FirstStateT, SecondStateT> BinaryCombined<InputT, FirstStateT, SecondStateT> eitherOf(TerminationCondition<InputT, FirstStateT> terminationCondition, TerminationCondition<InputT, SecondStateT> terminationCondition2) {
            BinaryCombined.Operation combineWith = BinaryCombined.Operation.OR;
            return new BinaryCombined<InputT, FirstStateT, SecondStateT>(combineWith, terminationCondition, terminationCondition2);
        }

        /** Creates a condition that lets polling stop only when both of the two conditions do. */
        public static <InputT, FirstStateT, SecondStateT> BinaryCombined<InputT, FirstStateT, SecondStateT> allOf(TerminationCondition<InputT, FirstStateT> terminationCondition, TerminationCondition<InputT, SecondStateT> terminationCondition2) {
            BinaryCombined.Operation combineWith = BinaryCombined.Operation.AND;
            return new BinaryCombined<InputT, FirstStateT, SecondStateT>(combineWith, terminationCondition, terminationCondition2);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the polling function together with its requirements (e.g. side inputs). */
        public abstract Contextful<PollFn<InputT, OutputT>> getPollFn();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Returns the function mapping an output to its key. May be {@code null}, in which case the
         * expansion uses the identity function and the output coder as the key coder.
         */
        public abstract SerializableFunction<OutputT, KeyT> getOutputKeyFn();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Returns the coder for output keys. May be {@code null}, in which case the expansion infers it
         * when an output key function is set.
         */
        public abstract Coder<KeyT> getOutputKeyCoder();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the poll interval; the expansion requires it to be non-null. */
        public abstract Duration getPollInterval();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the per-input termination condition; the expansion requires it to be non-null. */
        public abstract TerminationCondition<InputT, ?> getTerminationPerInput();

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Returns the coder for outputs. May be {@code null}, in which case the expansion infers it from
         * the poll function's type parameters.
         */
        public abstract Coder<OutputT> getOutputCoder();

        /** Returns a builder for creating a modified copy of this transform. */
        abstract Builder<InputT, OutputT, KeyT> toBuilder();

        /** Returns a copy of this transform that uses the given coder for output keys. */
        public Growth<InputT, OutputT, KeyT> withOutputKeyCoder(Coder<KeyT> coder) {
            Builder<InputT, OutputT, KeyT> updated = toBuilder().setOutputKeyCoder(coder);
            return updated.build();
        }

        /** Returns a copy of this transform that uses the given per-input termination condition. */
        public Growth<InputT, OutputT, KeyT> withTerminationPerInput(TerminationCondition<InputT, ?> terminationCondition) {
            Builder<InputT, OutputT, KeyT> updated = toBuilder().setTerminationPerInput(terminationCondition);
            return updated.build();
        }

        /** Returns a copy of this transform that uses the given poll interval. */
        public Growth<InputT, OutputT, KeyT> withPollInterval(Duration duration) {
            Builder<InputT, OutputT, KeyT> updated = toBuilder().setPollInterval(duration);
            return updated.build();
        }

        /** Returns a copy of this transform that uses the given coder for outputs. */
        public Growth<InputT, OutputT, KeyT> withOutputCoder(Coder<OutputT> coder) {
            Builder<InputT, OutputT, KeyT> updated = toBuilder().setOutputCoder(coder);
            return updated.build();
        }

        /**
         * Expands this transform into a polling {@code ParDo} followed by a {@code ParDo} that splits
         * the polled lists into individual {@code KV<input, output>} elements.
         *
         * <p>Coder resolution:
         * <ul>
         *   <li>If no output coder was set, it is inferred from the {@link PollFn}'s output type
         *       parameter via the pipeline's coder registry.
         *   <li>If no output key function was set, the identity function and the output coder are used
         *       for keys (no determinism check is made in this case).
         *   <li>Otherwise the key coder is taken from the configuration or inferred from the key
         *       function's output type, and must be deterministic.
         * </ul>
         *
         * @param pCollection the inputs to poll for
         * @return the polled outputs, each paired with the input it was produced for
         * @throws NullPointerException if the poll interval or termination condition is not set
         * @throws RuntimeException if a required coder cannot be inferred; the underlying
         *     {@link CannotProvideCoderException} is attached as the cause
         * @throws IllegalArgumentException if the key coder is not deterministic; the underlying
         *     {@link Coder.NonDeterministicException} is attached as the cause
         */
        @Override // org.apache.beam.sdk.transforms.PTransform
        @SuppressWarnings({"unchecked", "rawtypes"})
        /* renamed from: expand, reason: merged with bridge method [inline-methods] */
        public PCollection<KV<InputT, OutputT>> mo3758expand(PCollection<InputT> pCollection) {
            Preconditions.checkNotNull(getPollInterval(), "pollInterval");
            Preconditions.checkNotNull(getTerminationPerInput(), "terminationPerInput");

            Coder<OutputT> outputCoder = getOutputCoder();
            if (outputCoder == null) {
                // No explicit coder: infer it from the OutputT type parameter of the poll function.
                TypeDescriptor<OutputT> outputType = TypeDescriptors.extractFromTypeParameters(
                        getPollFn().getClosure(),
                        PollFn.class,
                        new TypeDescriptors.TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>() { // from class: org.apache.beam.sdk.transforms.Watch.Growth.1
                        });
                try {
                    outputCoder = pCollection.getPipeline().getCoderRegistry().getCoder(outputType);
                } catch (CannotProvideCoderException e) {
                    // Keep the original failure as the cause so the reason inference failed is not lost.
                    throw new RuntimeException("Unable to infer coder for OutputT (" + outputType + "). Specify it explicitly using withOutputCoder().", e);
                }
            }

            Coder<KeyT> outputKeyCoder = getOutputKeyCoder();
            SerializableFunction<OutputT, KeyT> outputKeyFn = getOutputKeyFn();
            if (outputKeyFn == null) {
                // Without a key function the key type is the output type itself, so the output coder
                // doubles as the key coder; the raw casts express that KeyT == OutputT here.
                outputKeyCoder = (Coder) outputCoder;
                outputKeyFn = (SerializableFunction) SerializableFunctions.identity();
            } else {
                if (outputKeyCoder == null) {
                    // No explicit key coder: infer it from the key function's output type.
                    TypeDescriptor<KeyT> keyType = TypeDescriptors.outputOf(outputKeyFn);
                    try {
                        outputKeyCoder = pCollection.getPipeline().getCoderRegistry().getCoder(keyType);
                    } catch (CannotProvideCoderException e) {
                        throw new RuntimeException("Unable to infer coder for KeyT (" + keyType + "). Specify it explicitly using withOutputKeyCoder().", e);
                    }
                }
                try {
                    outputKeyCoder.verifyDeterministic();
                } catch (Coder.NonDeterministicException e) {
                    throw new IllegalArgumentException("Key coder " + outputKeyCoder + " must be deterministic", e);
                }
            }

            // Step 1: poll, producing for each input a list of timestamped outputs.
            PCollection polled = (PCollection) pCollection.apply(
                    ParDo.of(new WatchGrowthFn(this, outputCoder, outputKeyFn, outputKeyCoder))
                            .withSideInputs(getPollFn().getRequirements().getSideInputs()));
            polled = polled.setCoder(
                    KvCoder.of(pCollection.getCoder(), ListCoder.of(TimestampedValue.TimestampedValueCoder.of(outputCoder))));

            // Step 2: split the polled lists into individual (input, output) pairs.
            PCollection split = (PCollection) polled.apply(ParDo.of(new PollResultSplitFn()));
            return split.setCoder(KvCoder.of(pCollection.getCoder(), outputCoder));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$GrowthState.class */
    /**
     * Abstract base type for the growth states handled by {@code GrowthStateCoder}, which
     * distinguishes {@code PollingGrowthState} and {@code NonPollingGrowthState} instances. Declares
     * no members besides a package-private constructor.
     */
    public static abstract class GrowthState {
        GrowthState() {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$GrowthStateCoder.class */
    static class GrowthStateCoder<OutputT, TerminationStateT> extends StructuredCoder<GrowthState> {
        private static final int POLLING_GROWTH_STATE = 0;
        private static final int NON_POLLING_GROWTH_STATE = 1;
        private static final MapCoder<HashCode, Instant> COMPLETED_CODER = MapCoder.of(HashCode128Coder.of(), InstantCoder.of());
        private static final Coder<Instant> NULLABLE_INSTANT_CODER = NullableCoder.of(InstantCoder.of());
        private final Coder<OutputT> outputCoder;
        private final Coder<List<TimestampedValue<OutputT>>> timestampedOutputCoder;
        private final Coder<TerminationStateT> terminationStateCoder;

        public static <OutputT, TerminationStateT> GrowthStateCoder<OutputT, TerminationStateT> of(Coder<OutputT> coder, Coder<TerminationStateT> coder2) {
            return new GrowthStateCoder<>(coder, coder2);
        }

        private GrowthStateCoder(Coder<OutputT> coder, Coder<TerminationStateT> coder2) {
            this.outputCoder = coder;
            this.terminationStateCoder = coder2;
            this.timestampedOutputCoder = ListCoder.of(TimestampedValue.TimestampedValueCoder.of(coder));
        }

        @Override // org.apache.beam.sdk.coders.Coder
        public void encode(GrowthState growthState, OutputStream outputStream) throws IOException {
            if (growthState instanceof PollingGrowthState) {
                VarInt.encode(0, outputStream);
                encodePollingGrowthState((PollingGrowthState) growthState, outputStream);
            } else {
                if (!(growthState instanceof NonPollingGrowthState)) {
                    throw new IOException("Unknown growth state: " + growthState);
                }
                VarInt.encode(1, outputStream);
                encodeNonPollingGrowthState((NonPollingGrowthState) growthState, outputStream);
            }
        }

        private void encodePollingGrowthState(PollingGrowthState<TerminationStateT> pollingGrowthState, OutputStream outputStream) throws IOException {
            this.terminationStateCoder.encode(pollingGrowthState.getTerminationState(), outputStream);
            NULLABLE_INSTANT_CODER.encode(pollingGrowthState.getPollWatermark(), outputStream);
            COMPLETED_CODER.encode((Map<HashCode, Instant>) pollingGrowthState.getCompleted(), outputStream);
        }

        private void encodeNonPollingGrowthState(NonPollingGrowthState<OutputT> nonPollingGrowthState, OutputStream outputStream) throws IOException {
            NULLABLE_INSTANT_CODER.encode(nonPollingGrowthState.getPending().getWatermark(), outputStream);
            this.timestampedOutputCoder.encode(nonPollingGrowthState.getPending().getOutputs(), outputStream);
        }

        /**
         * Reads the varint type tag and dispatches to the matching payload decoder.
         *
         * @param inputStream source stream positioned at the type tag
         * @return the decoded polling (tag 0) or non-polling (tag 1) state
         * @throws IOException if the tag is not 0 or 1, or if reading fails
         */
        @Override // org.apache.beam.sdk.coders.Coder
        public GrowthState decode(InputStream inputStream) throws IOException {
            final int typeTag = VarInt.decodeInt(inputStream);
            if (typeTag == 0) {
                return decodePollingGrowthState(inputStream);
            }
            if (typeTag == 1) {
                return decodeNonPollingGrowthState(inputStream);
            }
            throw new IOException("Unknown growth state type " + typeTag);
        }

        /**
         * Reads the payload of a non-polling state.
         *
         * <p>Fields are read in exactly the order {@code encodeNonPollingGrowthState} writes them:
         * watermark first, then the list of timestamped outputs. The reads are kept as separate
         * statements because Java evaluates call arguments left to right, and the poll result
         * constructor takes the outputs before the watermark.
         *
         * @param inputStream source stream positioned just after the type tag
         * @return a non-polling state wrapping the decoded pending poll result
         * @throws IOException if reading fails
         */
        private GrowthState decodeNonPollingGrowthState(InputStream inputStream) throws IOException {
            Instant watermark = NULLABLE_INSTANT_CODER.decode(inputStream);
            List<TimestampedValue<OutputT>> outputs = this.timestampedOutputCoder.decode(inputStream);
            return NonPollingGrowthState.of(new Growth.PollResult(outputs, watermark));
        }

        /**
         * Reads the payload of a polling state.
         *
         * <p>Fields are read in exactly the order {@code encodePollingGrowthState} writes them:
         * termination state, then poll watermark, then the completed-hash map. Each read is its
         * own statement so the stream order does not depend on the parameter order of the
         * {@code PollingGrowthState.of} factory.
         *
         * @param inputStream source stream positioned just after the type tag
         * @return the decoded polling state
         * @throws IOException if reading fails
         */
        private GrowthState decodePollingGrowthState(InputStream inputStream) throws IOException {
            TerminationStateT terminationState = this.terminationStateCoder.decode(inputStream);
            Instant pollWatermark = NULLABLE_INSTANT_CODER.decode(inputStream);
            Map<HashCode, Instant> completed = COMPLETED_CODER.decode(inputStream);
            return PollingGrowthState.of(ImmutableMap.copyOf(completed), pollWatermark, terminationState);
        }

        /**
         * Returns the component coders of this coder: the output coder followed by the
         * termination state coder, as a fixed-size list.
         */
        @Override // org.apache.beam.sdk.coders.Coder
        public List<? extends Coder<?>> getCoderArguments() {
            return Arrays.<Coder<?>>asList(this.outputCoder, this.terminationStateCoder);
        }

        /**
         * Delegates the determinism check to the output coder.
         *
         * <p>NOTE(review): only {@code outputCoder} is verified here; {@code terminationStateCoder}
         * (also used by this coder's encode path) is not consulted. Confirm this is intentional.
         *
         * @throws Coder.NonDeterministicException if the output coder reports itself as
         *     non-deterministic
         */
        @Override // org.apache.beam.sdk.coders.Coder
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            this.outputCoder.verifyDeterministic();
        }
    }

    /**
     * Restriction tracker over a {@link GrowthState}.
     *
     * <p>A tracker allows at most one successful {@link #tryClaim} per instance; after a claim or
     * a {@link #trySplit}, every further claim is rejected. Outputs are identified by a 128-bit
     * murmur3 hash computed through the supplied funnel.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$GrowthTracker.class */
    static class GrowthTracker<OutputT, TerminationStateT> extends RestrictionTracker<GrowthState, KV<Growth.PollResult<OutputT>, TerminationStateT>> {
        /** A non-polling state with no pending outputs and no watermark. */
        static final GrowthState EMPTY_STATE = NonPollingGrowthState.of(new Growth.PollResult(Collections.emptyList(), null));
        private final Funnel<OutputT> coderFunnel;
        private Growth.PollResult<OutputT> claimedPollResult;
        private TerminationStateT claimedTerminationState;
        private ImmutableMap<HashCode, Instant> claimedHashes;
        private GrowthState state;
        private boolean shouldStop = false;

        /* JADX INFO: Access modifiers changed from: package-private */
        public GrowthTracker(GrowthState growthState, Funnel<OutputT> funnel) {
            this.state = growthState;
            this.coderFunnel = funnel;
        }

        /** Returns the restriction currently owned by this tracker. */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public GrowthState currentRestriction() {
            return this.state;
        }

        /**
         * Checkpoints the tracker. The fraction argument is not used: the residual is always
         * "everything not claimed so far".
         *
         * <ul>
         *   <li>Nothing claimed yet: the residual is the whole current state and the primary
         *       becomes {@link #EMPTY_STATE}.
         *   <li>Claimed from a non-polling state: the primary is unchanged and the residual is
         *       {@link #EMPTY_STATE}.
         *   <li>Claimed from a polling state: the primary becomes a non-polling state holding the
         *       claimed poll result; the residual is a polling state whose completed map also
         *       contains the claimed hashes, whose watermark is the larger of the previous and
         *       the claimed watermark (nulls ordered first), and whose termination state is the
         *       claimed one.
         * </ul>
         */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public SplitResult<GrowthState> trySplit(double d) {
            final GrowthState residual;
            if (this.claimedPollResult == null) {
                residual = this.state;
                this.state = EMPTY_STATE;
            } else if (!(this.state instanceof NonPollingGrowthState)) {
                PollingGrowthState<TerminationStateT> polling = (PollingGrowthState<TerminationStateT>) this.state;
                ImmutableMap.Builder<HashCode, Instant> mergedCompleted = ImmutableMap.builder();
                mergedCompleted.putAll(polling.getCompleted());
                mergedCompleted.putAll(this.claimedHashes);
                ImmutableMap<HashCode, Instant> completed = mergedCompleted.build();
                Instant watermark = (Instant) Ordering.natural().nullsFirst().max(polling.getPollWatermark(), ((Growth.PollResult) this.claimedPollResult).watermark);
                residual = PollingGrowthState.of(completed, watermark, this.claimedTerminationState);
                this.state = NonPollingGrowthState.of(this.claimedPollResult);
            } else {
                residual = EMPTY_STATE;
            }
            // Once split, no further claims may succeed on this tracker.
            this.shouldStop = true;
            return SplitResult.of(this.state, residual);
        }

        /** Hashes one output value through the funnel with 128-bit murmur3. */
        private HashCode hash128(OutputT outputt) {
            return Hashing.murmur3_128().hashObject(outputt, this.coderFunnel);
        }

        /**
         * @throws IllegalStateException if neither a successful claim nor a split has happened
         */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public void checkDone() throws IllegalStateException {
            Preconditions.checkState(this.shouldStop, "Missing tryClaim()/checkpoint() call. Expected one or the other.");
        }

        /** Bounded for the empty state and any non-polling state; unbounded otherwise. */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public RestrictionTracker.IsBounded isBounded() {
            if (this.state == EMPTY_STATE) {
                return RestrictionTracker.IsBounded.BOUNDED;
            }
            if (this.state instanceof NonPollingGrowthState) {
                return RestrictionTracker.IsBounded.BOUNDED;
            }
            return RestrictionTracker.IsBounded.UNBOUNDED;
        }

        /**
         * Attempts to claim one poll result together with its termination state.
         *
         * <p>For a polling state the claim is rejected if any claimed hash is already in the
         * completed map. For a non-polling state the claimed hash set must equal the hash set of
         * the pending outputs.
         *
         * @return {@code true} if the claim was accepted; {@code false} if the tracker was
         *     already stopped or the claim is inconsistent with the current state
         */
        @Override // org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker
        public boolean tryClaim(KV<Growth.PollResult<OutputT>, TerminationStateT> kv) {
            if (this.shouldStop) {
                return false;
            }

            ImmutableMap.Builder<HashCode, Instant> hashesBuilder = ImmutableMap.builder();
            for (TimestampedValue<OutputT> output : kv.getKey().getOutputs()) {
                hashesBuilder.put(hash128(output.getValue()), output.getTimestamp());
            }
            ImmutableMap<HashCode, Instant> newHashes = hashesBuilder.build();

            if (this.state instanceof PollingGrowthState) {
                boolean overlapsCompleted = !Collections.disjoint(newHashes.keySet(), ((PollingGrowthState) this.state).getCompleted().keySet());
                if (overlapsCompleted) {
                    return false;
                }
            } else {
                HashSet<HashCode> expectedHashes = new HashSet<>();
                for (TimestampedValue<OutputT> pending : ((NonPollingGrowthState<OutputT>) this.state).getPending().getOutputs()) {
                    expectedHashes.add(hash128(pending.getValue()));
                }
                if (!expectedHashes.equals(newHashes.keySet())) {
                    return false;
                }
            }

            // Accept: remember what was claimed and block any further claim.
            this.shouldStop = true;
            this.claimedPollResult = kv.getKey();
            this.claimedTerminationState = kv.getValue();
            this.claimedHashes = newHashes;
            return true;
        }

        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add(ByteBuddyDoFnInvokerFactory.STATE_PARAMETER_METHOD, this.state)
                    .add("pollResult", this.claimedPollResult)
                    .add("terminationState", this.claimedTerminationState)
                    .add("shouldStop", this.shouldStop)
                    .toString();
        }
    }

    /**
     * Singleton coder that writes a 128-bit {@code HashCode} as exactly 16 raw bytes.
     */
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$HashCode128Coder.class */
    private static class HashCode128Coder extends AtomicCoder<HashCode> {
        private static final HashCode128Coder INSTANCE = new HashCode128Coder();

        private HashCode128Coder() {
        }

        /** Returns the shared instance. */
        public static HashCode128Coder of() {
            return INSTANCE;
        }

        /**
         * Writes the 16 bytes of the hash code.
         *
         * @throws IllegalArgumentException if the hash code is not 128 bits wide
         * @throws IOException if writing fails
         */
        @Override // org.apache.beam.sdk.coders.Coder
        public void encode(HashCode hashCode, OutputStream outputStream) throws IOException {
            Preconditions.checkArgument(hashCode.bits() == 128, "Expected a 128-bit hash code, but got %s bits", hashCode.bits());
            byte[] bArr = new byte[16];
            hashCode.writeBytesTo(bArr, 0, 16);
            outputStream.write(bArr);
        }

        /**
         * Reads 16 bytes and rebuilds the hash code.
         *
         * <p>A single {@code InputStream.read} call may legally return fewer bytes than
         * requested even when more data follows, so this reads in a loop until 16 bytes have
         * arrived or the stream ends.
         *
         * @throws IllegalArgumentException if the stream ends before 16 bytes were read
         * @throws IOException if reading fails
         */
        @Override // org.apache.beam.sdk.coders.Coder
        public HashCode decode(InputStream inputStream) throws IOException {
            byte[] bArr = new byte[16];
            int total = 0;
            while (total < bArr.length) {
                int read = inputStream.read(bArr, total, bArr.length - total);
                if (read < 0) {
                    // End of stream before the full hash arrived.
                    break;
                }
                total += read;
            }
            Preconditions.checkArgument(total == 16, "Expected to read 16 bytes, but read %s", total);
            return HashCode.fromBytes(bArr);
        }
    }

    /**
     * Growth state that holds a single pending poll result.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$NonPollingGrowthState.class */
    public static abstract class NonPollingGrowthState<OutputT> extends GrowthState {
        /** Wraps the given poll result as a non-polling state. */
        public static <OutputT> NonPollingGrowthState<OutputT> of(Growth.PollResult<OutputT> pollResult) {
            return new AutoValue_Watch_NonPollingGrowthState<>(pollResult);
        }

        /** The poll result held by this state. */
        public abstract Growth.PollResult<OutputT> getPending();
    }

    /**
     * Splittable DoFn that turns one {@code KV<input, list of timestamped outputs>} into one
     * {@code KV<input, output>} per list entry, each emitted with the entry's own timestamp. The
     * restriction is an offset range over the list indices.
     */
    /* JADX INFO: Access modifiers changed from: private */
    @DoFn.BoundedPerElement
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$PollResultSplitFn.class */
    public static class PollResultSplitFn<InputT, OutputT> extends DoFn<KV<InputT, List<TimestampedValue<OutputT>>>, KV<InputT, OutputT>> {
        private PollResultSplitFn() {
        }

        /**
         * Claims list positions one at a time, starting at the restriction's lower bound, and
         * emits the entry at each claimed position. Stops at the first rejected claim.
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<InputT, List<TimestampedValue<OutputT>>>, KV<InputT, OutputT>>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker) {
            for (long index = restrictionTracker.currentRestriction().getFrom(); restrictionTracker.tryClaim(index); index++) {
                KV<InputT, List<TimestampedValue<OutputT>>> element = processContext.element();
                TimestampedValue<OutputT> entry = element.getValue().get((int) index);
                processContext.outputWithTimestamp(KV.of(element.getKey(), entry.getValue()), entry.getTimestamp());
            }
        }

        /** Uses the element's timestamp as the initial watermark estimator state. */
        @DoFn.GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
            return instant;
        }

        /** Creates a monotonically increasing watermark estimator seeded with the given state. */
        @DoFn.NewWatermarkEstimator
        public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
            return new WatermarkEstimators.MonotonicallyIncreasing(instant);
        }

        /** The initial restriction spans every index of the element's list: {@code [0, size)}. */
        @DoFn.GetInitialRestriction
        public OffsetRange getInitialRestriction(@DoFn.Element KV<InputT, List<TimestampedValue<OutputT>>> kv) {
            int outputCount = kv.getValue().size();
            return new OffsetRange(0L, outputCount);
        }

        @DoFn.NewTracker
        public OffsetRangeTracker newTracker(@DoFn.Restriction OffsetRange offsetRange) {
            return offsetRange.newTracker();
        }

        @DoFn.GetRestrictionCoder
        public Coder<OffsetRange> getRestrictionCoder() {
            return OffsetRange.Coder.of();
        }
    }

    /**
     * Growth state carrying the hashes of completed outputs (with their timestamps), the poll
     * watermark, and the termination state.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$PollingGrowthState.class */
    public static abstract class PollingGrowthState<TerminationStateT> extends GrowthState {
        /** Creates a state with no completed outputs and a {@code null} poll watermark. */
        public static <TerminationStateT> PollingGrowthState<TerminationStateT> of(TerminationStateT terminationstatet) {
            return of(ImmutableMap.<HashCode, Instant>of(), null, terminationstatet);
        }

        /** Creates a state from all three components. */
        public static <TerminationStateT> PollingGrowthState<TerminationStateT> of(ImmutableMap<HashCode, Instant> immutableMap, Instant instant, TerminationStateT terminationstatet) {
            return new AutoValue_Watch_PollingGrowthState<>(immutableMap, instant, terminationstatet);
        }

        /** Hash of each completed output mapped to that output's timestamp. */
        public abstract ImmutableMap<HashCode, Instant> getCompleted();

        /** The poll watermark; may be {@code null}. */
        public abstract Instant getPollWatermark();

        public abstract TerminationStateT getTerminationState();
    }

    /**
     * Splittable, unbounded-per-element DoFn that repeatedly invokes the spec's poll function for
     * an input and emits, per poll round, the list of outputs not seen before for that input.
     *
     * <p>The restriction is a {@link GrowthState}: a polling state triggers a new poll, while a
     * non-polling state re-emits a pending poll result. Outputs are de-duplicated by a 128-bit
     * hash of their encoded output key (see the funnel built in the constructor).
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @DoFn.UnboundedPerElement
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/transforms/Watch$WatchGrowthFn.class */
    public static class WatchGrowthFn<InputT, OutputT, KeyT, TerminationStateT> extends DoFn<InputT, KV<InputT, List<TimestampedValue<OutputT>>>> {
        private final Growth<InputT, OutputT, KeyT> spec;
        private final Coder<OutputT> outputCoder;
        private final SerializableFunction<OutputT, KeyT> outputKeyFn;
        private final Coder<KeyT> outputKeyCoder;
        // Feeds the encoded output key of a value into a hasher; used by hash128 and by trackers.
        private final Funnel<OutputT> coderFunnel;

        /**
         * @param growth the transform spec (poll function, poll interval, termination condition)
         * @param coder coder for output values; used to build the restriction coder
         * @param serializableFunction maps an output value to the key it is de-duplicated by
         * @param coder2 coder for the output key; its encoding is what gets hashed
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public WatchGrowthFn(Growth<InputT, OutputT, KeyT> growth, Coder<OutputT> coder, SerializableFunction<OutputT, KeyT> serializableFunction, Coder<KeyT> coder2) {
            this.spec = growth;
            this.outputCoder = coder;
            this.outputKeyFn = serializableFunction;
            this.outputKeyCoder = coder2;
            // Hash the encoded output key rather than the value itself. An encoding failure is
            // rethrown unchecked because the funnel callback cannot throw IOException.
            this.coderFunnel = (obj, primitiveSink) -> {
                try {
                    coder2.encode(serializableFunction.apply(obj), Funnels.asOutputStream(primitiveSink));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            };
        }

        /** Uses the element's timestamp as the initial watermark estimator state. */
        @DoFn.GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkEstimatorState(@DoFn.Timestamp Instant instant) {
            return instant;
        }

        /** Creates a manually advanced watermark estimator seeded with the given state. */
        @DoFn.NewWatermarkEstimator
        public WatermarkEstimators.Manual newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
            return new WatermarkEstimators.Manual(instant);
        }

        /**
         * Runs one round of work for the current restriction.
         *
         * <p>For a non-polling restriction: claims the pending poll result, re-emits its outputs
         * if there are any, and stops. For a polling restriction: polls once, keeps only
         * never-seen-before outputs, updates the termination state, claims the result, emits it,
         * and then either stops (termination condition met or watermark at the maximum timestamp)
         * or asks to be resumed after the spec's poll interval.
         *
         * @return {@code stop()} or {@code resume()} with the poll interval as resume delay
         */
        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn<InputT, KV<InputT, List<TimestampedValue<OutputT>>>>.ProcessContext processContext, RestrictionTracker<GrowthState, KV<Growth.PollResult<OutputT>, TerminationStateT>> restrictionTracker, ManualWatermarkEstimator<Instant> manualWatermarkEstimator) throws Exception {
            GrowthState currentRestriction = restrictionTracker.currentRestriction();
            if (currentRestriction instanceof NonPollingGrowthState) {
                // Replay path: the restriction already carries a poll result; no new poll is made.
                Growth.PollResult<OutputT> pending = ((NonPollingGrowthState) currentRestriction).getPending();
                if (restrictionTracker.tryClaim(KV.of(pending, null)) && !pending.getOutputs().isEmpty()) {
                    Watch.LOG.info("{} - re-emitting output of prior poll containing {} results.", processContext.element(), Integer.valueOf(pending.getOutputs().size()));
                    processContext.output(KV.of(processContext.element(), pending.getOutputs()));
                }
                return DoFn.ProcessContinuation.stop();
            }
            // Polling path. "now" is only used to report how long the poll took.
            Instant now = Instant.now();
            Growth.PollResult<OutputT> pollResult = (Growth.PollResult) ((Growth.PollFn<InputT, OutputT>) this.spec.getPollFn().getClosure()).apply(processContext.element(), Contextful.Fn.Context.wrapProcessContext(processContext));
            PollingGrowthState<TerminationStateT> pollingGrowthState = (PollingGrowthState) currentRestriction;
            Growth.PollResult computeNeverSeenBeforeResults = computeNeverSeenBeforeResults(pollingGrowthState, pollResult);
            Watch.LOG.info("{} - current round of polling took {} ms and returned {} results, of which {} were new.", processContext.element(), Long.valueOf(new Duration(now, Instant.now()).getMillis()), Integer.valueOf(pollResult.getOutputs().size()), Integer.valueOf(computeNeverSeenBeforeResults.getOutputs().size()));
            // Advance the termination state: first for new output (if any), then for poll completion.
            TerminationStateT terminationState = pollingGrowthState.getTerminationState();
            if (!computeNeverSeenBeforeResults.getOutputs().isEmpty()) {
                terminationState = getTerminationCondition().onSeenNewOutput(Instant.now(), terminationState);
            }
            TerminationStateT onPollComplete = getTerminationCondition().onPollComplete(terminationState);
            // The claim must succeed before anything is emitted.
            if (!restrictionTracker.tryClaim(KV.of(computeNeverSeenBeforeResults, onPollComplete))) {
                Watch.LOG.info("{} - will not emit poll result tryClaim failed.", processContext.element());
                return DoFn.ProcessContinuation.stop();
            }
            if (!computeNeverSeenBeforeResults.getOutputs().isEmpty()) {
                processContext.output(KV.of(processContext.element(), computeNeverSeenBeforeResults.getOutputs()));
            }
            // Watermark candidate: the poll's own watermark if present; otherwise the timestamp of
            // the first new output (the new outputs are sorted by timestamp, so this is the
            // earliest one); otherwise none.
            Instant instant = null;
            if (computeNeverSeenBeforeResults.getWatermark() != null) {
                instant = computeNeverSeenBeforeResults.getWatermark();
            } else if (!computeNeverSeenBeforeResults.getOutputs().isEmpty()) {
                instant = computeNeverSeenBeforeResults.getOutputs().get(0).getTimestamp();
            }
            Instant now2 = Instant.now();
            if (getTerminationCondition().canStopPolling(now2, onPollComplete)) {
                Watch.LOG.info("{} - told to stop polling by polling function at {} with termination state {}.", processContext.element(), now2, getTerminationCondition().toString(onPollComplete));
                return DoFn.ProcessContinuation.stop();
            }
            if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(instant)) {
                Watch.LOG.info("{} - will stop polling, reached max timestamp.", processContext.element());
                return DoFn.ProcessContinuation.stop();
            }
            // The estimator is only advanced when polling will continue.
            if (instant != null) {
                manualWatermarkEstimator.setWatermark(instant);
            }
            Watch.LOG.info("{} - will resume polling in {} ms.", processContext.element(), Long.valueOf(this.spec.getPollInterval().getMillis()));
            return DoFn.ProcessContinuation.resume().withResumeDelay(this.spec.getPollInterval());
        }

        /** Hashes one output value through the key funnel with 128-bit murmur3. */
        private HashCode hash128(OutputT outputt) {
            return Hashing.murmur3_128().hashObject(outputt, this.coderFunnel);
        }

        /**
         * Filters a poll result down to outputs whose hash is neither in the state's completed
         * map nor already present earlier in the same poll result (first occurrence wins), and
         * returns them sorted by timestamp.
         */
        private Growth.PollResult computeNeverSeenBeforeResults(PollingGrowthState<TerminationStateT> pollingGrowthState, Growth.PollResult<OutputT> pollResult) {
            HashMap newHashMap = Maps.newHashMap();
            for (TimestampedValue<OutputT> timestampedValue : pollResult.getOutputs()) {
                HashCode hash128 = hash128(timestampedValue.getValue());
                if (!pollingGrowthState.getCompleted().containsKey(hash128) && !newHashMap.containsKey(hash128)) {
                    newHashMap.put(hash128, timestampedValue);
                }
            }
            return pollResult.withOutputs(Ordering.natural().onResultOf(timestampedValue2 -> {
                return timestampedValue2.getTimestamp();
            }).sortedCopy(newHashMap.values()));
        }

        /** Shorthand for the spec's per-input termination condition. */
        private Growth.TerminationCondition<InputT, TerminationStateT> getTerminationCondition() {
            return this.spec.getTerminationPerInput();
        }

        /**
         * Starts each input in a polling state whose termination state is created by the
         * termination condition for the current wall-clock time and the input.
         */
        @DoFn.GetInitialRestriction
        public GrowthState getInitialRestriction(@DoFn.Element InputT inputt) {
            return PollingGrowthState.of(getTerminationCondition().forNewInput(Instant.now(), inputt));
        }

        /** Creates a tracker that hashes outputs with this fn's key funnel. */
        @DoFn.NewTracker
        public GrowthTracker<OutputT, TerminationStateT> newTracker(@DoFn.Restriction GrowthState growthState) {
            return new GrowthTracker<>(growthState, this.coderFunnel);
        }

        /**
         * Restriction coder: a {@code GrowthStateCoder} built from the output coder and the
         * termination condition's state coder, wrapped in a {@code SnappyCoder}.
         */
        @DoFn.GetRestrictionCoder
        public Coder<GrowthState> getRestrictionCoder() {
            return SnappyCoder.of(GrowthStateCoder.of(this.outputCoder, this.spec.getTerminationPerInput().getStateCoder()));
        }

        // Compiler-synthesized lambda deserialization hook, as recovered by the decompiler. It
        // recognizes the serialized form of the funnel lambda created in the constructor and
        // rebuilds it from the two captured arguments (key function and key coder); any other
        // lambda is rejected with IllegalArgumentException.
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 637000526:
                    if (implMethodName.equals("lambda$new$b100bdc2$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/vendor/guava/v26_0_jre/com/google/common/hash/Funnel") && serializedLambda.getFunctionalInterfaceMethodName().equals("funnel") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/beam/vendor/guava/v26_0_jre/com/google/common/hash/PrimitiveSink;)V") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/transforms/Watch$WatchGrowthFn") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/transforms/SerializableFunction;Lorg/apache/beam/sdk/coders/Coder;Ljava/lang/Object;Lorg/apache/beam/vendor/guava/v26_0_jre/com/google/common/hash/PrimitiveSink;)V")) {
                        SerializableFunction serializableFunction = (SerializableFunction) serializedLambda.getCapturedArg(0);
                        Coder coder = (Coder) serializedLambda.getCapturedArg(1);
                        return (obj, primitiveSink) -> {
                            try {
                                coder.encode(serializableFunction.apply(obj), Funnels.asOutputStream(primitiveSink));
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Builds a growth transform for the given poll function and requirements, with the
     * {@code Growth.never()} termination condition and a {@code null} output key function.
     *
     * @param pollFn function invoked on each poll round
     * @param requirements requirements attached to the poll function
     */
    public static <InputT, OutputT> Growth<InputT, OutputT, OutputT> growthOf(Growth.PollFn<InputT, OutputT> pollFn, Requirements requirements) {
        Contextful<Growth.PollFn<InputT, OutputT>> contextfulPollFn = Contextful.of(pollFn, requirements);
        return new AutoValue_Watch_Growth.Builder()
                .setTerminationPerInput(Growth.never())
                .setPollFn(contextfulPollFn)
                .setOutputKeyFn(null)
                .build();
    }

    /**
     * Convenience overload: same as {@code growthOf(pollFn, requirements)} with empty
     * requirements.
     */
    public static <InputT, OutputT> Growth<InputT, OutputT, OutputT> growthOf(Growth.PollFn<InputT, OutputT> pollFn) {
        Requirements noRequirements = Requirements.empty();
        return growthOf(pollFn, noRequirements);
    }

    /**
     * Builds a growth transform from an already contextful poll function and an explicit output
     * key function, with the {@code Growth.never()} termination condition.
     *
     * @param contextful the poll function with its requirements; must not be {@code null}
     * @param serializableFunction maps each output to its key; must not be {@code null}
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public static <InputT, OutputT, KeyT> Growth<InputT, OutputT, KeyT> growthOf(Contextful<Growth.PollFn<InputT, OutputT>> contextful, SerializableFunction<OutputT, KeyT> serializableFunction) {
        // Validate in declaration order so the first missing argument is the one reported.
        Preconditions.checkArgument(contextful != null, "pollFn can not be null");
        Preconditions.checkArgument(serializableFunction != null, "outputKeyFn can not be null");
        return new AutoValue_Watch_Growth.Builder()
                .setTerminationPerInput(Growth.never())
                .setPollFn(contextful)
                .setOutputKeyFn(serializableFunction)
                .build();
    }
}
