package org.apache.beam.fn.harness;

import com.google.common.collect.testing.SampleElements;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.beam.fn.harness.FnApiDoFnRunner;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.BeamFnTimerClient;
import org.apache.beam.fn.harness.data.FakeBeamFnTimerClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.util.Durations;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest.class */
public class FnApiDoFnRunnerTest implements Serializable {

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest.class */
    public static class ExecutionTest implements Serializable {

        @Rule
        public transient ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider();
        private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FnApiDoFnRunnerTest.class);
        public static final String TEST_TRANSFORM_ID = "pTransformId";

        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$ConcatCombineFn.class */
        private static class ConcatCombineFn extends Combine.CombineFn<String, String, String> {
            private ConcatCombineFn() {
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.beam.sdk.transforms.Combine.CombineFn
            public String createAccumulator() {
                return "";
            }

            @Override // org.apache.beam.sdk.transforms.Combine.CombineFn
            public String addInput(String str, String str2) {
                return str.concat(str2);
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.beam.sdk.transforms.Combine.CombineFn
            public String mergeAccumulators(Iterable<String> iterable) {
                StringBuilder sb = new StringBuilder();
                Iterator<String> it = iterable.iterator();
                while (it.hasNext()) {
                    sb.append(it.next());
                }
                return sb.toString();
            }

            @Override // org.apache.beam.sdk.transforms.Combine.CombineFn
            public String extractOutput(String str) {
                return str;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$NonWindowObservingTestSplittableDoFn.class */
        public static class NonWindowObservingTestSplittableDoFn extends DoFn<String, String> {
            private static final ConcurrentMap<String, KV<CountDownLatch, CountDownLatch>> DOFN_INSTANCE_TO_LOCK = new ConcurrentHashMap();
            private static final long SPLIT_ELEMENT = 3;
            private static final long CHECKPOINT_UPPER_BOUND = 8;
            private final String uuid;

            private KV<CountDownLatch, CountDownLatch> getLatches() {
                return DOFN_INSTANCE_TO_LOCK.computeIfAbsent(this.uuid, str -> {
                    return KV.of(new CountDownLatch(1), new CountDownLatch(1));
                });
            }

            public void enableAndWaitForTrySplitToHappen() throws Exception {
                KV<CountDownLatch, CountDownLatch> latches = getLatches();
                latches.getKey().countDown();
                if (latches.getValue().await(30L, TimeUnit.SECONDS)) {
                    return;
                }
                Assert.fail("Failed to wait for trySplit to occur.");
            }

            public void waitForSplitElementToBeProcessed() throws Exception {
                if (getLatches().getKey().await(30L, TimeUnit.SECONDS)) {
                    return;
                }
                Assert.fail("Failed to wait for split element to be processed.");
            }

            public void releaseWaitingProcessElementThread() {
                getLatches().getValue().countDown();
            }

            private NonWindowObservingTestSplittableDoFn() {
                this.uuid = UUID.randomUUID().toString();
            }

            @DoFn.ProcessElement
            public DoFn.ProcessContinuation processElement(DoFn<String, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker, ManualWatermarkEstimator<Instant> manualWatermarkEstimator) throws Exception {
                boolean tryClaim;
                long from = restrictionTracker.currentRestriction().getFrom();
                do {
                    tryClaim = restrictionTracker.tryClaim(Long.valueOf(from));
                    if (!tryClaim) {
                        break;
                    }
                    if (from == 3) {
                        enableAndWaitForTrySplitToHappen();
                    }
                    processContext.outputWithTimestamp(processContext.element() + ":" + from, GlobalWindow.TIMESTAMP_MIN_VALUE.plus(from));
                    manualWatermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(from));
                    from++;
                } while (from != 8);
                return !tryClaim ? DoFn.ProcessContinuation.stop() : DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(54321L));
            }

            @DoFn.GetInitialRestriction
            public OffsetRange restriction(@DoFn.Element String str) {
                return new OffsetRange(0L, Integer.parseInt(str));
            }

            @DoFn.NewTracker
            public RestrictionTracker<OffsetRange, Long> newTracker(@DoFn.Restriction OffsetRange offsetRange) {
                return new OffsetRangeTracker(offsetRange);
            }

            @DoFn.SplitRestriction
            public void splitRange(@DoFn.Restriction OffsetRange offsetRange, DoFn.OutputReceiver<OffsetRange> outputReceiver) {
                outputReceiver.output(new OffsetRange(offsetRange.getFrom(), (offsetRange.getFrom() + offsetRange.getTo()) / 2));
                outputReceiver.output(new OffsetRange((offsetRange.getFrom() + offsetRange.getTo()) / 2, offsetRange.getTo()));
            }

            @DoFn.TruncateRestriction
            public RestrictionTracker.TruncateResult<OffsetRange> truncateRestriction(@DoFn.Restriction OffsetRange offsetRange) throws Exception {
                return RestrictionTracker.TruncateResult.of(new OffsetRange(offsetRange.getFrom(), offsetRange.getTo() / 2));
            }

            @DoFn.GetInitialWatermarkEstimatorState
            public Instant getInitialWatermarkEstimatorState() {
                return GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L);
            }

            @DoFn.NewWatermarkEstimator
            public WatermarkEstimators.Manual newWatermarkEstimator(@DoFn.WatermarkEstimatorState Instant instant) {
                return new WatermarkEstimators.Manual(instant);
            }
        }

        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$SplittableFnDataReceiver.class */
        private static class SplittableFnDataReceiver implements HandlesSplits, FnDataReceiver<WindowedValue> {
            private final List<WindowedValue<KV<KV<String, OffsetRange>, Double>>> mainOutputValues;

            SplittableFnDataReceiver(List<WindowedValue<KV<KV<String, OffsetRange>, Double>>> list) {
                this.mainOutputValues = list;
            }

            public HandlesSplits.SplitResult trySplit(double d) {
                return ExecutionTest.createSplitResult(d);
            }

            public double getProgress() {
                return 0.7d;
            }

            @Override // org.apache.beam.sdk.fn.data.FnDataReceiver
            public void accept(WindowedValue windowedValue) throws Exception {
                this.mainOutputValues.add(windowedValue);
            }
        }

        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$TestNonWindowObservingDoFn.class */
        private static class TestNonWindowObservingDoFn extends DoFn<String, String> {
            private final TupleTag<String> additionalOutput;

            private TestNonWindowObservingDoFn(TupleTag<String> tupleTag) {
                this.additionalOutput = tupleTag;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<String, String>.ProcessContext processContext) {
                processContext.output(processContext.element() + ":main");
                processContext.output(this.additionalOutput, processContext.element() + ":additional");
            }
        }

        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$TestSideInputDoFn.class */
        private static class TestSideInputDoFn extends DoFn<String, String> {
            private final PCollectionView<String> defaultSingletonSideInput;
            private final PCollectionView<String> singletonSideInput;
            private final PCollectionView<Iterable<String>> iterableSideInput;
            private final TupleTag<String> additionalOutput;

            private TestSideInputDoFn(PCollectionView<String> pCollectionView, PCollectionView<String> pCollectionView2, PCollectionView<Iterable<String>> pCollectionView3, TupleTag<String> tupleTag) {
                this.defaultSingletonSideInput = pCollectionView;
                this.singletonSideInput = pCollectionView2;
                this.iterableSideInput = pCollectionView3;
                this.additionalOutput = tupleTag;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<String, String>.ProcessContext processContext) {
                processContext.output(processContext.element() + ":" + ((String) processContext.sideInput(this.defaultSingletonSideInput)));
                processContext.output(processContext.element() + ":" + ((String) processContext.sideInput(this.singletonSideInput)));
                Iterator it = ((Iterable) processContext.sideInput(this.iterableSideInput)).iterator();
                while (it.hasNext()) {
                    processContext.output(processContext.element() + ":" + ((String) it.next()));
                }
                processContext.output(this.additionalOutput, processContext.element() + ":additional");
            }
        }

        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$TestSideInputIsAccessibleForDownstreamCallersDoFn.class */
        private static class TestSideInputIsAccessibleForDownstreamCallersDoFn extends DoFn<String, Iterable<String>> {
            public static final String USER_COUNTER_NAME = "userCountedElems";
            private final Counter countedElements;
            private final PCollectionView<Iterable<String>> iterableSideInput;

            private TestSideInputIsAccessibleForDownstreamCallersDoFn(PCollectionView<Iterable<String>> pCollectionView) {
                this.countedElements = Metrics.counter((Class<?>) TestSideInputIsAccessibleForDownstreamCallersDoFn.class, USER_COUNTER_NAME);
                this.iterableSideInput = pCollectionView;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<String, Iterable<String>>.ProcessContext processContext) {
                this.countedElements.inc();
                processContext.output((Iterable) processContext.sideInput(this.iterableSideInput));
            }
        }

        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$TestStatefulDoFn.class */
        private static class TestStatefulDoFn extends DoFn<KV<String, String>, String> {
            private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
            private static final TupleTag<String> additionalOutput = new TupleTag<>("output");

            @DoFn.StateId("value")
            private final StateSpec<ValueState<String>> valueStateSpec;

            @DoFn.StateId("bag")
            private final StateSpec<BagState<String>> bagStateSpec;

            @DoFn.StateId("combine")
            private final StateSpec<CombiningState<String, String, String>> combiningStateSpec;

            private TestStatefulDoFn() {
                this.valueStateSpec = StateSpecs.value(StringUtf8Coder.of());
                this.bagStateSpec = StateSpecs.bag(StringUtf8Coder.of());
                this.combiningStateSpec = StateSpecs.combining(StringUtf8Coder.of(), new ConcatCombineFn());
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, String>, String>.ProcessContext processContext, @DoFn.StateId("value") ValueState<String> valueState, @DoFn.StateId("bag") BagState<String> bagState, @DoFn.StateId("combine") CombiningState<String, String, String> combiningState) {
                processContext.output("value:" + valueState.read());
                valueState.write(processContext.element().getValue());
                processContext.output("bag:" + Iterables.toString(bagState.read()));
                bagState.add(processContext.element().getValue());
                processContext.output("combine:" + combiningState.read());
                combiningState.add(processContext.element().getValue());
            }
        }

        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$TestTimerfulDoFn.class */
        private static class TestTimerfulDoFn extends DoFn<KV<String, String>, String> {

            @DoFn.StateId("bag")
            private final StateSpec<BagState<String>> bagStateSpec;

            @DoFn.TimerId("event")
            private final TimerSpec eventTimerSpec;

            @DoFn.TimerId("processing")
            private final TimerSpec processingTimerSpec;

            @DoFn.TimerFamily("event-family")
            private final TimerSpec eventTimerFamilySpec;

            @DoFn.TimerFamily("processing-family")
            private final TimerSpec processingTimerFamilySpec;

            private TestTimerfulDoFn() {
                this.bagStateSpec = StateSpecs.bag(StringUtf8Coder.of());
                this.eventTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
                this.processingTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
                this.eventTimerFamilySpec = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
                this.processingTimerFamilySpec = TimerSpecs.timerMap(TimeDomain.PROCESSING_TIME);
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, String>, String>.ProcessContext processContext, @DoFn.StateId("bag") BagState<String> bagState, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2, @DoFn.TimerFamily("event-family") TimerMap timerMap, @DoFn.TimerFamily("processing-family") TimerMap timerMap2) {
                processContext.output("main" + processContext.element().getKey() + Iterables.toString(bagState.read()));
                bagState.add(processContext.element().getValue());
                timer.withOutputTimestamp(processContext.timestamp()).set(processContext.timestamp().plus(1L));
                timer.clear();
                timer2.offset(Duration.millis(2L));
                timer2.setRelative();
                timer2.clear();
                timerMap.get("event-timer1").withOutputTimestamp(processContext.timestamp()).set(processContext.timestamp().plus(3L));
                timerMap2.get("processing-timer1").offset(Duration.millis(4L)).setRelative();
            }

            @DoFn.OnTimer("event")
            public void eventTimer(DoFn<KV<String, String>, String>.OnTimerContext onTimerContext, @DoFn.StateId("bag") BagState<String> bagState, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2, @DoFn.TimerFamily("event-family") TimerMap timerMap, @DoFn.TimerFamily("processing-family") TimerMap timerMap2) {
                onTimerContext.output("event" + Iterables.toString(bagState.read()));
                bagState.add("event");
                timer.withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.fireTimestamp().plus(11L));
                timer2.offset(Duration.millis(12L));
                timer2.setRelative();
                timerMap.get("event-timer1").withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.fireTimestamp().plus(13L));
                timerMap2.get("processing-timer1").offset(Duration.millis(14L)).setRelative();
            }

            @DoFn.OnTimer("processing")
            public void processingTimer(DoFn<KV<String, String>, String>.OnTimerContext onTimerContext, @DoFn.StateId("bag") BagState<String> bagState, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2, @DoFn.TimerFamily("event-family") TimerMap timerMap, @DoFn.TimerFamily("processing-family") TimerMap timerMap2) {
                onTimerContext.output("processing" + Iterables.toString(bagState.read()));
                bagState.add("processing");
                timer.withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.timestamp().plus(21L));
                timer2.offset(Duration.millis(22L));
                timer2.setRelative();
                timerMap.get("event-timer1").withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.timestamp().plus(23L));
                timerMap2.get("processing-timer1").offset(Duration.millis(24L)).setRelative();
            }

            @DoFn.OnTimerFamily("event-family")
            public void eventFamilyOnTimer(DoFn<KV<String, String>, String>.OnTimerContext onTimerContext, @DoFn.StateId("bag") BagState<String> bagState, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2, @DoFn.TimerFamily("event-family") TimerMap timerMap, @DoFn.TimerFamily("processing-family") TimerMap timerMap2) {
                onTimerContext.output("event-family" + Iterables.toString(bagState.read()));
                bagState.add("event-family");
                timer.withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.timestamp().plus(31L));
                timer2.offset(Duration.millis(32L));
                timer2.setRelative();
                timerMap.get("event-timer1").withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.timestamp().plus(33L));
                timerMap2.get("processing-timer1").offset(Duration.millis(34L)).setRelative();
            }

            @DoFn.OnTimerFamily("processing-family")
            public void processingFamilyOnTimer(DoFn<KV<String, String>, String>.OnTimerContext onTimerContext, @DoFn.StateId("bag") BagState<String> bagState, @DoFn.TimerId("event") Timer timer, @DoFn.TimerId("processing") Timer timer2, @DoFn.TimerFamily("event-family") TimerMap timerMap, @DoFn.TimerFamily("processing-family") TimerMap timerMap2) {
                onTimerContext.output("processing-family" + Iterables.toString(bagState.read()));
                bagState.add("processing-family");
                timer.withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.timestamp().plus(41L));
                timer2.offset(Duration.millis(42L));
                timer2.setRelative();
                timerMap.get("event-timer1").withOutputTimestamp(onTimerContext.timestamp()).set(onTimerContext.timestamp().plus(43L));
                timerMap2.get("processing-timer1").offset(Duration.millis(44L)).setRelative();
            }
        }

        /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$ExecutionTest$WindowObservingTestSplittableDoFn.class */
        static class WindowObservingTestSplittableDoFn extends NonWindowObservingTestSplittableDoFn {
            private final PCollectionView<String> singletonSideInput;
            private static final long PROCESSED_WINDOW = 1;
            private boolean splitAtTruncate;
            private long processedWindowCount;

            private WindowObservingTestSplittableDoFn(PCollectionView<String> pCollectionView) {
                super();
                this.splitAtTruncate = false;
                this.processedWindowCount = 0L;
                this.singletonSideInput = pCollectionView;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public static WindowObservingTestSplittableDoFn forSplitAtTruncate(PCollectionView<String> pCollectionView) {
                WindowObservingTestSplittableDoFn windowObservingTestSplittableDoFn = new WindowObservingTestSplittableDoFn(pCollectionView);
                windowObservingTestSplittableDoFn.splitAtTruncate = true;
                return windowObservingTestSplittableDoFn;
            }

            @Override // org.apache.beam.fn.harness.FnApiDoFnRunnerTest.ExecutionTest.NonWindowObservingTestSplittableDoFn
            @DoFn.ProcessElement
            public DoFn.ProcessContinuation processElement(DoFn<String, String>.ProcessContext processContext, RestrictionTracker<OffsetRange, Long> restrictionTracker, ManualWatermarkEstimator<Instant> manualWatermarkEstimator) throws Exception {
                boolean tryClaim;
                long parseLong = Long.parseLong((String) processContext.sideInput(this.singletonSideInput));
                long from = restrictionTracker.currentRestriction().getFrom();
                do {
                    tryClaim = restrictionTracker.tryClaim(Long.valueOf(from));
                    if (!tryClaim) {
                        break;
                    }
                    if (from == 3) {
                        enableAndWaitForTrySplitToHappen();
                    }
                    processContext.outputWithTimestamp(processContext.element() + ":" + from, GlobalWindow.TIMESTAMP_MIN_VALUE.plus(from));
                    manualWatermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(from));
                    from++;
                } while (from != parseLong);
                return !tryClaim ? DoFn.ProcessContinuation.stop() : DoFn.ProcessContinuation.resume().withResumeDelay(Duration.millis(54321L));
            }

            @Override // org.apache.beam.fn.harness.FnApiDoFnRunnerTest.ExecutionTest.NonWindowObservingTestSplittableDoFn
            @DoFn.TruncateRestriction
            public RestrictionTracker.TruncateResult<OffsetRange> truncateRestriction(@DoFn.Restriction OffsetRange offsetRange) throws Exception {
                if (this.splitAtTruncate && this.processedWindowCount == 1) {
                    enableAndWaitForTrySplitToHappen();
                }
                this.processedWindowCount++;
                return RestrictionTracker.TruncateResult.of(new OffsetRange(offsetRange.getFrom(), offsetRange.getTo() / 2));
            }
        }

        @Test
        public void testUsingUserState() throws Exception {
            Pipeline create = Pipeline.create();
            PCollection<?> pCollection = (PCollection) create.apply(Create.of(KV.of("unused", "unused"), new KV[0]));
            PCollection<?> pCollection2 = (PCollection) pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new TestStatefulDoFn()));
            SdkComponents create2 = SdkComponents.create(create.getOptions());
            RunnerApi.Pipeline proto = PipelineTranslation.toProto(create, create2);
            String registerPCollection = create2.registerPCollection(pCollection);
            String registerPCollection2 = create2.registerPCollection(pCollection2);
            RunnerApi.PTransform transformsOrThrow = proto.getComponents().getTransformsOrThrow(proto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0));
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of(bagUserStateKey("value", "X"), encode("X0"), bagUserStateKey("bag", "X"), encode("X0"), bagUserStateKey("combine", "X"), encode("X0")));
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(registerPCollection2, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, StringUtf8Coder.of());
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create3 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = proto.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = proto.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = proto.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create3, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(registerPCollection, registerPCollection2));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(registerPCollection);
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("X", "X1")));
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("Y", "Y1")));
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("X", "X2")));
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("Y", "Y2")));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("value:X0"), WindowedValue.valueInGlobalWindow("bag:[X0]"), WindowedValue.valueInGlobalWindow("combine:X0"), WindowedValue.valueInGlobalWindow("value:null"), WindowedValue.valueInGlobalWindow("bag:[]"), WindowedValue.valueInGlobalWindow("combine:"), WindowedValue.valueInGlobalWindow("value:X1"), WindowedValue.valueInGlobalWindow("bag:[X0, X1]"), WindowedValue.valueInGlobalWindow("combine:X0X1"), WindowedValue.valueInGlobalWindow("value:Y1"), WindowedValue.valueInGlobalWindow("bag:[Y1]"), WindowedValue.valueInGlobalWindow("combine:Y1")));
            arrayList.clear();
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            Assert.assertEquals(ImmutableMap.builder().put(bagUserStateKey("value", "X"), encode("X2")).put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2")).put(bagUserStateKey("combine", "X"), encode("X0X1X2")).put(bagUserStateKey("value", "Y"), encode("Y2")).put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2")).put(bagUserStateKey("combine", "Y"), encode("Y1Y2")).build(), fakeBeamFnStateClient.getData());
        }

        private BeamFnApi.StateKey bagUserStateKey(String str, String str2) throws IOException {
            return BeamFnApi.StateKey.newBuilder().setBagUserState(BeamFnApi.StateKey.BagUserState.newBuilder().setTransformId(TEST_TRANSFORM_ID).setUserStateId(str).setKey(encode(str2)).setWindow(ByteString.copyFrom(CoderUtils.encodeToByteArray(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE)))).build();
        }

        @Test
        public void testProcessElementWithSideInputsAndOutputs() throws Exception {
            Pipeline create = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "use_runner_v2");
            PCollection<?> pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton().withDefaultValue("defaultSingletonValue"));
            PCollectionView<?> pCollectionView2 = (PCollectionView) pCollection.apply(View.asSingleton());
            PCollectionView<?> pCollectionView3 = (PCollectionView) pCollection.apply(View.asIterable());
            TupleTag<String> tupleTag = new TupleTag<String>("main") { // from class: org.apache.beam.fn.harness.FnApiDoFnRunnerTest.ExecutionTest.1
            };
            TupleTag<String> tupleTag2 = new TupleTag<String>("additional") { // from class: org.apache.beam.fn.harness.FnApiDoFnRunnerTest.ExecutionTest.2
            };
            PCollectionTuple pCollectionTuple = (PCollectionTuple) pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new TestSideInputDoFn(pCollectionView, pCollectionView2, pCollectionView3, tupleTag2)).withSideInputs(pCollectionView, pCollectionView2, pCollectionView3).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)));
            SdkComponents create2 = SdkComponents.create(create.getOptions());
            RunnerApi.Pipeline proto = PipelineTranslation.toProto(create, create2, true);
            String registerPCollection = create2.registerPCollection(pCollection);
            String registerPCollection2 = create2.registerPCollection(pCollectionTuple.get(tupleTag));
            String registerPCollection3 = create2.registerPCollection(pCollectionTuple.get(tupleTag2));
            RunnerApi.PTransform transformsOrThrow = proto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID);
            ImmutableMap of = ImmutableMap.of(iterableSideInputKey(pCollectionView2.getTagInternal().getId()), encode("singletonValue"), iterableSideInputKey(pCollectionView3.getTagInternal().getId()), encode("iterableValue1", "iterableValue2", "iterableValue3"));
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(of, 1000);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(registerPCollection2, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, StringUtf8Coder.of());
            Objects.requireNonNull(arrayList2);
            pCollectionConsumerRegistry.register(registerPCollection3, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, StringUtf8Coder.of());
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList3 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create3 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = proto.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = proto.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = proto.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList3);
            factory.createRunnerForPTransform(create3, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(registerPCollection, registerPCollection2, registerPCollection3));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(registerPCollection);
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow("X"));
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow("Y"));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("X:defaultSingletonValue"), WindowedValue.valueInGlobalWindow("X:singletonValue"), WindowedValue.valueInGlobalWindow("X:iterableValue1"), WindowedValue.valueInGlobalWindow("X:iterableValue2"), WindowedValue.valueInGlobalWindow("X:iterableValue3"), WindowedValue.valueInGlobalWindow("Y:defaultSingletonValue"), WindowedValue.valueInGlobalWindow("Y:singletonValue"), WindowedValue.valueInGlobalWindow("Y:iterableValue1"), WindowedValue.valueInGlobalWindow("Y:iterableValue2"), WindowedValue.valueInGlobalWindow("Y:iterableValue3")));
            MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow("X:additional"), WindowedValue.valueInGlobalWindow("Y:additional")));
            arrayList.clear();
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList3)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            Assert.assertEquals(of, fakeBeamFnStateClient.getData());
        }

        @Test
        public void testProcessElementWithNonWindowObservingOptimization() throws Exception {
            Pipeline create = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "use_runner_v2");
            PCollection<?> pCollection = (PCollection) ((PCollection) create.apply(Create.of("unused", new String[0]))).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))));
            TupleTag<String> tupleTag = new TupleTag<String>("main") { // from class: org.apache.beam.fn.harness.FnApiDoFnRunnerTest.ExecutionTest.3
            };
            TupleTag<String> tupleTag2 = new TupleTag<String>("additional") { // from class: org.apache.beam.fn.harness.FnApiDoFnRunnerTest.ExecutionTest.4
            };
            PCollectionTuple pCollectionTuple = (PCollectionTuple) pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new TestNonWindowObservingDoFn(tupleTag2)).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)));
            SdkComponents create2 = SdkComponents.create(create.getOptions());
            RunnerApi.Pipeline proto = PipelineTranslation.toProto(create, create2, true);
            String registerPCollection = create2.registerPCollection(pCollection);
            String registerPCollection2 = create2.registerPCollection(pCollectionTuple.get(tupleTag));
            String registerPCollection3 = create2.registerPCollection(pCollectionTuple.get(tupleTag2));
            RunnerApi.PTransform transformsOrThrow = proto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(registerPCollection2, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, StringUtf8Coder.of());
            Objects.requireNonNull(arrayList2);
            pCollectionConsumerRegistry.register(registerPCollection3, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, StringUtf8Coder.of());
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList3 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create3 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = proto.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = proto.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = proto.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList3);
            factory.createRunnerForPTransform(create3, (BeamFnDataClient) null, (BeamFnStateClient) null, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(registerPCollection, registerPCollection2, registerPCollection3));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(registerPCollection);
            multiplexingConsumer.accept(valueInWindows("X", new IntervalWindow(new Instant(0L), Duration.standardMinutes(1L)), new IntervalWindow(new Instant(10L), Duration.standardMinutes(1L))));
            multiplexingConsumer.accept(valueInWindows("Y", new IntervalWindow(new Instant(1000L), Duration.standardMinutes(1L)), new IntervalWindow(new Instant(1010L), Duration.standardMinutes(1L))));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(valueInWindows("X:main", new IntervalWindow(new Instant(0L), Duration.standardMinutes(1L)), new IntervalWindow(new Instant(10L), Duration.standardMinutes(1L))), valueInWindows("Y:main", new IntervalWindow(new Instant(1000L), Duration.standardMinutes(1L)), new IntervalWindow(new Instant(1010L), Duration.standardMinutes(1L)))));
            MatcherAssert.assertThat(arrayList2, (Matcher<? super ArrayList>) Matchers.contains(valueInWindows("X:additional", new IntervalWindow(new Instant(0L), Duration.standardMinutes(1L)), new IntervalWindow(new Instant(10L), Duration.standardMinutes(1L))), valueInWindows("Y:additional", new IntervalWindow(new Instant(1000L), Duration.standardMinutes(1L)), new IntervalWindow(new Instant(1010L), Duration.standardMinutes(1L)))));
            arrayList.clear();
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList3)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        @Test
        public void testSideInputIsAccessibleForDownstreamCallers() throws Exception {
            FixedWindows of = FixedWindows.of(Duration.millis(1L));
            IntervalWindow assignWindow = of.assignWindow(new Instant(1L));
            IntervalWindow assignWindow2 = of.assignWindow(new Instant(2L));
            ByteString copyFrom = ByteString.copyFrom(CoderUtils.encodeToByteArray(of.windowCoder(), assignWindow));
            ByteString copyFrom2 = ByteString.copyFrom(CoderUtils.encodeToByteArray(of.windowCoder(), assignWindow2));
            Pipeline create = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "use_runner_v2");
            PCollection<?> pCollection = (PCollection) ((PCollection) create.apply(Create.of("unused", new String[0]))).apply(Window.into(of));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asIterable());
            PCollection<?> pCollection2 = (PCollection) pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new TestSideInputIsAccessibleForDownstreamCallersDoFn(pCollectionView)).withSideInputs(pCollectionView));
            SdkComponents create2 = SdkComponents.create(create.getOptions());
            RunnerApi.Pipeline proto = PipelineTranslation.toProto(create, create2, true);
            String registerPCollection = create2.registerPCollection(pCollection);
            String registerPCollection2 = create2.registerPCollection(pCollection2);
            RunnerApi.PTransform transformsOrThrow = proto.getComponents().getTransformsOrThrow(proto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0));
            ImmutableMap of2 = ImmutableMap.of(iterableSideInputKey(pCollectionView.getTagInternal().getId(), copyFrom), encode("iterableValue1A", "iterableValue2A", "iterableValue3A"), iterableSideInputKey(pCollectionView.getTagInternal().getId(), copyFrom2), encode("iterableValue1B", "iterableValue2B", "iterableValue3B"));
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(of2, 1000);
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, IterableCoder.of(StringUtf8Coder.of()));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create3 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = proto.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = proto.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = proto.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create3, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(registerPCollection, registerPCollection2));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(registerPCollection);
            multiplexingConsumer.accept(valueInWindows("X", assignWindow, new BoundedWindow[0]));
            multiplexingConsumer.accept(valueInWindows("Y", assignWindow2, new BoundedWindow[0]));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.hasSize(2));
            MatcherAssert.assertThat((Iterable) ((WindowedValue) arrayList.get(0)).getValue(), (Matcher<? super Iterable>) Matchers.contains("iterableValue1A", "iterableValue2A", "iterableValue3A"));
            MatcherAssert.assertThat((Iterable) ((WindowedValue) arrayList.get(1)).getValue(), (Matcher<? super Iterable>) Matchers.contains("iterableValue1B", "iterableValue2B", "iterableValue3B"));
            arrayList.clear();
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            Assert.assertEquals(of2, fakeBeamFnStateClient.getData());
        }

        public MetricUpdates.MetricUpdate create(String str, MetricName metricName, long j) {
            return MetricUpdates.MetricUpdate.create(MetricKey.create(str, metricName), Long.valueOf(j));
        }

        @Test
        @Ignore("https://issues.apache.org/jira/browse/BEAM-12230")
        public void testUsingMetrics() throws Exception {
            MetricsContainerStepMap metricsContainerStepMap = new MetricsContainerStepMap();
            Closeable scopedMetricsContainer = MetricsEnvironment.scopedMetricsContainer(metricsContainerStepMap.getUnboundContainer());
            FixedWindows of = FixedWindows.of(Duration.millis(1L));
            IntervalWindow assignWindow = of.assignWindow(new Instant(1L));
            IntervalWindow assignWindow2 = of.assignWindow(new Instant(2L));
            ByteString copyFrom = ByteString.copyFrom(CoderUtils.encodeToByteArray(of.windowCoder(), assignWindow));
            ByteString copyFrom2 = ByteString.copyFrom(CoderUtils.encodeToByteArray(of.windowCoder(), assignWindow2));
            Pipeline create = Pipeline.create();
            PCollection<?> pCollection = (PCollection) ((PCollection) create.apply(Create.of("unused", new String[0]))).apply(Window.into(of));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asIterable());
            PCollection<?> pCollection2 = (PCollection) pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new TestSideInputIsAccessibleForDownstreamCallersDoFn(pCollectionView)).withSideInputs(pCollectionView));
            SdkComponents create2 = SdkComponents.create(create.getOptions());
            RunnerApi.Pipeline proto = PipelineTranslation.toProto(create, create2, true);
            String registerPCollection = create2.registerPCollection(pCollection);
            String registerPCollection2 = create2.registerPCollection(pCollection2);
            RunnerApi.PTransform transformsOrThrow = proto.getComponents().getTransformsOrThrow(proto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0));
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of(iterableSideInputKey(pCollectionView.getTagInternal().getId(), copyFrom), encode("iterableValue1A", "iterableValue2A", "iterableValue3A"), iterableSideInputKey(pCollectionView.getTagInternal().getId(), copyFrom2), encode("iterableValue1B", "iterableValue2B", "iterableValue3B")));
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerStepMap, (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, IterableCoder.of(StringUtf8Coder.of()));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create3 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = proto.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = proto.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = proto.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create3, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(registerPCollection, registerPCollection2));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(registerPCollection);
            multiplexingConsumer.accept(valueInWindows("X", assignWindow, new BoundedWindow[0]));
            multiplexingConsumer.accept(valueInWindows("Y", assignWindow2, new BoundedWindow[0]));
            arrayList.clear();
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            MetricsEnvironment.getCurrentContainer();
            ArrayList arrayList3 = new ArrayList();
            SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder = new SimpleMonitoringInfoBuilder();
            simpleMonitoringInfoBuilder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
            simpleMonitoringInfoBuilder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "Window.Into()/Window.Assign.out");
            simpleMonitoringInfoBuilder.setInt64SumValue(2L);
            arrayList3.add(simpleMonitoringInfoBuilder.build());
            SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder2 = new SimpleMonitoringInfoBuilder();
            simpleMonitoringInfoBuilder2.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
            simpleMonitoringInfoBuilder2.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
            simpleMonitoringInfoBuilder2.setInt64SumValue(2L);
            arrayList3.add(simpleMonitoringInfoBuilder2.build());
            SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder3 = new SimpleMonitoringInfoBuilder();
            simpleMonitoringInfoBuilder3.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64).setLabel(MonitoringInfoConstants.Labels.NAMESPACE, TestSideInputIsAccessibleForDownstreamCallersDoFn.class.getName()).setLabel(MonitoringInfoConstants.Labels.NAME, TestSideInputIsAccessibleForDownstreamCallersDoFn.USER_COUNTER_NAME);
            simpleMonitoringInfoBuilder3.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
            simpleMonitoringInfoBuilder3.setInt64SumValue(2L);
            arrayList3.add(simpleMonitoringInfoBuilder3.build());
            SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder4 = new SimpleMonitoringInfoBuilder();
            simpleMonitoringInfoBuilder4.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
            simpleMonitoringInfoBuilder4.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "Window.Into()/Window.Assign.out");
            simpleMonitoringInfoBuilder4.setInt64DistributionValue(DistributionData.create(4L, 2L, 2L, 2L));
            arrayList3.add(simpleMonitoringInfoBuilder4.build());
            SimpleMonitoringInfoBuilder simpleMonitoringInfoBuilder5 = new SimpleMonitoringInfoBuilder();
            simpleMonitoringInfoBuilder5.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
            simpleMonitoringInfoBuilder5.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, "pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
            simpleMonitoringInfoBuilder5.setInt64DistributionValue(DistributionData.create(10L, 2L, 5L, 5L));
            arrayList3.add(simpleMonitoringInfoBuilder5.build());
            scopedMetricsContainer.close();
            ArrayList arrayList4 = new ArrayList();
            Iterator<MetricsApi.MonitoringInfo> it = metricsContainerStepMap.getMonitoringInfos().iterator();
            while (it.hasNext()) {
                arrayList4.add(it.next());
            }
            MatcherAssert.assertThat(arrayList4, (Matcher<? super ArrayList>) Matchers.containsInAnyOrder(arrayList3.toArray()));
        }

        @Test
        public void testTimers() throws Exception {
            this.dateTimeProvider.setDateTimeFixed(10000L);
            Pipeline create = Pipeline.create();
            PCollection<?> pCollection = (PCollection) create.apply(Create.of(KV.of("unused", "unused"), new KV[0]));
            PCollection<?> pCollection2 = (PCollection) pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new TestTimerfulDoFn()));
            SdkComponents create2 = SdkComponents.create();
            create2.registerEnvironment(RunnerApi.Environment.getDefaultInstance());
            RunnerApi.Pipeline proto = PipelineTranslation.toProto(create, create2);
            String registerPCollection = create2.registerPCollection(pCollection);
            String registerPCollection2 = create2.registerPCollection(pCollection2);
            RunnerApi.PTransform build = proto.getComponents().getTransformsOrThrow(proto.getComponents().getTransformsOrThrow(TEST_TRANSFORM_ID).getSubtransforms(0)).toBuilder().build();
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of(bagUserStateKey("bag", "X"), encode("X0"), bagUserStateKey("bag", "A"), encode("A0"), bagUserStateKey("bag", "C"), encode("C0")));
            FakeBeamFnTimerClient fakeBeamFnTimerClient = new FakeBeamFnTimerClient();
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(registerPCollection2, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, StringUtf8Coder.of());
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create3 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = proto.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = proto.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = proto.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create3, (BeamFnDataClient) null, fakeBeamFnStateClient, fakeBeamFnTimerClient, TEST_TRANSFORM_ID, build, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(registerPCollection, registerPCollection2));
            LogicalEndpoint timer = LogicalEndpoint.timer("57L", TEST_TRANSFORM_ID, "ts-event");
            LogicalEndpoint timer2 = LogicalEndpoint.timer("57L", TEST_TRANSFORM_ID, "ts-processing");
            LogicalEndpoint timer3 = LogicalEndpoint.timer("57L", TEST_TRANSFORM_ID, "tfs-event-family");
            LogicalEndpoint timer4 = LogicalEndpoint.timer("57L", TEST_TRANSFORM_ID, "tfs-processing-family");
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(registerPCollection);
            multiplexingConsumer.accept(WindowedValue.timestampedValueInGlobalWindow(KV.of("X", "X1"), new Instant(1000L)));
            multiplexingConsumer.accept(WindowedValue.timestampedValueInGlobalWindow(KV.of("Y", "Y1"), new Instant(1100L)));
            multiplexingConsumer.accept(WindowedValue.timestampedValueInGlobalWindow(KV.of("X", "X2"), new Instant(1200L)));
            multiplexingConsumer.accept(WindowedValue.timestampedValueInGlobalWindow(KV.of("Y", "Y2"), new Instant(1300L)));
            fakeBeamFnTimerClient.sendTimer(timer, timerInGlobalWindow("A", new Instant(1400L), new Instant(2400L)));
            fakeBeamFnTimerClient.sendTimer(timer, timerInGlobalWindow("B", new Instant(1500L), new Instant(2500L)));
            fakeBeamFnTimerClient.sendTimer(timer, timerInGlobalWindow("A", new Instant(1600L), new Instant(2600L)));
            fakeBeamFnTimerClient.sendTimer(timer2, timerInGlobalWindow("X", new Instant(1700L), new Instant(2700L)));
            fakeBeamFnTimerClient.sendTimer(timer2, timerInGlobalWindow("C", new Instant(1800L), new Instant(2800L)));
            fakeBeamFnTimerClient.sendTimer(timer2, timerInGlobalWindow("B", new Instant(1900L), new Instant(2900L)));
            fakeBeamFnTimerClient.sendTimer(timer3, dynamicTimerInGlobalWindow("B", "event-timer2", new Instant(2000L), new Instant(3000L)));
            fakeBeamFnTimerClient.sendTimer(timer4, dynamicTimerInGlobalWindow("Y", "processing-timer2", new Instant(2100L), new Instant(3100L)));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.timestampedValueInGlobalWindow("mainX[X0]", new Instant(1000L)), WindowedValue.timestampedValueInGlobalWindow("mainY[]", new Instant(1100L)), WindowedValue.timestampedValueInGlobalWindow("mainX[X0, X1]", new Instant(1200L)), WindowedValue.timestampedValueInGlobalWindow("mainY[Y1]", new Instant(1300L)), WindowedValue.timestampedValueInGlobalWindow("event[A0]", new Instant(1400L)), WindowedValue.timestampedValueInGlobalWindow("event[]", new Instant(1500L)), WindowedValue.timestampedValueInGlobalWindow("event[A0, event]", new Instant(1600L)), WindowedValue.timestampedValueInGlobalWindow("processing[X0, X1, X2]", new Instant(1700L)), WindowedValue.timestampedValueInGlobalWindow("processing[C0]", new Instant(1800L)), WindowedValue.timestampedValueInGlobalWindow("processing[event]", new Instant(1900L)), WindowedValue.timestampedValueInGlobalWindow("event-family[event, processing]", new Instant(2000L)), WindowedValue.timestampedValueInGlobalWindow("processing-family[Y1, Y2]", new Instant(2100L))));
            MatcherAssert.assertThat(fakeBeamFnTimerClient.getTimers(timer), (Matcher<? super List<org.apache.beam.runners.core.construction.Timer<?>>>) Matchers.contains(timerInGlobalWindow("X", new Instant(1000L), new Instant(1001L)), clearedTimerInGlobalWindow("X"), timerInGlobalWindow("Y", new Instant(1100L), new Instant(1101L)), clearedTimerInGlobalWindow("Y"), timerInGlobalWindow("X", new Instant(1200L), new Instant(1201L)), clearedTimerInGlobalWindow("X"), timerInGlobalWindow("Y", new Instant(1300L), new Instant(1301L)), clearedTimerInGlobalWindow("Y"), timerInGlobalWindow("A", new Instant(1400L), new Instant(2411L)), timerInGlobalWindow("B", new Instant(1500L), new Instant(2511L)), timerInGlobalWindow("A", new Instant(1600L), new Instant(2611L)), timerInGlobalWindow("X", new Instant(1700L), new Instant(1721L)), timerInGlobalWindow("C", new Instant(1800L), new Instant(1821L)), timerInGlobalWindow("B", new Instant(1900L), new Instant(1921L)), timerInGlobalWindow("B", new Instant(2000L), new Instant(2031L)), timerInGlobalWindow("Y", new Instant(2100L), new Instant(2141L))));
            MatcherAssert.assertThat(fakeBeamFnTimerClient.getTimers(timer2), (Matcher<? super List<org.apache.beam.runners.core.construction.Timer<?>>>) Matchers.contains(timerInGlobalWindow("X", new Instant(1000L), new Instant(10002L)), clearedTimerInGlobalWindow("X"), timerInGlobalWindow("Y", new Instant(1100L), new Instant(10002L)), clearedTimerInGlobalWindow("Y"), timerInGlobalWindow("X", new Instant(1200L), new Instant(10002L)), clearedTimerInGlobalWindow("X"), timerInGlobalWindow("Y", new Instant(1300L), new Instant(10002L)), clearedTimerInGlobalWindow("Y"), timerInGlobalWindow("A", new Instant(1400L), new Instant(10012L)), timerInGlobalWindow("B", new Instant(1500L), new Instant(10012L)), timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)), timerInGlobalWindow("X", new Instant(1700L), new Instant(10022L)), timerInGlobalWindow("C", new Instant(1800L), new Instant(10022L)), timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L)), timerInGlobalWindow("B", new Instant(2000L), new Instant(10032L)), timerInGlobalWindow("Y", new Instant(2100L), new Instant(10042L))));
            MatcherAssert.assertThat(fakeBeamFnTimerClient.getTimers(timer3), (Matcher<? super List<org.apache.beam.runners.core.construction.Timer<?>>>) Matchers.contains(dynamicTimerInGlobalWindow("X", "event-timer1", new Instant(1000L), new Instant(1003L)), dynamicTimerInGlobalWindow("Y", "event-timer1", new Instant(1100L), new Instant(1103L)), dynamicTimerInGlobalWindow("X", "event-timer1", new Instant(1200L), new Instant(1203L)), dynamicTimerInGlobalWindow("Y", "event-timer1", new Instant(1300L), new Instant(1303L)), dynamicTimerInGlobalWindow("A", "event-timer1", new Instant(1400L), new Instant(2413L)), dynamicTimerInGlobalWindow("B", "event-timer1", new Instant(1500L), new Instant(2513L)), dynamicTimerInGlobalWindow("A", "event-timer1", new Instant(1600L), new Instant(2613L)), dynamicTimerInGlobalWindow("X", "event-timer1", new Instant(1700L), new Instant(1723L)), dynamicTimerInGlobalWindow("C", "event-timer1", new Instant(1800L), new Instant(1823L)), dynamicTimerInGlobalWindow("B", "event-timer1", new Instant(1900L), new Instant(1923L)), dynamicTimerInGlobalWindow("B", "event-timer1", new Instant(2000L), new Instant(2033L)), dynamicTimerInGlobalWindow("Y", "event-timer1", new Instant(2100L), new Instant(2143L))));
            MatcherAssert.assertThat(fakeBeamFnTimerClient.getTimers(timer4), (Matcher<? super List<org.apache.beam.runners.core.construction.Timer<?>>>) Matchers.contains(dynamicTimerInGlobalWindow("X", "processing-timer1", new Instant(1000L), new Instant(10004L)), dynamicTimerInGlobalWindow("Y", "processing-timer1", new Instant(1100L), new Instant(10004L)), dynamicTimerInGlobalWindow("X", "processing-timer1", new Instant(1200L), new Instant(10004L)), dynamicTimerInGlobalWindow("Y", "processing-timer1", new Instant(1300L), new Instant(10004L)), dynamicTimerInGlobalWindow("A", "processing-timer1", new Instant(1400L), new Instant(10014L)), dynamicTimerInGlobalWindow("B", "processing-timer1", new Instant(1500L), new Instant(10014L)), dynamicTimerInGlobalWindow("A", "processing-timer1", new Instant(1600L), new Instant(10014L)), dynamicTimerInGlobalWindow("X", "processing-timer1", new Instant(1700L), new Instant(10024L)), dynamicTimerInGlobalWindow("C", "processing-timer1", new Instant(1800L), new Instant(10024L)), dynamicTimerInGlobalWindow("B", "processing-timer1", new Instant(1900L), new Instant(10024L)), dynamicTimerInGlobalWindow("B", "processing-timer1", new Instant(2000L), new Instant(10034L)), dynamicTimerInGlobalWindow("Y", "processing-timer1", new Instant(2100L), new Instant(10044L))));
            arrayList.clear();
            Assert.assertFalse(fakeBeamFnTimerClient.isOutboundClosed(timer));
            Assert.assertFalse(fakeBeamFnTimerClient.isOutboundClosed(timer2));
            Assert.assertFalse(fakeBeamFnTimerClient.isOutboundClosed(timer3));
            Assert.assertFalse(fakeBeamFnTimerClient.isOutboundClosed(timer4));
            fakeBeamFnTimerClient.closeInbound(timer);
            fakeBeamFnTimerClient.closeInbound(timer2);
            fakeBeamFnTimerClient.closeInbound(timer3);
            fakeBeamFnTimerClient.closeInbound(timer4);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            Assert.assertTrue(fakeBeamFnTimerClient.isOutboundClosed(timer));
            Assert.assertTrue(fakeBeamFnTimerClient.isOutboundClosed(timer2));
            Assert.assertTrue(fakeBeamFnTimerClient.isOutboundClosed(timer3));
            Assert.assertTrue(fakeBeamFnTimerClient.isOutboundClosed(timer4));
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            Assert.assertEquals(ImmutableMap.builder().put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2", "processing")).put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2", "processing-family")).put(bagUserStateKey("bag", "A"), encode("A0", "event", "event")).put(bagUserStateKey("bag", "B"), encode("event", "processing", "event-family")).put(bagUserStateKey("bag", "C"), encode("C0", "processing")).build(), fakeBeamFnStateClient.getData());
        }

        private <K> org.apache.beam.runners.core.construction.Timer<K> timerInGlobalWindow(K k, Instant instant, Instant instant2) {
            return dynamicTimerInGlobalWindow(k, "", instant, instant2);
        }

        private <K> org.apache.beam.runners.core.construction.Timer<K> clearedTimerInGlobalWindow(K k) {
            return org.apache.beam.runners.core.construction.Timer.cleared(k, "", Collections.singletonList(GlobalWindow.INSTANCE));
        }

        private <K> org.apache.beam.runners.core.construction.Timer<K> dynamicTimerInGlobalWindow(K k, String str, Instant instant, Instant instant2) {
            return org.apache.beam.runners.core.construction.Timer.of(k, str, Collections.singletonList(GlobalWindow.INSTANCE), instant2, instant, PaneInfo.NO_FIRING);
        }

        private <T> WindowedValue<T> valueInWindows(T t, BoundedWindow boundedWindow, BoundedWindow... boundedWindowArr) {
            return WindowedValue.of(t, boundedWindow.maxTimestamp(), ImmutableList.builder().add((ImmutableList.Builder) boundedWindow).add((Object[]) boundedWindowArr).build(), PaneInfo.NO_FIRING);
        }

        private BeamFnApi.StateKey iterableSideInputKey(String str) throws IOException {
            return iterableSideInputKey(str, ByteString.copyFrom(CoderUtils.encodeToByteArray(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE)));
        }

        private BeamFnApi.StateKey iterableSideInputKey(String str, ByteString byteString) {
            return BeamFnApi.StateKey.newBuilder().setIterableSideInput(BeamFnApi.StateKey.IterableSideInput.newBuilder().setTransformId(TEST_TRANSFORM_ID).setSideInputId(str).setWindow(byteString)).build();
        }

        private ByteString encode(String... strArr) throws IOException {
            ByteString.Output newOutput = ByteString.newOutput();
            for (String str : strArr) {
                StringUtf8Coder.of().encode(str, (OutputStream) newOutput);
            }
            return newOutput.toByteString();
        }

        @Test
        public void testRegistration() {
            Iterator it = ServiceLoader.load(PTransformRunnerFactory.Registrar.class).iterator();
            while (it.hasNext()) {
                PTransformRunnerFactory.Registrar registrar = (PTransformRunnerFactory.Registrar) it.next();
                if (registrar instanceof FnApiDoFnRunner.Registrar) {
                    MatcherAssert.assertThat(registrar.getPTransformRunnerFactories(), (Matcher<? super Map>) IsMapContaining.hasKey(PTransformTranslation.PAR_DO_TRANSFORM_URN));
                    return;
                }
            }
            Assert.fail("Expected registrar not found.");
        }

        @Test
        public void testProcessElementForSizedElementAndRestriction() throws Exception {
            Pipeline create = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "use_runner_v2");
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton());
            WindowObservingTestSplittableDoFn windowObservingTestSplittableDoFn = new WindowObservingTestSplittableDoFn(pCollectionView);
            pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(windowObservingTestSplittableDoFn).withSideInputs(pCollectionView));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createSizedReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            RunnerApi.PCollection pcollectionsOrThrow = updateTransform.getComponents().getPcollectionsOrThrow(inputsOrThrow);
            RehydratedComponents forComponents = RehydratedComponents.forComponents(updateTransform.getComponents());
            WindowedValue.FullWindowedValueCoder fullCoder = WindowedValue.getFullCoder(CoderTranslation.fromProto(updateTransform.getComponents().getCodersOrThrow(pcollectionsOrThrow.getCoderId()), forComponents, CoderTranslation.TranslationContext.DEFAULT), CoderTranslation.fromProto(updateTransform.getComponents().getCodersOrThrow(updateTransform.getComponents().getWindowingStrategiesOrThrow(pcollectionsOrThrow.getWindowingStrategyId()).getWindowCoderId()), forComponents, CoderTranslation.TranslationContext.DEFAULT));
            String outputsOrThrow = transformsOrThrow.getOutputsOrThrow("output");
            ImmutableMap of = ImmutableMap.of(iterableSideInputKey(pCollectionView.getTagInternal().getId(), ByteString.EMPTY), encode("8"));
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(of);
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(outputsOrThrow, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, StringUtf8Coder.of());
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            BundleSplitListener.InMemory create2 = BundleSplitListener.InMemory.create();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create3 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            Consumer consumer = (v1) -> {
                r15.add(v1);
            };
            Objects.requireNonNull(arrayList3);
            factory.createRunnerForPTransform(create3, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, consumer, (v1) -> {
                r16.add(v1);
            }, create2, (DoFn.BundleFinalizer) null);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, outputsOrThrow));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            MatcherAssert.assertThat(multiplexingConsumer, (Matcher<? super FnDataReceiver>) Matchers.instanceOf(HandlesSplits.class));
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of(KV.of("5", KV.of(new OffsetRange(5L, 10L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(5.0d))));
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            BeamFnApi.BundleApplication bundleApplication = (BeamFnApi.BundleApplication) Iterables.getOnlyElement(create2.getPrimaryRoots());
            BeamFnApi.DelayedBundleApplication delayedBundleApplication = (BeamFnApi.DelayedBundleApplication) Iterables.getOnlyElement(create2.getResidualRoots());
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), bundleApplication.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, bundleApplication.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), delayedBundleApplication.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, delayedBundleApplication.getApplication().getTransformId());
            Assert.assertEquals(WindowedValue.valueInGlobalWindow(KV.of(KV.of("5", KV.of(new OffsetRange(5L, 8L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(3.0d))), fullCoder.decode(bundleApplication.getElement().newInput()));
            Assert.assertEquals(WindowedValue.valueInGlobalWindow(KV.of(KV.of("5", KV.of(new OffsetRange(8L, 10L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(7L))), Double.valueOf(2.0d))), fullCoder.decode(delayedBundleApplication.getApplication().getElement().newInput()));
            Instant plus = GlobalWindow.TIMESTAMP_MIN_VALUE.plus(7L);
            Assert.assertEquals(ImmutableMap.of("output", Timestamp.newBuilder().setSeconds(plus.getMillis() / 1000).setNanos(((int) (plus.getMillis() % 1000)) * 1000000).build()), delayedBundleApplication.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Duration.newBuilder().setSeconds(54L).setNanos(321000000).build(), delayedBundleApplication.getRequestedTimeDelay());
            create2.clear();
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d))));
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.timestampedValueInGlobalWindow("5:5", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(5L)), WindowedValue.timestampedValueInGlobalWindow("5:6", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(6L)), WindowedValue.timestampedValueInGlobalWindow("5:7", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(7L)), WindowedValue.timestampedValueInGlobalWindow("2:0", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(0L)), WindowedValue.timestampedValueInGlobalWindow("2:1", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))));
            Assert.assertTrue(create2.getPrimaryRoots().isEmpty());
            Assert.assertTrue(create2.getResidualRoots().isEmpty());
            arrayList.clear();
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            Future submit = newSingleThreadExecutor.submit(() -> {
                try {
                    windowObservingTestSplittableDoFn.waitForSplitElementToBeProcessed();
                    Assert.assertEquals(0.6d, ((HandlesSplits) multiplexingConsumer).getProgress(), 0.01d);
                    List monitoringInfos = ((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos();
                    MetricsApi.MonitoringInfo.Builder newBuilder = MetricsApi.MonitoringInfo.newBuilder();
                    newBuilder.setUrn(MonitoringInfoConstants.Urns.WORK_COMPLETED);
                    newBuilder.setType(MonitoringInfoConstants.TypeUrns.PROGRESS_TYPE);
                    newBuilder.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
                    newBuilder.setPayload(ByteString.copyFrom(CoderUtils.encodeToByteArray(IterableCoder.of(DoubleCoder.of()), Collections.singletonList(Double.valueOf(3.0d)))));
                    MetricsApi.MonitoringInfo.Builder newBuilder2 = MetricsApi.MonitoringInfo.newBuilder();
                    newBuilder2.setUrn(MonitoringInfoConstants.Urns.WORK_REMAINING);
                    newBuilder2.setType(MonitoringInfoConstants.TypeUrns.PROGRESS_TYPE);
                    newBuilder2.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
                    newBuilder2.setPayload(ByteString.copyFrom(CoderUtils.encodeToByteArray(IterableCoder.of(DoubleCoder.of()), Collections.singletonList(Double.valueOf(2.0d)))));
                    MatcherAssert.assertThat(monitoringInfos, (Matcher<? super List>) Matchers.containsInAnyOrder(newBuilder.build(), newBuilder2.build()));
                    HandlesSplits.SplitResult trySplit = ((HandlesSplits) multiplexingConsumer).trySplit(0.0d);
                    windowObservingTestSplittableDoFn.releaseWaitingProcessElementThread();
                    return trySplit;
                } catch (Throwable th) {
                    windowObservingTestSplittableDoFn.releaseWaitingProcessElementThread();
                    throw th;
                }
            });
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d))));
            HandlesSplits.SplitResult splitResult = (HandlesSplits.SplitResult) submit.get();
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.timestampedValueInGlobalWindow("7:0", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(0L)), WindowedValue.timestampedValueInGlobalWindow("7:1", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L)), WindowedValue.timestampedValueInGlobalWindow("7:2", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2L)), WindowedValue.timestampedValueInGlobalWindow("7:3", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(3L))));
            BeamFnApi.BundleApplication bundleApplication2 = (BeamFnApi.BundleApplication) Iterables.getOnlyElement(splitResult.getPrimaryRoots());
            BeamFnApi.DelayedBundleApplication delayedBundleApplication2 = (BeamFnApi.DelayedBundleApplication) Iterables.getOnlyElement(splitResult.getResidualRoots());
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), bundleApplication2.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, bundleApplication2.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), delayedBundleApplication2.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, delayedBundleApplication2.getApplication().getTransformId());
            Assert.assertEquals(WindowedValue.valueInGlobalWindow(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 4L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(4.0d))), fullCoder.decode(bundleApplication2.getElement().newInput()));
            Assert.assertEquals(WindowedValue.valueInGlobalWindow(KV.of(KV.of("7", KV.of(new OffsetRange(4L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2L))), Double.valueOf(1.0d))), fullCoder.decode(delayedBundleApplication2.getApplication().getElement().newInput()));
            Instant plus2 = GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2L);
            Assert.assertEquals(ImmutableMap.of("output", Timestamp.newBuilder().setSeconds(plus2.getMillis() / 1000).setNanos(((int) (plus2.getMillis() % 1000)) * 1000000).build()), delayedBundleApplication2.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(delayedBundleApplication2.getRequestedTimeDelay().getDefaultInstanceForType(), delayedBundleApplication2.getRequestedTimeDelay());
            Assert.assertTrue(create2.getPrimaryRoots().isEmpty());
            Assert.assertTrue(create2.getResidualRoots().isEmpty());
            arrayList.clear();
            newSingleThreadExecutor.shutdown();
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            Assert.assertEquals(of, fakeBeamFnStateClient.getData());
        }

        @Test
        public void testProcessElementForWindowedSizedElementAndRestriction() throws Exception {
            Pipeline create = Pipeline.create();
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
            ExperimentalOptions.addExperiment((ExperimentalOptions) create.getOptions().as(ExperimentalOptions.class), "use_runner_v2");
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton());
            WindowObservingTestSplittableDoFn windowObservingTestSplittableDoFn = new WindowObservingTestSplittableDoFn(pCollectionView);
            ((PCollection) pCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(1L))))).apply(TEST_TRANSFORM_ID, ParDo.of(windowObservingTestSplittableDoFn).withSideInputs(pCollectionView));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createSizedReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            RunnerApi.PCollection pcollectionsOrThrow = updateTransform.getComponents().getPcollectionsOrThrow(inputsOrThrow);
            RehydratedComponents forComponents = RehydratedComponents.forComponents(updateTransform.getComponents());
            WindowedValue.FullWindowedValueCoder fullCoder = WindowedValue.getFullCoder(CoderTranslation.fromProto(updateTransform.getComponents().getCodersOrThrow(pcollectionsOrThrow.getCoderId()), forComponents, CoderTranslation.TranslationContext.DEFAULT), CoderTranslation.fromProto(updateTransform.getComponents().getCodersOrThrow(updateTransform.getComponents().getWindowingStrategiesOrThrow(pcollectionsOrThrow.getWindowingStrategyId()).getWindowCoderId()), forComponents, CoderTranslation.TranslationContext.DEFAULT));
            String outputsOrThrow = transformsOrThrow.getOutputsOrThrow("output");
            ImmutableMap of = ImmutableMap.of(iterableSideInputKey(pCollectionView.getTagInternal().getId(), ByteString.EMPTY), encode("8"));
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(of);
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(outputsOrThrow, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, StringUtf8Coder.of());
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            BundleSplitListener.InMemory create2 = BundleSplitListener.InMemory.create();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create3 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            Consumer consumer = (v1) -> {
                r15.add(v1);
            };
            Objects.requireNonNull(arrayList3);
            factory.createRunnerForPTransform(create3, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, consumer, (v1) -> {
                r16.add(v1);
            }, create2, (DoFn.BundleFinalizer) null);
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry.getFunctions())).run();
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, outputsOrThrow));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            MatcherAssert.assertThat(multiplexingConsumer, (Matcher<? super FnDataReceiver>) Matchers.instanceOf(HandlesSplits.class));
            IntervalWindow intervalWindow = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            WindowedValue valueInWindows = valueInWindows(KV.of(KV.of("5", KV.of(new OffsetRange(5L, 10L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), Double.valueOf(5.0d)), intervalWindow, intervalWindow2);
            multiplexingConsumer.accept(valueInWindows);
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            BeamFnApi.BundleApplication bundleApplication = (BeamFnApi.BundleApplication) Iterables.getOnlyElement(create2.getPrimaryRoots());
            Assert.assertEquals(2L, create2.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication delayedBundleApplication = (BeamFnApi.DelayedBundleApplication) create2.getResidualRoots().get(1);
            BeamFnApi.DelayedBundleApplication delayedBundleApplication2 = (BeamFnApi.DelayedBundleApplication) create2.getResidualRoots().get(0);
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), bundleApplication.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, bundleApplication.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), delayedBundleApplication.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, delayedBundleApplication.getApplication().getTransformId());
            Instant plus = GlobalWindow.TIMESTAMP_MIN_VALUE.plus(7L);
            ImmutableMap of2 = ImmutableMap.of("output", Timestamp.newBuilder().setSeconds(plus.getMillis() / 1000).setNanos(((int) (plus.getMillis() % 1000)) * 1000000).build());
            Instant plus2 = GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L);
            ImmutableMap of3 = ImmutableMap.of("output", Timestamp.newBuilder().setSeconds(plus2.getMillis() / 1000).setNanos(((int) (plus2.getMillis() % 1000)) * 1000000).build());
            Assert.assertEquals(of2, delayedBundleApplication.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Duration.newBuilder().setSeconds(54L).setNanos(321000000).build(), delayedBundleApplication.getRequestedTimeDelay());
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), delayedBundleApplication2.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, delayedBundleApplication2.getApplication().getTransformId());
            Assert.assertEquals(delayedBundleApplication2.getRequestedTimeDelay().getDefaultInstanceForType(), delayedBundleApplication2.getRequestedTimeDelay());
            Assert.assertEquals(of3, delayedBundleApplication2.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(decode(fullCoder, bundleApplication.getElement()), WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(5L, 8L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), Double.valueOf(3.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()));
            Assert.assertEquals(decode(fullCoder, delayedBundleApplication.getApplication().getElement()), WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(8L, 10L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(7L))), Double.valueOf(2.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()));
            Assert.assertEquals(decode(fullCoder, delayedBundleApplication2.getApplication().getElement()), WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(5L, 10L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), Double.valueOf(5.0d)), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane()));
            create2.clear();
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            multiplexingConsumer.accept(valueInWindows(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), Double.valueOf(2.0d)), intervalWindow, intervalWindow2));
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of("5:5", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(5L), intervalWindow, valueInWindows.getPane()), WindowedValue.of("5:6", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(6L), intervalWindow, valueInWindows.getPane()), WindowedValue.of("5:7", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(7L), intervalWindow, valueInWindows.getPane()), WindowedValue.of("2:0", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(0L), intervalWindow, valueInWindows.getPane()), WindowedValue.of("2:1", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L), intervalWindow, valueInWindows.getPane()), WindowedValue.of("2:0", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(0L), intervalWindow2, valueInWindows.getPane()), WindowedValue.of("2:1", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L), intervalWindow2, valueInWindows.getPane())));
            Assert.assertTrue(create2.getPrimaryRoots().isEmpty());
            Assert.assertTrue(create2.getResidualRoots().isEmpty());
            arrayList.clear();
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            Future submit = newSingleThreadExecutor.submit(() -> {
                try {
                    windowObservingTestSplittableDoFn.waitForSplitElementToBeProcessed();
                    Assert.assertEquals(0.3d, ((HandlesSplits) multiplexingConsumer).getProgress(), 0.01d);
                    List monitoringInfos = ((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos();
                    MetricsApi.MonitoringInfo.Builder newBuilder = MetricsApi.MonitoringInfo.newBuilder();
                    newBuilder.setUrn(MonitoringInfoConstants.Urns.WORK_COMPLETED);
                    newBuilder.setType(MonitoringInfoConstants.TypeUrns.PROGRESS_TYPE);
                    newBuilder.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
                    newBuilder.setPayload(ByteString.copyFrom(CoderUtils.encodeToByteArray(IterableCoder.of(DoubleCoder.of()), Collections.singletonList(Double.valueOf(3.0d)))));
                    MetricsApi.MonitoringInfo.Builder newBuilder2 = MetricsApi.MonitoringInfo.newBuilder();
                    newBuilder2.setUrn(MonitoringInfoConstants.Urns.WORK_REMAINING);
                    newBuilder2.setType(MonitoringInfoConstants.TypeUrns.PROGRESS_TYPE);
                    newBuilder2.putLabels(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
                    newBuilder2.setPayload(ByteString.copyFrom(CoderUtils.encodeToByteArray(IterableCoder.of(DoubleCoder.of()), Collections.singletonList(Double.valueOf(7.0d)))));
                    MatcherAssert.assertThat(monitoringInfos, (Matcher<? super List>) Matchers.containsInAnyOrder(newBuilder.build(), newBuilder2.build()));
                    HandlesSplits.SplitResult trySplit = ((HandlesSplits) multiplexingConsumer).trySplit(0.0d);
                    windowObservingTestSplittableDoFn.releaseWaitingProcessElementThread();
                    return trySplit;
                } catch (Throwable th) {
                    windowObservingTestSplittableDoFn.releaseWaitingProcessElementThread();
                    throw th;
                }
            });
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            WindowedValue valueInWindows2 = valueInWindows(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), Double.valueOf(2.0d)), intervalWindow, intervalWindow2);
            multiplexingConsumer.accept(valueInWindows2);
            HandlesSplits.SplitResult splitResult = (HandlesSplits.SplitResult) submit.get();
            MatcherAssert.assertThat(((PTransformRunnerFactory.ProgressRequestCallback) Iterables.getOnlyElement(arrayList3)).getMonitoringInfos(), (Matcher<? super List>) Matchers.empty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of("7:0", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(0L), intervalWindow, valueInWindows2.getPane()), WindowedValue.of("7:1", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L), intervalWindow, valueInWindows2.getPane()), WindowedValue.of("7:2", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2L), intervalWindow, valueInWindows2.getPane()), WindowedValue.of("7:3", GlobalWindow.TIMESTAMP_MIN_VALUE.plus(3L), intervalWindow, valueInWindows2.getPane())));
            BeamFnApi.BundleApplication bundleApplication2 = (BeamFnApi.BundleApplication) Iterables.getOnlyElement(splitResult.getPrimaryRoots());
            Assert.assertEquals(2L, splitResult.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication delayedBundleApplication3 = (BeamFnApi.DelayedBundleApplication) splitResult.getResidualRoots().get(1);
            BeamFnApi.DelayedBundleApplication delayedBundleApplication4 = (BeamFnApi.DelayedBundleApplication) splitResult.getResidualRoots().get(0);
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), bundleApplication2.getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, bundleApplication2.getTransformId());
            Assert.assertEquals(ParDoTranslation.getMainInputName(transformsOrThrow), delayedBundleApplication3.getApplication().getInputId());
            Assert.assertEquals(TEST_TRANSFORM_ID, delayedBundleApplication3.getApplication().getTransformId());
            Assert.assertEquals(TEST_TRANSFORM_ID, delayedBundleApplication4.getApplication().getTransformId());
            Assert.assertEquals(delayedBundleApplication4.getRequestedTimeDelay().getDefaultInstanceForType(), delayedBundleApplication4.getRequestedTimeDelay());
            Instant plus3 = GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L);
            Instant plus4 = GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2L);
            ImmutableMap of4 = ImmutableMap.of("output", Timestamp.newBuilder().setSeconds(plus3.getMillis() / 1000).setNanos(((int) (plus3.getMillis() % 1000)) * 1000000).build());
            ImmutableMap of5 = ImmutableMap.of("output", Timestamp.newBuilder().setSeconds(plus4.getMillis() / 1000).setNanos(((int) (plus4.getMillis() % 1000)) * 1000000).build());
            Assert.assertEquals(of4, delayedBundleApplication4.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(valueInWindows(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 4L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), Double.valueOf(4.0d)), intervalWindow, new BoundedWindow[0]), fullCoder.decode(bundleApplication2.getElement().newInput()));
            Assert.assertEquals(valueInWindows(KV.of(KV.of("7", KV.of(new OffsetRange(4L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2L))), Double.valueOf(1.0d)), intervalWindow, new BoundedWindow[0]), fullCoder.decode(delayedBundleApplication3.getApplication().getElement().newInput()));
            Assert.assertEquals(of5, delayedBundleApplication3.getApplication().getOutputWatermarksMap());
            Assert.assertEquals(WindowedValue.of(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), Double.valueOf(5.0d)), valueInWindows2.getTimestamp(), intervalWindow2, valueInWindows2.getPane()), fullCoder.decode(delayedBundleApplication4.getApplication().getElement().newInput()));
            Assert.assertEquals(delayedBundleApplication3.getRequestedTimeDelay().getDefaultInstanceForType(), delayedBundleApplication3.getRequestedTimeDelay());
            Assert.assertTrue(create2.getPrimaryRoots().isEmpty());
            Assert.assertTrue(create2.getResidualRoots().isEmpty());
            arrayList.clear();
            newSingleThreadExecutor.shutdown();
            ((ThrowingRunnable) Iterables.getOnlyElement(pTransformFunctionRegistry2.getFunctions())).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            Assert.assertEquals(of, fakeBeamFnStateClient.getData());
        }

        private static <T> T decode(Coder<T> coder, ByteString byteString) {
            try {
                return coder.decode(byteString.newInput());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Test
        public void testProcessElementForPairWithRestriction() throws Exception {
            Pipeline create = Pipeline.create();
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton());
            pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new WindowObservingTestSplittableDoFn(pCollectionView)).withSideInputs(pCollectionView));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createSizedReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow("5"));
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow("2"));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L)))), WindowedValue.valueInGlobalWindow(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))))));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedPairWithRestriction() throws Exception {
            Pipeline create = Pipeline.create();
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton());
            ((PCollection) pCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(1L))))).apply(TEST_TRANSFORM_ID, ParDo.of(new WindowObservingTestSplittableDoFn(pCollectionView)).withSideInputs(pCollectionView));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createSizedReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            IntervalWindow intervalWindow = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue valueInWindows = valueInWindows("5", intervalWindow, intervalWindow2);
            WindowedValue valueInWindows2 = valueInWindows("2", intervalWindow, intervalWindow2);
            multiplexingConsumer.accept(valueInWindows);
            multiplexingConsumer.accept(valueInWindows2);
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), WindowedValue.of(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane()), WindowedValue.of(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), valueInWindows2.getTimestamp(), intervalWindow, valueInWindows2.getPane()), WindowedValue.of(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), valueInWindows2.getTimestamp(), intervalWindow2, valueInWindows2.getPane())));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedPairWithRestrictionWithNonWindowObservingOptimization() throws Exception {
            Pipeline create = Pipeline.create();
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            ((PCollection) pCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(1L))))).apply(TEST_TRANSFORM_ID, ParDo.of(new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createSizedReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            IntervalWindow intervalWindow = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue valueInWindows = valueInWindows("5", intervalWindow, intervalWindow2);
            WindowedValue valueInWindows2 = valueInWindows("2", intervalWindow, intervalWindow2);
            multiplexingConsumer.accept(valueInWindows);
            multiplexingConsumer.accept(valueInWindows2);
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), valueInWindows.getTimestamp(), ImmutableList.of(intervalWindow, intervalWindow2), valueInWindows.getPane()), WindowedValue.of(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1L))), valueInWindows2.getTimestamp(), ImmutableList.of(intervalWindow, intervalWindow2), valueInWindows2.getPane())));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        @Test
        public void testProcessElementForSplitAndSizeRestriction() throws Exception {
            Pipeline create = Pipeline.create();
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton());
            pCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new WindowObservingTestSplittableDoFn(pCollectionView)).withSideInputs(pCollectionView));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createSizedReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            KvCoder of = KvCoder.of(KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())), DoubleCoder.of());
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, of);
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE))));
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE))));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d))), WindowedValue.valueInGlobalWindow(KV.of(KV.of("5", KV.of(new OffsetRange(2L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(3.0d))), WindowedValue.valueInGlobalWindow(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 1L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d))), WindowedValue.valueInGlobalWindow(KV.of(KV.of("2", KV.of(new OffsetRange(1L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)))));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedSplitAndSizeRestriction() throws Exception {
            Pipeline create = Pipeline.create();
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton());
            ((PCollection) pCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(1L))))).apply(TEST_TRANSFORM_ID, ParDo.of(new WindowObservingTestSplittableDoFn(pCollectionView)).withSideInputs(pCollectionView));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createSizedReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            KvCoder of = KvCoder.of(KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())), DoubleCoder.of());
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, of);
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            IntervalWindow intervalWindow = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue valueInWindows = valueInWindows(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), intervalWindow, intervalWindow2);
            WindowedValue valueInWindows2 = valueInWindows(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), intervalWindow, intervalWindow2);
            multiplexingConsumer.accept(valueInWindows);
            multiplexingConsumer.accept(valueInWindows2);
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(2L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(3.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d)), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(2L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(3.0d)), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 1L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(1L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 1L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(1L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane())));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedSplitAndSizeRestrictionWithNonWindowObservingOptimization() throws Exception {
            Pipeline create = Pipeline.create();
            ((PCollection) ((PCollection) create.apply(Create.of("unused", new String[0]))).apply(Window.into(SlidingWindows.of(Duration.standardSeconds(1L))))).apply(TEST_TRANSFORM_ID, ParDo.of(new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createSizedReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            KvCoder of = KvCoder.of(KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())), DoubleCoder.of());
            Objects.requireNonNull(arrayList);
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, (v1) -> {
                r3.add(v1);
            }, of);
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, (BeamFnStateClient) null, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            IntervalWindow intervalWindow = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue valueInWindows = valueInWindows(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), intervalWindow, intervalWindow2);
            WindowedValue valueInWindows2 = valueInWindows(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), intervalWindow, intervalWindow2);
            multiplexingConsumer.accept(valueInWindows);
            multiplexingConsumer.accept(valueInWindows2);
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d)), valueInWindows.getTimestamp(), ImmutableList.of(intervalWindow, intervalWindow2), valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(2L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(3.0d)), valueInWindows.getTimestamp(), ImmutableList.of(intervalWindow, intervalWindow2), valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 1L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), ImmutableList.of(intervalWindow, intervalWindow2), valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(1L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), ImmutableList.of(intervalWindow, intervalWindow2), valueInWindows.getPane())));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static HandlesSplits.SplitResult createSplitResult(double d) {
            ByteString.Output newOutput = ByteString.newOutput();
            ByteString.Output newOutput2 = ByteString.newOutput();
            try {
                DoubleCoder.of().encode(Double.valueOf(d), (OutputStream) newOutput);
                DoubleCoder.of().encode(Double.valueOf(1.0d - d), (OutputStream) newOutput2);
            } catch (Exception e) {
            }
            return HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.newBuilder().setElement(newOutput.toByteString()).setInputId("mainInputId-process").setTransformId("processPTransfromId").build()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setElement(newOutput2.toByteString()).setInputId("mainInputId-process").setTransformId("processPTransfromId").build()).build()));
        }

        @Test
        public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObservingWindows() throws Exception {
            Pipeline create = Pipeline.create();
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton());
            WindowObservingTestSplittableDoFn forSplitAtTruncate = WindowObservingTestSplittableDoFn.forSplitAtTruncate(pCollectionView);
            ((PCollection) pCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(1L))))).apply(TEST_TRANSFORM_ID, ParDo.of(forSplitAtTruncate).withSideInputs(pCollectionView));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createTruncateReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            RunnerApi.PCollection pcollectionsOrThrow = updateTransform.getComponents().getPcollectionsOrThrow(inputsOrThrow);
            RehydratedComponents forComponents = RehydratedComponents.forComponents(updateTransform.getComponents());
            WindowedValue.FullWindowedValueCoder fullCoder = WindowedValue.getFullCoder(CoderTranslation.fromProto(updateTransform.getComponents().getCodersOrThrow(pcollectionsOrThrow.getCoderId()), forComponents, CoderTranslation.TranslationContext.DEFAULT), CoderTranslation.fromProto(updateTransform.getComponents().getCodersOrThrow(updateTransform.getComponents().getWindowingStrategiesOrThrow(pcollectionsOrThrow.getWindowingStrategyId()).getWindowCoderId()), forComponents, CoderTranslation.TranslationContext.DEFAULT));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, new SplittableFnDataReceiver(arrayList), KvCoder.of(KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())), DoubleCoder.of()));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            new ArrayList();
            BundleSplitListener.InMemory.create();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            MatcherAssert.assertThat(multiplexingConsumer, (Matcher<? super FnDataReceiver>) Matchers.instanceOf(HandlesSplits.class));
            arrayList.clear();
            IntervalWindow intervalWindow = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            IntervalWindow intervalWindow3 = new IntervalWindow(new Instant(7L), new Instant(12L));
            Future submit = Executors.newSingleThreadExecutor().submit(() -> {
                try {
                    forSplitAtTruncate.waitForSplitElementToBeProcessed();
                    return ((HandlesSplits) multiplexingConsumer).trySplit(0.0d);
                } finally {
                    forSplitAtTruncate.releaseWaitingProcessElementThread();
                }
            });
            WindowedValue valueInWindows = valueInWindows(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 6L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(6.0d)), intervalWindow, intervalWindow2, intervalWindow3);
            multiplexingConsumer.accept(valueInWindows);
            HandlesSplits.SplitResult splitResult = (HandlesSplits.SplitResult) submit.get();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 3L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(3.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 3L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(3.0d)), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane())));
            HandlesSplits.SplitResult createSplitResult = createSplitResult(0.0d);
            BeamFnApi.BundleApplication bundleApplication = (BeamFnApi.BundleApplication) Iterables.getOnlyElement(createSplitResult.getPrimaryRoots());
            ByteString.Output newOutput = ByteString.newOutput();
            fullCoder.encode((WindowedValue.FullWindowedValueCoder) WindowedValue.of(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 6L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(6.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), (OutputStream) newOutput);
            BeamFnApi.BundleApplication build = BeamFnApi.BundleApplication.newBuilder().setElement(newOutput.toByteString()).setInputId(ParDoTranslation.getMainInputName(transformsOrThrow)).setTransformId(TEST_TRANSFORM_ID).build();
            BeamFnApi.DelayedBundleApplication delayedBundleApplication = (BeamFnApi.DelayedBundleApplication) Iterables.getOnlyElement(createSplitResult.getResidualRoots());
            ByteString.Output newOutput2 = ByteString.newOutput();
            fullCoder.encode((WindowedValue.FullWindowedValueCoder) WindowedValue.of(KV.of(KV.of("7", KV.of(new OffsetRange(0L, 6L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(6.0d)), valueInWindows.getTimestamp(), intervalWindow3, valueInWindows.getPane()), (OutputStream) newOutput2);
            BeamFnApi.DelayedBundleApplication build2 = BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setElement(newOutput2.toByteString()).setInputId(ParDoTranslation.getMainInputName(transformsOrThrow)).setTransformId(TEST_TRANSFORM_ID).build()).build();
            MatcherAssert.assertThat(splitResult.getPrimaryRoots(), (Matcher<? super List>) Matchers.contains(build, bundleApplication));
            MatcherAssert.assertThat(splitResult.getResidualRoots(), (Matcher<? super List>) Matchers.contains(build2, delayedBundleApplication));
        }

        @Test
        public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWithoutObservingWindow() throws Exception {
            Pipeline create = Pipeline.create();
            ((PCollection) create.apply(Create.of("unused", new String[0]))).apply(TEST_TRANSFORM_ID, ParDo.of(new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createTruncateReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, new SplittableFnDataReceiver(arrayList), KvCoder.of(KvCoder.of(StringUtf8Coder.of(), OffsetRange.Coder.of()), DoubleCoder.of()));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            HandlesSplits multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            MatcherAssert.assertThat(multiplexingConsumer, (Matcher<? super HandlesSplits>) Matchers.instanceOf(HandlesSplits.class));
            Assert.assertEquals(0.7d, multiplexingConsumer.getProgress(), 0.0d);
            Assert.assertEquals(createSplitResult(0.4d), multiplexingConsumer.trySplit(0.4d));
        }

        @Test
        public void testProcessElementForTruncateAndSizeRestriction() throws Exception {
            Pipeline create = Pipeline.create();
            ((PCollection) create.apply(Create.of("unused", new String[0]))).apply(TEST_TRANSFORM_ID, ParDo.of(new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createTruncateReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, new SplittableFnDataReceiver(arrayList), KvCoder.of(KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())), DoubleCoder.of()));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            MatcherAssert.assertThat(multiplexingConsumer, (Matcher<? super FnDataReceiver>) Matchers.instanceOf(HandlesSplits.class));
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(5.0d))));
            multiplexingConsumer.accept(WindowedValue.valueInGlobalWindow(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d))));
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.valueInGlobalWindow(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d))), WindowedValue.valueInGlobalWindow(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 1L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)))));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedTruncateAndSizeRestriction() throws Exception {
            Pipeline create = Pipeline.create();
            PCollection pCollection = (PCollection) create.apply(Create.of("unused", new String[0]));
            PCollectionView<?> pCollectionView = (PCollectionView) pCollection.apply(View.asSingleton());
            ((PCollection) pCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(1L))))).apply(TEST_TRANSFORM_ID, ParDo.of(new WindowObservingTestSplittableDoFn(pCollectionView)).withSideInputs(pCollectionView));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createTruncateReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient(ImmutableMap.of());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, new SplittableFnDataReceiver(arrayList), KvCoder.of(KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())), DoubleCoder.of()));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, fakeBeamFnStateClient, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            MatcherAssert.assertThat(multiplexingConsumer, (Matcher<? super FnDataReceiver>) Matchers.instanceOf(HandlesSplits.class));
            IntervalWindow intervalWindow = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue valueInWindows = valueInWindows(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(5.0d)), intervalWindow, intervalWindow2);
            WindowedValue valueInWindows2 = valueInWindows(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d)), intervalWindow, intervalWindow2);
            multiplexingConsumer.accept(valueInWindows);
            multiplexingConsumer.accept(valueInWindows2);
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d)), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 1L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), intervalWindow, valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 1L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), intervalWindow2, valueInWindows.getPane())));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }

        @Test
        public void testProcessElementForWindowedTruncateAndSizeRestrictionWithNonWindowObservingOptimization() throws Exception {
            Pipeline create = Pipeline.create();
            ((PCollection) ((PCollection) create.apply(Create.of("unused", new String[0]))).apply(Window.into(SlidingWindows.of(Duration.standardSeconds(1L))))).apply(TEST_TRANSFORM_ID, ParDo.of(new NonWindowObservingTestSplittableDoFn()));
            RunnerApi.Pipeline updateTransform = ProtoOverrides.updateTransform(PTransformTranslation.PAR_DO_TRANSFORM_URN, PipelineTranslation.toProto(create, SdkComponents.create(create.getOptions()), true), SplittableParDoExpander.createTruncateReplacement());
            RunnerApi.PTransform transformsOrThrow = updateTransform.getComponents().getTransformsOrThrow((String) ((Map.Entry) Iterables.find(updateTransform.getComponents().getTransformsMap().entrySet(), entry -> {
                return ((RunnerApi.PTransform) entry.getValue()).getSpec().getUrn().equals(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN) && ((RunnerApi.PTransform) entry.getValue()).getUniqueName().contains(TEST_TRANSFORM_ID);
            })).getKey());
            String inputsOrThrow = transformsOrThrow.getInputsOrThrow(ParDoTranslation.getMainInputName(transformsOrThrow));
            String str = (String) Iterables.getOnlyElement(transformsOrThrow.getOutputsMap().values());
            ArrayList arrayList = new ArrayList();
            PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(new MetricsContainerStepMap(), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class));
            pCollectionConsumerRegistry.register(str, TEST_TRANSFORM_ID, new SplittableFnDataReceiver(arrayList), KvCoder.of(KvCoder.of(StringUtf8Coder.of(), KvCoder.of(OffsetRange.Coder.of(), InstantCoder.of())), DoubleCoder.of()));
            PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.START_STATE_NAME);
            PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry((MetricsContainerStepMap) Mockito.mock(MetricsContainerStepMap.class), (ExecutionStateTracker) Mockito.mock(ExecutionStateTracker.class), ExecutionStateTracker.FINISH_STATE_NAME);
            ArrayList arrayList2 = new ArrayList();
            FnApiDoFnRunner.Factory factory = new FnApiDoFnRunner.Factory();
            PipelineOptions create2 = PipelineOptionsFactory.create();
            Supplier ofInstance = Suppliers.ofInstance("57L");
            Objects.requireNonNull(ofInstance);
            java.util.function.Supplier supplier = ofInstance::get;
            Map<String, RunnerApi.PCollection> pcollectionsMap = updateTransform.getComponents().getPcollectionsMap();
            Map<String, RunnerApi.Coder> codersMap = updateTransform.getComponents().getCodersMap();
            Map<String, RunnerApi.WindowingStrategy> windowingStrategiesMap = updateTransform.getComponents().getWindowingStrategiesMap();
            Objects.requireNonNull(arrayList2);
            factory.createRunnerForPTransform(create2, (BeamFnDataClient) null, (BeamFnStateClient) null, (BeamFnTimerClient) null, TEST_TRANSFORM_ID, transformsOrThrow, supplier, pcollectionsMap, codersMap, windowingStrategiesMap, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, (Consumer) null, (v1) -> {
                r15.add(v1);
            }, (Consumer) null, (BundleSplitListener) null, (DoFn.BundleFinalizer) null);
            Assert.assertTrue(pTransformFunctionRegistry.getFunctions().isEmpty());
            arrayList.clear();
            MatcherAssert.assertThat(pCollectionConsumerRegistry.keySet(), (Matcher<? super Set>) Matchers.containsInAnyOrder(inputsOrThrow, str));
            FnDataReceiver multiplexingConsumer = pCollectionConsumerRegistry.getMultiplexingConsumer(inputsOrThrow);
            MatcherAssert.assertThat(multiplexingConsumer, (Matcher<? super FnDataReceiver>) Matchers.instanceOf(HandlesSplits.class));
            IntervalWindow intervalWindow = new IntervalWindow(new Instant(5L), new Instant(10L));
            IntervalWindow intervalWindow2 = new IntervalWindow(new Instant(6L), new Instant(11L));
            WindowedValue valueInWindows = valueInWindows(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 5L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(5.0d)), intervalWindow, intervalWindow2);
            WindowedValue valueInWindows2 = valueInWindows(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d)), intervalWindow, intervalWindow2);
            multiplexingConsumer.accept(valueInWindows);
            multiplexingConsumer.accept(valueInWindows2);
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.contains(WindowedValue.of(KV.of(KV.of("5", KV.of(new OffsetRange(0L, 2L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(2.0d)), valueInWindows.getTimestamp(), ImmutableList.of(intervalWindow, intervalWindow2), valueInWindows.getPane()), WindowedValue.of(KV.of(KV.of("2", KV.of(new OffsetRange(0L, 1L), GlobalWindow.TIMESTAMP_MIN_VALUE)), Double.valueOf(1.0d)), valueInWindows.getTimestamp(), ImmutableList.of(intervalWindow, intervalWindow2), valueInWindows.getPane())));
            arrayList.clear();
            Assert.assertTrue(pTransformFunctionRegistry2.getFunctions().isEmpty());
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
            ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
            MatcherAssert.assertThat(arrayList, (Matcher<? super ArrayList>) Matchers.empty());
        }
    }

    @RunWith(JUnit4.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/FnApiDoFnRunnerTest$SplitTest.class */
    public static class SplitTest {

        @Rule
        public final ExpectedException expected = ExpectedException.none();
        private IntervalWindow window1;
        private IntervalWindow window2;
        private IntervalWindow window3;
        private WindowedValue<String> currentElement;
        private OffsetRange currentRestriction;
        private Instant currentWatermarkEstimatorState;
        private Instant initialWatermark;
        KV<Instant, Instant> watermarkAndState;
        private static final String PROCESS_TRANSFORM_ID = "processPTransformId";
        private static final String TRUNCATE_TRANSFORM_ID = "truncatePTransformId";
        private static final String PROCESS_INPUT_ID = "processInputId";
        private static final String TRUNCATE_INPUT_ID = "truncateInputId";
        private static final String PROCESS_OUTPUT_ID = "processOutputId";
        private static final String TRUNCATE_OUTPUT_ID = "truncateOutputId";

        private KV<WindowedValue, WindowedValue> createSplitInWindow(OffsetRange offsetRange, OffsetRange offsetRange2, BoundedWindow boundedWindow) {
            return KV.of(WindowedValue.of(KV.of(this.currentElement.getValue(), KV.of(offsetRange, this.currentWatermarkEstimatorState)), this.currentElement.getTimestamp(), boundedWindow, this.currentElement.getPane()), WindowedValue.of(KV.of(this.currentElement.getValue(), KV.of(offsetRange2, this.watermarkAndState.getValue())), this.currentElement.getTimestamp(), boundedWindow, this.currentElement.getPane()));
        }

        private KV<WindowedValue, WindowedValue> createSplitAcrossWindows(List<BoundedWindow> list, List<BoundedWindow> list2) {
            return KV.of(list.isEmpty() ? null : WindowedValue.of(KV.of(this.currentElement.getValue(), KV.of(this.currentRestriction, this.currentWatermarkEstimatorState)), this.currentElement.getTimestamp(), list, this.currentElement.getPane()), list2.isEmpty() ? null : WindowedValue.of(KV.of(this.currentElement.getValue(), KV.of(this.currentRestriction, this.currentWatermarkEstimatorState)), this.currentElement.getTimestamp(), list2, this.currentElement.getPane()));
        }

        private KV<WindowedValue, WindowedValue> createSplitWithSizeInWindow(OffsetRange offsetRange, OffsetRange offsetRange2, BoundedWindow boundedWindow) {
            return KV.of(WindowedValue.of(KV.of(KV.of(this.currentElement.getValue(), KV.of(offsetRange, this.currentWatermarkEstimatorState)), Double.valueOf(offsetRange.getTo() - offsetRange.getFrom())), this.currentElement.getTimestamp(), boundedWindow, this.currentElement.getPane()), WindowedValue.of(KV.of(KV.of(this.currentElement.getValue(), KV.of(offsetRange2, this.watermarkAndState.getValue())), Double.valueOf(offsetRange2.getTo() - offsetRange2.getFrom())), this.currentElement.getTimestamp(), boundedWindow, this.currentElement.getPane()));
        }

        private KV<WindowedValue, WindowedValue> createSplitWithSizeAcrossWindows(List<BoundedWindow> list, List<BoundedWindow> list2) {
            return KV.of(list.isEmpty() ? null : WindowedValue.of(KV.of(KV.of(this.currentElement.getValue(), KV.of(this.currentRestriction, this.currentWatermarkEstimatorState)), Double.valueOf(this.currentRestriction.getTo() - this.currentRestriction.getFrom())), this.currentElement.getTimestamp(), list, this.currentElement.getPane()), list2.isEmpty() ? null : WindowedValue.of(KV.of(KV.of(this.currentElement.getValue(), KV.of(this.currentRestriction, this.currentWatermarkEstimatorState)), Double.valueOf(this.currentRestriction.getTo() - this.currentRestriction.getFrom())), this.currentElement.getTimestamp(), list2, this.currentElement.getPane()));
        }

        @Before
        public void setUp() {
            this.window1 = new IntervalWindow(Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10L));
            this.window2 = new IntervalWindow(Instant.ofEpochMilli(10L), Instant.ofEpochMilli(20L));
            this.window3 = new IntervalWindow(Instant.ofEpochMilli(20L), Instant.ofEpochMilli(30L));
            this.currentElement = WindowedValue.of(SampleElements.Strings.MIN_ELEMENT, Instant.ofEpochMilli(57L), ImmutableList.of(this.window1, this.window2, this.window3), PaneInfo.NO_FIRING);
            this.currentRestriction = new OffsetRange(0L, 100L);
            this.currentWatermarkEstimatorState = Instant.ofEpochMilli(21L);
            this.initialWatermark = Instant.ofEpochMilli(25L);
            this.watermarkAndState = KV.of(Instant.ofEpochMilli(42L), Instant.ofEpochMilli(42L));
        }

        @Test
        public void testScaledProgress() throws Exception {
            RestrictionTracker.Progress from = RestrictionTracker.Progress.from(2.0d, 8.0d);
            RestrictionTracker.Progress scaleProgress = FnApiDoFnRunner.scaleProgress(from, 0, 1);
            Assert.assertEquals(2.0d, scaleProgress.getWorkCompleted(), 0.0d);
            Assert.assertEquals(8.0d, scaleProgress.getWorkRemaining(), 0.0d);
            RestrictionTracker.Progress scaleProgress2 = FnApiDoFnRunner.scaleProgress(from, 0, 3);
            Assert.assertEquals(2.0d, scaleProgress2.getWorkCompleted(), 0.0d);
            Assert.assertEquals(28.0d, scaleProgress2.getWorkRemaining(), 0.0d);
            RestrictionTracker.Progress scaleProgress3 = FnApiDoFnRunner.scaleProgress(from, 1, 3);
            Assert.assertEquals(12.0d, scaleProgress3.getWorkCompleted(), 0.0d);
            Assert.assertEquals(18.0d, scaleProgress3.getWorkRemaining(), 0.0d);
            RestrictionTracker.Progress scaleProgress4 = FnApiDoFnRunner.scaleProgress(from, 2, 3);
            Assert.assertEquals(22.0d, scaleProgress4.getWorkCompleted(), 0.0d);
            Assert.assertEquals(8.0d, scaleProgress4.getWorkRemaining(), 0.0d);
        }

        @Test
        public void testComputeSplitForProcessOrTruncateWithNullTrackerAndSplitDelegate() throws Exception {
            this.expected.expect(IllegalArgumentException.class);
            FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, ImmutableList.copyOf((Collection) this.currentElement.getWindows()), this.currentWatermarkEstimatorState, 0.0d, (RestrictionTracker) null, (HandlesSplits) null, (KV) null, 0, 3);
        }

        @Test
        public void testComputeSplitForProcessOrTruncateWithNotNullTrackerAndDelegate() throws Exception {
            this.expected.expect(IllegalArgumentException.class);
            FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, ImmutableList.copyOf((Collection) this.currentElement.getWindows()), this.currentWatermarkEstimatorState, 0.0d, new OffsetRangeTracker(this.currentRestriction), createSplitDelegate(0.3d, 0.0d, null), (KV) null, 0, 3);
        }

        @Test
        public void testComputeSplitForProcessOrTruncateWithInvalidWatermarkAndState() throws Exception {
            this.expected.expect(NullPointerException.class);
            FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, ImmutableList.copyOf((Collection) this.currentElement.getWindows()), this.currentWatermarkEstimatorState, 0.0d, new OffsetRangeTracker(this.currentRestriction), (HandlesSplits) null, (KV) null, 0, 3);
        }

        @Test
        public void testTrySplitForProcessCheckpointOnFirstWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 30L);
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, copyOf, this.currentWatermarkEstimatorState, 0.0d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 0, 3);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitInWindow = createSplitInWindow(new OffsetRange(0L, 31L), new OffsetRange(31L, 100L), this.window1);
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(this.window2, this.window3));
            Assert.assertEquals(createSplitInWindow.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(createSplitInWindow.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessCheckpointOnFirstWindowAfterOneSplit() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 30L);
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, copyOf, this.currentWatermarkEstimatorState, 0.0d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 0, 2);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitInWindow = createSplitInWindow(new OffsetRange(0L, 31L), new OffsetRange(31L, 100L), this.window1);
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(this.window2));
            Assert.assertEquals(createSplitInWindow.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(createSplitInWindow.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnFirstWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 30L);
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, copyOf, this.currentWatermarkEstimatorState, 0.2d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 0, 3);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitInWindow = createSplitInWindow(new OffsetRange(0L, 84L), new OffsetRange(84L, 100L), this.window1);
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(this.window2, this.window3));
            Assert.assertEquals(createSplitInWindow.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(createSplitInWindow.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnMiddleWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 30L);
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window2, copyOf, this.currentWatermarkEstimatorState, 0.2d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 1, 3);
            Assert.assertEquals(2L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitInWindow = createSplitInWindow(new OffsetRange(0L, 63L), new OffsetRange(63L, 100L), this.window2);
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window3));
            Assert.assertEquals(createSplitInWindow.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(createSplitInWindow.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnLastWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 30L);
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window3, copyOf, this.currentWatermarkEstimatorState, 0.2d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 2, 3);
            Assert.assertEquals(3L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitInWindow = createSplitInWindow(new OffsetRange(0L, 44L), new OffsetRange(44L, 100L), this.window3);
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1, this.window2), ImmutableList.of());
            Assert.assertEquals(createSplitInWindow.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertEquals(createSplitInWindow.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnFirstWindowFallback() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 100L);
            Assert.assertNull(offsetRangeTracker.trySplit(0.0d));
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window3, copyOf, this.currentWatermarkEstimatorState, 0.0d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 0, 3);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window2, this.window3));
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessSplitOnLastWindowWhenNoElementSplit() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 100L);
            Assert.assertNull(offsetRangeTracker.trySplit(0.0d));
            Assert.assertNull(FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window3, copyOf, this.currentWatermarkEstimatorState, 0.0d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 2, 3));
        }

        @Test
        public void testTrySplitForProcessOnWindowBoundaryRoundUp() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 30L);
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window2, copyOf, this.currentWatermarkEstimatorState, 0.6d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 0, 3);
            Assert.assertEquals(2L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1, this.window2), ImmutableList.of(this.window3));
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessOnWindowBoundaryRoundDown() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 30L);
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window2, copyOf, this.currentWatermarkEstimatorState, 0.3d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 0, 3);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window2, this.window3));
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForProcessOnWindowBoundaryRoundDownOnLastWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            OffsetRangeTracker offsetRangeTracker = new OffsetRangeTracker(this.currentRestriction);
            offsetRangeTracker.tryClaim((Long) 30L);
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window2, copyOf, this.currentWatermarkEstimatorState, 0.9d, offsetRangeTracker, (HandlesSplits) null, this.watermarkAndState, 0, 3);
            Assert.assertEquals(2L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1, this.window2), ImmutableList.of(this.window3));
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        private HandlesSplits createSplitDelegate(final double d, final double d2, final HandlesSplits.SplitResult splitResult) {
            return new HandlesSplits() { // from class: org.apache.beam.fn.harness.FnApiDoFnRunnerTest.SplitTest.1
                public HandlesSplits.SplitResult trySplit(double d3) {
                    Preconditions.checkArgument(d3 == d2);
                    return splitResult;
                }

                public double getProgress() {
                    return d;
                }
            };
        }

        @Test
        public void testTrySplitForTruncateCheckpointOnFirstWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            HandlesSplits.SplitResult of = HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, copyOf, this.currentWatermarkEstimatorState, 0.0d, (RestrictionTracker) null, createSplitDelegate(0.3d, 0.0d, of), (KV) null, 0, 3);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(this.window2, this.window3));
            Assert.assertEquals(of, computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateCheckpointOnFirstWindowAfterOneSplit() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            HandlesSplits.SplitResult of = HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, copyOf, this.currentWatermarkEstimatorState, 0.0d, (RestrictionTracker) null, createSplitDelegate(0.3d, 0.0d, of), (KV) null, 0, 2);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(this.window2));
            Assert.assertEquals(of, computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnFirstWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            HandlesSplits.SplitResult of = HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, copyOf, this.currentWatermarkEstimatorState, 0.2d, (RestrictionTracker) null, createSplitDelegate(0.3d, 0.54d, of), (KV) null, 0, 3);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(this.window2, this.window3));
            Assert.assertEquals(of, computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnMiddleWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            HandlesSplits.SplitResult of = HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, copyOf, this.currentWatermarkEstimatorState, 0.2d, (RestrictionTracker) null, createSplitDelegate(0.3d, 0.34d, of), (KV) null, 1, 3);
            Assert.assertEquals(2L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window3));
            Assert.assertEquals(of, computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnLastWindow() throws Exception {
            ImmutableList copyOf = ImmutableList.copyOf((Collection) this.currentElement.getWindows());
            HandlesSplits.SplitResult of = HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()));
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, copyOf, this.currentWatermarkEstimatorState, 0.2d, (RestrictionTracker) null, createSplitDelegate(0.3d, 0.2d, of), (KV) null, 2, 3);
            Assert.assertEquals(3L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1, this.window2), ImmutableList.of());
            Assert.assertEquals(of, computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnFirstWindowFallback() throws Exception {
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, ImmutableList.copyOf((Collection) this.currentElement.getWindows()), this.currentWatermarkEstimatorState, 0.0d, (RestrictionTracker) null, createSplitDelegate(1.0d, 0.0d, HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()))), (KV) null, 0, 3);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window2, this.window3));
            Assert.assertNull(computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateSplitOnLastWindowWhenNoElementSplit() throws Exception {
            Assert.assertNull(FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, ImmutableList.copyOf((Collection) this.currentElement.getWindows()), this.currentWatermarkEstimatorState, 0.0d, (RestrictionTracker) null, createSplitDelegate(1.0d, 0.0d, null), (KV) null, 2, 3));
        }

        @Test
        public void testTrySplitForTruncateOnWindowBoundaryRoundUp() throws Exception {
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, ImmutableList.copyOf((Collection) this.currentElement.getWindows()), this.currentWatermarkEstimatorState, 0.6d, (RestrictionTracker) null, createSplitDelegate(0.3d, 0.0d, HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()))), (KV) null, 0, 3);
            Assert.assertEquals(2L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1, this.window2), ImmutableList.of(this.window3));
            Assert.assertNull(computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateOnWindowBoundaryRoundDown() throws Exception {
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, ImmutableList.copyOf((Collection) this.currentElement.getWindows()), this.currentWatermarkEstimatorState, 0.3d, (RestrictionTracker) null, createSplitDelegate(0.3d, 0.0d, HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()))), (KV) null, 0, 3);
            Assert.assertEquals(1L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window2, this.window3));
            Assert.assertNull(computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testTrySplitForTruncateOnWindowBoundaryRoundDownOnLastWindow() throws Exception {
            FnApiDoFnRunner.SplitResultsWithStopIndex computeSplitForProcessOrTruncate = FnApiDoFnRunner.computeSplitForProcessOrTruncate(this.currentElement, this.currentRestriction, this.window1, ImmutableList.copyOf((Collection) this.currentElement.getWindows()), this.currentWatermarkEstimatorState, 0.6d, (RestrictionTracker) null, createSplitDelegate(0.3d, 0.0d, HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance()))), (KV) null, 0, 3);
            Assert.assertEquals(2L, computeSplitForProcessOrTruncate.getNewWindowStopIndex());
            KV<WindowedValue, WindowedValue> createSplitAcrossWindows = createSplitAcrossWindows(ImmutableList.of(this.window1, this.window2), ImmutableList.of(this.window3));
            Assert.assertNull(computeSplitForProcessOrTruncate.getDownstreamSplit());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getPrimarySplitRoot());
            Assert.assertNull(computeSplitForProcessOrTruncate.getWindowSplit().getResidualSplitRoot());
            Assert.assertEquals(createSplitAcrossWindows.getKey(), computeSplitForProcessOrTruncate.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
            Assert.assertEquals(createSplitAcrossWindows.getValue(), computeSplitForProcessOrTruncate.getWindowSplit().getResidualInUnprocessedWindowsRoot());
        }

        @Test
        public void testConstructSplitResultWithInvalidElementSplits() throws Exception {
            this.expected.expect(IllegalArgumentException.class);
            FnApiDoFnRunner.constructSplitResult(FnApiDoFnRunner.WindowedSplitResult.forRoots((WindowedValue) null, WindowedValue.valueInGlobalWindow("elementPrimary"), WindowedValue.valueInGlobalWindow("elementResidual"), (WindowedValue) null), HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.getDefaultInstance()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.getDefaultInstance())), WindowedValue.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE), Instant.now(), (KV) null, "ptransformId", "inputId", ImmutableList.of("outputId"), (Duration) null);
        }

        private Coder getFullInputCoder(Coder coder, Coder coder2, Coder coder3, Coder coder4) {
            return WindowedValue.getFullCoder(KvCoder.of(KvCoder.of(coder, KvCoder.of(coder2, coder3)), DoubleCoder.of()), coder4);
        }

        private HandlesSplits.SplitResult getProcessElementSplit(String str, String str2) {
            return HandlesSplits.SplitResult.of(ImmutableList.of(BeamFnApi.BundleApplication.newBuilder().setTransformId(str).setInputId(str2).build()), ImmutableList.of(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(BeamFnApi.BundleApplication.newBuilder().setTransformId(str).setInputId(str2).build()).setRequestedTimeDelay(Durations.fromMillis(1000L)).build()));
        }

        private Timestamp toTimestamp(Instant instant) {
            return Timestamp.newBuilder().setSeconds(instant.getMillis() / 1000).setNanos(((int) (instant.getMillis() % 1000)) * 1000000).build();
        }

        @Test
        public void testConstructSplitResultWithElementSplitFromDelegate() throws Exception {
            Coder fullInputCoder = getFullInputCoder(StringUtf8Coder.of(), OffsetRange.Coder.of(), InstantCoder.of(), IntervalWindow.getCoder());
            HandlesSplits.SplitResult processElementSplit = getProcessElementSplit(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID);
            HandlesSplits.SplitResult constructSplitResult = FnApiDoFnRunner.constructSplitResult((FnApiDoFnRunner.WindowedSplitResult) null, processElementSplit, fullInputCoder, (Instant) null, (KV) null, TRUNCATE_TRANSFORM_ID, TRUNCATE_INPUT_ID, ImmutableList.of(TRUNCATE_OUTPUT_ID), (Duration) null);
            Assert.assertEquals(processElementSplit.getPrimaryRoots(), constructSplitResult.getPrimaryRoots());
            Assert.assertEquals(processElementSplit.getResidualRoots(), constructSplitResult.getResidualRoots());
        }

        @Test
        public void testConstructSplitResultWithElementSplitFromTracker() throws Exception {
            Coder fullInputCoder = getFullInputCoder(StringUtf8Coder.of(), OffsetRange.Coder.of(), InstantCoder.of(), IntervalWindow.getCoder());
            KV<WindowedValue, WindowedValue> createSplitWithSizeInWindow = createSplitWithSizeInWindow(new OffsetRange(0L, 31L), new OffsetRange(31L, 100L), this.window1);
            HandlesSplits.SplitResult constructSplitResult = FnApiDoFnRunner.constructSplitResult(FnApiDoFnRunner.WindowedSplitResult.forRoots((WindowedValue) null, createSplitWithSizeInWindow.getKey(), createSplitWithSizeInWindow.getValue(), (WindowedValue) null), (HandlesSplits.SplitResult) null, fullInputCoder, (Instant) null, this.watermarkAndState, PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID, ImmutableList.of(PROCESS_OUTPUT_ID), Duration.millis(100L));
            Assert.assertEquals(1L, constructSplitResult.getPrimaryRoots().size());
            BeamFnApi.BundleApplication bundleApplication = (BeamFnApi.BundleApplication) constructSplitResult.getPrimaryRoots().get(0);
            Assert.assertEquals(PROCESS_TRANSFORM_ID, bundleApplication.getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, bundleApplication.getInputId());
            Assert.assertEquals(createSplitWithSizeInWindow.getKey(), fullInputCoder.decode(bundleApplication.getElement().newInput()));
            Assert.assertEquals(1L, constructSplitResult.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication delayedBundleApplication = (BeamFnApi.DelayedBundleApplication) constructSplitResult.getResidualRoots().get(0);
            Assert.assertEquals(Durations.fromMillis(100L), delayedBundleApplication.getRequestedTimeDelay());
            Assert.assertEquals(PROCESS_TRANSFORM_ID, delayedBundleApplication.getApplication().getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, delayedBundleApplication.getApplication().getInputId());
            Assert.assertEquals(toTimestamp(this.watermarkAndState.getValue()), delayedBundleApplication.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
            Assert.assertEquals(createSplitWithSizeInWindow.getValue(), fullInputCoder.decode(delayedBundleApplication.getApplication().getElement().newInput()));
        }

        @Test
        public void testConstructSplitResultWithOnlyWindowSplits() throws Exception {
            Coder fullInputCoder = getFullInputCoder(StringUtf8Coder.of(), OffsetRange.Coder.of(), InstantCoder.of(), IntervalWindow.getCoder());
            KV<WindowedValue, WindowedValue> createSplitWithSizeAcrossWindows = createSplitWithSizeAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window2, this.window3));
            HandlesSplits.SplitResult constructSplitResult = FnApiDoFnRunner.constructSplitResult(FnApiDoFnRunner.WindowedSplitResult.forRoots(createSplitWithSizeAcrossWindows.getKey(), (WindowedValue) null, (WindowedValue) null, createSplitWithSizeAcrossWindows.getValue()), (HandlesSplits.SplitResult) null, fullInputCoder, this.initialWatermark, this.watermarkAndState, PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID, ImmutableList.of(PROCESS_OUTPUT_ID), Duration.millis(100L));
            Assert.assertEquals(1L, constructSplitResult.getPrimaryRoots().size());
            BeamFnApi.BundleApplication bundleApplication = (BeamFnApi.BundleApplication) constructSplitResult.getPrimaryRoots().get(0);
            Assert.assertEquals(PROCESS_TRANSFORM_ID, bundleApplication.getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, bundleApplication.getInputId());
            Assert.assertEquals(createSplitWithSizeAcrossWindows.getKey(), fullInputCoder.decode(bundleApplication.getElement().newInput()));
            Assert.assertEquals(1L, constructSplitResult.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication delayedBundleApplication = (BeamFnApi.DelayedBundleApplication) constructSplitResult.getResidualRoots().get(0);
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Duration.getDefaultInstance(), delayedBundleApplication.getRequestedTimeDelay());
            Assert.assertEquals(PROCESS_TRANSFORM_ID, delayedBundleApplication.getApplication().getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, delayedBundleApplication.getApplication().getInputId());
            Assert.assertEquals(toTimestamp(this.initialWatermark), delayedBundleApplication.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
            Assert.assertEquals(createSplitWithSizeAcrossWindows.getValue(), fullInputCoder.decode(delayedBundleApplication.getApplication().getElement().newInput()));
        }

        @Test
        public void testConstructSplitResultWithElementAndWindowSplitFromProcess() throws Exception {
            Coder fullInputCoder = getFullInputCoder(StringUtf8Coder.of(), OffsetRange.Coder.of(), InstantCoder.of(), IntervalWindow.getCoder());
            KV<WindowedValue, WindowedValue> createSplitWithSizeAcrossWindows = createSplitWithSizeAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window3));
            KV<WindowedValue, WindowedValue> createSplitWithSizeInWindow = createSplitWithSizeInWindow(new OffsetRange(0L, 31L), new OffsetRange(31L, 100L), this.window2);
            HandlesSplits.SplitResult constructSplitResult = FnApiDoFnRunner.constructSplitResult(FnApiDoFnRunner.WindowedSplitResult.forRoots(createSplitWithSizeAcrossWindows.getKey(), createSplitWithSizeInWindow.getKey(), createSplitWithSizeInWindow.getValue(), createSplitWithSizeAcrossWindows.getValue()), (HandlesSplits.SplitResult) null, fullInputCoder, this.initialWatermark, this.watermarkAndState, PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID, ImmutableList.of(PROCESS_OUTPUT_ID), Duration.millis(100L));
            Assert.assertEquals(2L, constructSplitResult.getPrimaryRoots().size());
            BeamFnApi.BundleApplication bundleApplication = (BeamFnApi.BundleApplication) constructSplitResult.getPrimaryRoots().get(0);
            BeamFnApi.BundleApplication bundleApplication2 = (BeamFnApi.BundleApplication) constructSplitResult.getPrimaryRoots().get(1);
            Assert.assertEquals(PROCESS_TRANSFORM_ID, bundleApplication.getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, bundleApplication.getInputId());
            Assert.assertEquals(createSplitWithSizeAcrossWindows.getKey(), fullInputCoder.decode(bundleApplication.getElement().newInput()));
            Assert.assertEquals(PROCESS_TRANSFORM_ID, bundleApplication2.getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, bundleApplication2.getInputId());
            Assert.assertEquals(createSplitWithSizeInWindow.getKey(), fullInputCoder.decode(bundleApplication2.getElement().newInput()));
            Assert.assertEquals(2L, constructSplitResult.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication delayedBundleApplication = (BeamFnApi.DelayedBundleApplication) constructSplitResult.getResidualRoots().get(0);
            BeamFnApi.DelayedBundleApplication delayedBundleApplication2 = (BeamFnApi.DelayedBundleApplication) constructSplitResult.getResidualRoots().get(1);
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Duration.getDefaultInstance(), delayedBundleApplication.getRequestedTimeDelay());
            Assert.assertEquals(PROCESS_TRANSFORM_ID, delayedBundleApplication.getApplication().getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, delayedBundleApplication.getApplication().getInputId());
            Assert.assertEquals(toTimestamp(this.initialWatermark), delayedBundleApplication.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
            Assert.assertEquals(createSplitWithSizeAcrossWindows.getValue(), fullInputCoder.decode(delayedBundleApplication.getApplication().getElement().newInput()));
            Assert.assertEquals(Durations.fromMillis(100L), delayedBundleApplication2.getRequestedTimeDelay());
            Assert.assertEquals(PROCESS_TRANSFORM_ID, delayedBundleApplication2.getApplication().getTransformId());
            Assert.assertEquals(PROCESS_INPUT_ID, delayedBundleApplication2.getApplication().getInputId());
            Assert.assertEquals(toTimestamp(this.watermarkAndState.getValue()), delayedBundleApplication2.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
            Assert.assertEquals(createSplitWithSizeInWindow.getValue(), fullInputCoder.decode(delayedBundleApplication2.getApplication().getElement().newInput()));
        }

        @Test
        public void testConstructSplitResultWithElementAndWindowSplitFromTruncate() throws Exception {
            Coder fullInputCoder = getFullInputCoder(StringUtf8Coder.of(), OffsetRange.Coder.of(), InstantCoder.of(), IntervalWindow.getCoder());
            KV<WindowedValue, WindowedValue> createSplitWithSizeAcrossWindows = createSplitWithSizeAcrossWindows(ImmutableList.of(this.window1), ImmutableList.of(this.window3));
            HandlesSplits.SplitResult processElementSplit = getProcessElementSplit(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID);
            HandlesSplits.SplitResult constructSplitResult = FnApiDoFnRunner.constructSplitResult(FnApiDoFnRunner.WindowedSplitResult.forRoots(createSplitWithSizeAcrossWindows.getKey(), (WindowedValue) null, (WindowedValue) null, createSplitWithSizeAcrossWindows.getValue()), processElementSplit, fullInputCoder, this.initialWatermark, this.watermarkAndState, TRUNCATE_TRANSFORM_ID, TRUNCATE_INPUT_ID, ImmutableList.of(TRUNCATE_OUTPUT_ID), Duration.millis(100L));
            Assert.assertEquals(2L, constructSplitResult.getPrimaryRoots().size());
            BeamFnApi.BundleApplication bundleApplication = (BeamFnApi.BundleApplication) constructSplitResult.getPrimaryRoots().get(0);
            BeamFnApi.BundleApplication bundleApplication2 = (BeamFnApi.BundleApplication) constructSplitResult.getPrimaryRoots().get(1);
            Assert.assertEquals(TRUNCATE_TRANSFORM_ID, bundleApplication.getTransformId());
            Assert.assertEquals(TRUNCATE_INPUT_ID, bundleApplication.getInputId());
            Assert.assertEquals(createSplitWithSizeAcrossWindows.getKey(), fullInputCoder.decode(bundleApplication.getElement().newInput()));
            Assert.assertEquals(processElementSplit.getPrimaryRoots().get(0), bundleApplication2);
            Assert.assertEquals(2L, constructSplitResult.getResidualRoots().size());
            BeamFnApi.DelayedBundleApplication delayedBundleApplication = (BeamFnApi.DelayedBundleApplication) constructSplitResult.getResidualRoots().get(0);
            BeamFnApi.DelayedBundleApplication delayedBundleApplication2 = (BeamFnApi.DelayedBundleApplication) constructSplitResult.getResidualRoots().get(1);
            Assert.assertEquals(org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.Duration.getDefaultInstance(), delayedBundleApplication.getRequestedTimeDelay());
            Assert.assertEquals(TRUNCATE_TRANSFORM_ID, delayedBundleApplication.getApplication().getTransformId());
            Assert.assertEquals(TRUNCATE_INPUT_ID, delayedBundleApplication.getApplication().getInputId());
            Assert.assertEquals(toTimestamp(this.initialWatermark), delayedBundleApplication.getApplication().getOutputWatermarksMap().get(TRUNCATE_OUTPUT_ID));
            Assert.assertEquals(createSplitWithSizeAcrossWindows.getValue(), fullInputCoder.decode(delayedBundleApplication.getApplication().getElement().newInput()));
            Assert.assertEquals(processElementSplit.getResidualRoots().get(0), delayedBundleApplication2);
        }
    }
}
