package org.apache.beam.runners.core;

import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.util.state.TimerInternalsFactory;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypedPValue;
import org.joda.time.Instant;

@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
/* loaded from: input_file:org/apache/beam/runners/core/SplittableParDo.class */
public class SplittableParDo<InputT, OutputT, RestrictionT> extends PTransform<PCollection<InputT>, PCollectionTuple> {
    private final ParDo.BoundMulti<InputT, OutputT> parDo;

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDo$GBKIntoKeyedWorkItems.class */
    public static class GBKIntoKeyedWorkItems<KeyT, InputT> extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
        public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> pCollection) {
            return PCollection.createPrimitiveOutputInternal(pCollection.getPipeline(), WindowingStrategy.globalDefault(), pCollection.isBounded());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDo$PairWithRestrictionFn.class */
    /**
     * Pairs each input element with the initial restriction that the wrapped fn reports for it.
     */
    public static class PairWithRestrictionFn<InputT, OutputT, RestrictionT> extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> {
        /** The wrapped fn whose initial restriction is queried. */
        private DoFn<InputT, OutputT> fn;
        /** Invoker for {@link #fn}; transient, so it is created in {@link #setup()}. */
        private transient DoFnInvoker<InputT, OutputT> invoker;

        /**
         * @param doFn the fn to query for initial restrictions
         */
        PairWithRestrictionFn(DoFn<InputT, OutputT> doFn) {
            fn = doFn;
        }

        /** Creates the invoker for the wrapped fn. */
        @DoFn.Setup
        public void setup() {
            invoker = DoFnInvokers.invokerFor(fn);
        }

        /**
         * Outputs the current element together with its initial restriction.
         *
         * @param processContext context supplying the element and receiving the output pair
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>>.ProcessContext processContext) {
            final InputT input = processContext.element();
            final RestrictionT initialRestriction = invoker.invokeGetInitialRestriction(processContext.element());
            processContext.output(ElementAndRestriction.of(input, initialRestriction));
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDo$ProcessElements.class */
    /**
     * Transform that consumes keyed work items of element/restriction pairs and declares the
     * main and side outputs of the wrapped fn.
     *
     * <p>{@link #expand} only creates the primitive outputs; the fn that does the per-element
     * work is obtained from {@link #newProcessFn}.
     */
    public static class ProcessElements<InputT, OutputT, RestrictionT> extends PTransform<PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> {
        private final DoFn<InputT, OutputT> fn;
        private final Coder<InputT> elementCoder;
        private final Coder<RestrictionT> restrictionCoder;
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final List<PCollectionView<?>> sideInputs;
        private final TupleTag<OutputT> mainOutputTag;
        private final TupleTagList sideOutputTags;

        /**
         * @param doFn the wrapped fn
         * @param coder coder for the fn's input elements
         * @param coder2 coder for the fn's restrictions
         * @param windowingStrategy windowing strategy applied to the declared outputs
         * @param list side inputs of the fn
         * @param tupleTag tag of the main output
         * @param tupleTagList tags of the side outputs
         */
        public ProcessElements(DoFn<InputT, OutputT> doFn, Coder<InputT> coder, Coder<RestrictionT> coder2, WindowingStrategy<?, ?> windowingStrategy, List<PCollectionView<?>> list, TupleTag<OutputT> tupleTag, TupleTagList tupleTagList) {
            this.fn = doFn;
            this.elementCoder = coder;
            this.restrictionCoder = coder2;
            this.windowingStrategy = windowingStrategy;
            this.sideInputs = list;
            this.mainOutputTag = tupleTag;
            this.sideOutputTags = tupleTagList;
        }

        /** Returns the wrapped fn. */
        public DoFn<InputT, OutputT> getFn() {
            return this.fn;
        }

        /** Returns the side inputs of the wrapped fn. */
        public List<PCollectionView<?>> getSideInputs() {
            return this.sideInputs;
        }

        /** Returns the tag of the main output. */
        public TupleTag<OutputT> getMainOutputTag() {
            return this.mainOutputTag;
        }

        /** Returns the tags of the side outputs. */
        public TupleTagList getSideOutputTags() {
            return this.sideOutputTags;
        }

        /**
         * Creates a new {@link ProcessFn} for the given fn, using this transform's element and
         * restriction coders and the window coder of its windowing strategy.
         *
         * @param doFn the fn the new {@link ProcessFn} should invoke
         */
        public ProcessFn<InputT, OutputT, RestrictionT, ?> newProcessFn(DoFn<InputT, OutputT> doFn) {
            return new ProcessFn<>(doFn, this.elementCoder, this.restrictionCoder, this.windowingStrategy.getWindowFn().windowCoder());
        }

        /**
         * Declares the primitive outputs (main tag followed by all side tags) in the input's
         * pipeline.
         *
         * <p>The outputs are bounded only if the input is bounded and the fn's signature reports
         * it as bounded per element. The main output's type descriptor is taken from the fn.
         *
         * @param pCollection the keyed work items to process
         * @return the tuple of declared outputs
         */
        public PCollectionTuple expand(PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> pCollection) {
            PCollectionTuple ofPrimitiveOutputsInternal = PCollectionTuple.ofPrimitiveOutputsInternal(pCollection.getPipeline(), TupleTagList.of(this.mainOutputTag).and(this.sideOutputTags.getAll()), this.windowingStrategy, pCollection.isBounded().and(DoFnSignatures.getSignature(this.fn.getClass()).isBoundedPerElement()));
            ofPrimitiveOutputsInternal.get(this.mainOutputTag).setTypeDescriptor(this.fn.getOutputTypeDescriptor());
            return ofPrimitiveOutputsInternal;
        }

        /**
         * Looks up the default coder for an output, giving the coder registry the fn's input
         * type descriptor and the coder of the original input elements as context.
         *
         * @param pCollection the keyed work items being processed
         * @param typedPValue the output whose coder is requested
         * @return the default coder for the output's type descriptor
         * @throws CannotProvideCoderException if the registry cannot provide a coder
         */
        public <T> Coder<T> getDefaultOutputCoder(PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> pCollection, TypedPValue<T> typedPValue) throws CannotProvideCoderException {
            // The input coder is declared as a plain Coder, which has no getElementCoder();
            // unwrap it explicitly: KeyedWorkItemCoder -> ElementAndRestrictionCoder -> element coder.
            @SuppressWarnings({"unchecked", "rawtypes"})
            KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> workItemCoder = (KeyedWorkItemCoder) pCollection.getCoder();
            Coder<InputT> inputCoder = ((ElementAndRestrictionCoder<InputT, RestrictionT>) workItemCoder.getElementCoder()).getElementCoder();
            return pCollection.getPipeline().getCoderRegistry().getDefaultCoder(typedPValue.getTypeDescriptor(), this.fn.getInputTypeDescriptor(), inputCoder);
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDo$ProcessFn.class */
    public static class ProcessFn<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {

        /**
         * Output count at which the per-call process context checkpoints the tracker, if no
         * checkpoint has been taken yet. The counter lives in the context created for a single
         * {@code processElement} call.
         */
        @VisibleForTesting
        static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
        /** Watermark hold: added when a call is to be resumed, cleared when work completes. */
        private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", OutputTimeFns.outputAtLatestInputTimestamp()));
        /** State holding the element: written on the first call, read back on timer firings. */
        private final StateTag<Object, ValueState<WindowedValue<InputT>>> elementTag;
        /** State holding the residual restriction to continue from on the next timer firing. */
        private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
        // The three collaborators below are transient and supplied through the setters.
        private transient StateInternalsFactory<String> stateInternalsFactory;
        private transient TimerInternalsFactory<String> timerInternalsFactory;
        private transient OutputWindowedValue<OutputT> outputWindowedValue;
        /** The wrapped fn that is invoked for each element/restriction pair. */
        private final DoFn<InputT, OutputT> fn;
        /** Window coder used to build the full coder of the stored element. */
        private final Coder<? extends BoundedWindow> windowCoder;
        /** Invoker for {@link #fn}; transient, so it is not part of the serialized form. */
        private transient DoFnInvoker<InputT, OutputT> invoker;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDo$ProcessFn$ArgumentProviderForTracker.class */
        /**
         * Argument provider that exposes only the process context and the restriction tracker.
         *
         * <p>Every other argument kind is rejected with an exception.
         */
        public static class ArgumentProviderForTracker<InputT, OutputT, TrackerT extends RestrictionTracker<?>> implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
            private static final String EXTRA_CONTEXT_MESSAGE = "Unexpected extra context access on a splittable DoFn";
            private static final String STATE_MESSAGE = "State cannot be used with a splittable DoFn";
            private static final String TIMER_MESSAGE = "Timers cannot be used with a splittable DoFn";

            private final TrackerT tracker;
            private final DoFn<InputT, OutputT>.ProcessContext processContext;

            /**
             * @param trackert the tracker handed out by {@link #restrictionTracker()}
             * @param processContext the context handed out by {@link #context} and {@link #processContext}
             */
            ArgumentProviderForTracker(TrackerT trackert, DoFn<InputT, OutputT>.ProcessContext processContext) {
                this.processContext = processContext;
                this.tracker = trackert;
            }

            /** Not supported; always throws {@link IllegalStateException}. */
            public BoundedWindow window() {
                throw new IllegalStateException(EXTRA_CONTEXT_MESSAGE);
            }

            /** Returns the process context supplied at construction. */
            public DoFn.Context context(DoFn<InputT, OutputT> doFn) {
                return processContext;
            }

            /** Returns the process context supplied at construction. */
            public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
                return processContext;
            }

            /** Not supported; always throws {@link IllegalStateException}. */
            public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException(EXTRA_CONTEXT_MESSAGE);
            }

            /** Not supported; always throws {@link IllegalStateException}. */
            public DoFn.InputProvider<InputT> inputProvider() {
                throw new IllegalStateException(EXTRA_CONTEXT_MESSAGE);
            }

            /** Not supported; always throws {@link IllegalStateException}. */
            public DoFn.OutputReceiver<OutputT> outputReceiver() {
                throw new IllegalStateException(EXTRA_CONTEXT_MESSAGE);
            }

            /** Not supported; always throws {@link IllegalStateException}. */
            public WindowingInternals<InputT, OutputT> windowingInternals() {
                throw new IllegalStateException(EXTRA_CONTEXT_MESSAGE);
            }

            /** Returns the tracker supplied at construction. */
            public TrackerT restrictionTracker() {
                return tracker;
            }

            /** Not supported; always throws {@link UnsupportedOperationException}. */
            public State state(String str) {
                throw new UnsupportedOperationException(STATE_MESSAGE);
            }

            /** Not supported; always throws {@link UnsupportedOperationException}. */
            public Timer timer(String str) {
                throw new UnsupportedOperationException(TIMER_MESSAGE);
            }
        }

        public ProcessFn(DoFn<InputT, OutputT> doFn, Coder<InputT> coder, Coder<RestrictionT> coder2, Coder<? extends BoundedWindow> coder3) {
            this.fn = doFn;
            this.invoker = DoFnInvokers.invokerFor(doFn);
            this.windowCoder = coder3;
            this.elementTag = StateTags.value("element", WindowedValue.getFullCoder(coder, this.windowCoder));
            this.restrictionTag = StateTags.value("restriction", coder2);
        }

        /**
         * Supplies the factory used to obtain per-key state internals while processing.
         *
         * @param stateInternalsFactory the factory to use
         */
        public void setStateInternalsFactory(final StateInternalsFactory<String> stateInternalsFactory) {
            this.stateInternalsFactory = stateInternalsFactory;
        }

        /**
         * Supplies the factory used to obtain per-key timer internals while processing.
         *
         * @param timerInternalsFactory the factory to use
         */
        public void setTimerInternalsFactory(final TimerInternalsFactory<String> timerInternalsFactory) {
            this.timerInternalsFactory = timerInternalsFactory;
        }

        /**
         * Supplies the sink that receives the windowed outputs of the wrapped fn.
         *
         * @param outputWindowedValue the sink to use
         */
        public void setOutputWindowedValue(final OutputWindowedValue<OutputT> outputWindowedValue) {
            this.outputWindowedValue = outputWindowedValue;
        }

        /**
         * Forwards the start of a bundle to the wrapped fn, handing it a wrapped context.
         *
         * @param context the context of this process fn
         * @throws Exception if the wrapped fn's start-bundle method throws
         */
        @DoFn.StartBundle
        public void startBundle(DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>.Context context) throws Exception {
            final DoFn<InputT, OutputT>.Context wrapped = wrapContext(context);
            invoker.invokeStartBundle(wrapped);
        }

        /**
         * Forwards the end of a bundle to the wrapped fn, handing it a wrapped context.
         *
         * @param context the context of this process fn
         * @throws Exception if the wrapped fn's finish-bundle method throws
         */
        @DoFn.FinishBundle
        public void finishBundle(DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>.Context context) throws Exception {
            final DoFn<InputT, OutputT>.Context wrapped = wrapContext(context);
            invoker.invokeFinishBundle(wrapped);
        }

        /* JADX WARN: Multi-variable type inference failed */
        /**
         * Runs one call of the wrapped fn for the element/restriction of a keyed work item.
         *
         * <p>A work item without a timer is the first call: the element and restriction come
         * from the work item and the element is written to state. A work item with a timer is
         * a resumption: the element and restriction are read back from state.
         *
         * <p>After the call, either all state is cleared (the fn does not want to resume), or
         * the residual restriction is saved, a watermark hold is added and a processing-time
         * timer is set to continue later.
         *
         * @param processContext context carrying the keyed work item
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>.ProcessContext processContext) {
            // NOTE(review): the locals below are raw-typed, which looks like decompiler output.
            ElementAndRestriction of;
            StateInternals stateInternalsForKey = this.stateInternalsFactory.stateInternalsForKey(((KeyedWorkItem) processContext.element()).key());
            TimerInternals timerInternalsForKey = this.timerInternalsFactory.timerInternalsForKey(((KeyedWorkItem) processContext.element()).key());
            // At most one timer is expected; null means the work item carries no timer.
            TimerInternals.TimerData timerData = (TimerInternals.TimerData) Iterables.getOnlyElement(((KeyedWorkItem) processContext.element()).timersIterable(), null);
            // z == true: first call for this element (no timer fired).
            boolean z = timerData == null;
            // First call uses the global namespace; a resumption reuses the timer's namespace.
            StateNamespace global = z ? StateNamespaces.global() : timerData.getNamespace();
            // state = element, state2 = restriction, state3 = watermark hold.
            ValueState state = stateInternalsForKey.state(global, this.elementTag);
            ValueState state2 = stateInternalsForKey.state(global, this.restrictionTag);
            WatermarkHoldState state3 = stateInternalsForKey.state(global, watermarkHoldTag);
            if (z) {
                // Merge the work item's values into one windowed value, strip the restriction
                // from it, and remember the element for later resumptions.
                WindowedValue implodeWindows = implodeWindows(((KeyedWorkItem) processContext.element()).elementsIterable());
                WindowedValue withValue = implodeWindows.withValue(((ElementAndRestriction) implodeWindows.getValue()).element());
                state.write(withValue);
                of = ElementAndRestriction.of(withValue, ((ElementAndRestriction) implodeWindows.getValue()).restriction());
            } else {
                // Timer firing: restore the element and restriction from state.
                state.readLater();
                state2.readLater();
                of = ElementAndRestriction.of(state.read(), state2.read());
            }
            RestrictionTracker invokeNewTracker = this.invoker.invokeNewTracker(of.restriction());
            // Single-slot holder for the residual restriction; the wrapped process context
            // fills it when it takes a checkpoint during the call.
            Object[] objArr = new Object[1];
            DoFn.ProcessContinuation invokeProcessElement = this.invoker.invokeProcessElement(wrapTracker(invokeNewTracker, wrapContext(processContext, (WindowedValue) of.element(), invokeNewTracker, objArr)));
            if (objArr[0] == null) {
                // No checkpoint was taken during the call; take one now.
                objArr[0] = Preconditions.checkNotNull(invokeNewTracker.checkpoint());
            }
            if (!invokeProcessElement.shouldResume()) {
                // Nothing left to do for this element: drop the stored state and the hold.
                state.clear();
                state2.clear();
                state3.clear();
                return;
            }
            // Save the residual restriction for the next call.
            state2.write(objArr[0]);
            // Hold at the watermark reported by the continuation, or at the element's
            // timestamp when none was reported.
            Instant watermark = invokeProcessElement.getWatermark();
            if (watermark == null) {
                watermark = ((WindowedValue) of.element()).getTimestamp();
            }
            Instant plus = timerInternalsForKey.currentProcessingTime().plus(invokeProcessElement.resumeDelay());
            state3.add(watermark);
            // Wake up after the requested resume delay, in the same state namespace.
            timerInternalsForKey.setTimer(TimerInternals.TimerData.of(global, plus, TimeDomain.PROCESSING_TIME));
        }

        /**
         * Collapses several windowed values into one.
         *
         * <p>The result takes its value, timestamp and pane from the first input and carries
         * the windows of all inputs, in iteration order.
         *
         * @param iterable the windowed values to merge; must contain at least one value
         * @return a single windowed value covering every input window
         * @throws IllegalStateException if {@code iterable} is empty
         */
        private static <InputT, RestrictionT> WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows(Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> iterable) {
            final WindowedValue<ElementAndRestriction<InputT, RestrictionT>> head = Iterables.getFirst(iterable, null);
            Preconditions.checkState(head != null, "Got a KeyedWorkItem with no elements and no timers");
            final ImmutableList.Builder<BoundedWindow> allWindows = ImmutableList.builder();
            for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : iterable) {
                allWindows.addAll(value.getWindows());
            }
            return WindowedValue.of(head.getValue(), head.getTimestamp(), allWindows.build(), head.getPane());
        }

        /**
         * Wraps this fn's bundle-level context into a context for the wrapped fn.
         *
         * <p>The wrapper forwards pipeline options and aggregator creation and rejects every
         * kind of output with {@link UnsupportedOperationException}.
         *
         * @param context the context of this process fn
         * @return a context bound to the wrapped fn
         */
        private DoFn<InputT, OutputT>.Context wrapContext(final DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>.Context context) {
            // Context is an inner class of DoFn, so it has to be created through the fn instance.
            return this.fn.new Context() {
                public PipelineOptions getPipelineOptions() {
                    return context.getPipelineOptions();
                }

                public void output(OutputT outputt) {
                    throwUnsupportedOutput();
                }

                public void outputWithTimestamp(OutputT outputt, Instant instant) {
                    throwUnsupportedOutput();
                }

                public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
                    throwUnsupportedOutput();
                }

                public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
                    throwUnsupportedOutput();
                }

                protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(String str, Combine.CombineFn<AggInputT, ?, AggOutputT> combineFn) {
                    return ProcessFn.this.fn.createAggregator(str, combineFn);
                }

                private void throwUnsupportedOutput() {
                    throw new UnsupportedOperationException(String.format("Splittable DoFn can only output from @%s", DoFn.ProcessElement.class.getSimpleName()));
                }
            };
        }

        /**
         * Wraps this fn's process context into a process context for the wrapped fn.
         *
         * <p>The wrapper reports the given element's value, timestamp and pane, forwards side
         * inputs and pipeline options to the base context, and sends every output to the
         * configured output sink in the element's windows and pane. It counts outputs; once
         * {@link #MAX_OUTPUTS_PER_BUNDLE} is reached and the holder is still empty, it
         * checkpoints the tracker and stores the result in the holder.
         *
         * @param processContext the context of this process fn
         * @param windowedValue the element being processed
         * @param trackert the tracker to checkpoint when the output limit is reached
         * @param restrictiontArr single-slot holder that receives the residual restriction
         * @return a process context bound to the wrapped fn
         */
        private DoFn<InputT, OutputT>.ProcessContext wrapContext(final DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>.ProcessContext processContext, final WindowedValue<InputT> windowedValue, final TrackerT trackert, final RestrictionT[] restrictiontArr) {
            // ProcessContext is an inner class of DoFn, so it has to be created through the fn instance.
            return this.fn.new ProcessContext() {
                private int numOutputs = 0;

                public InputT element() {
                    return windowedValue.getValue();
                }

                public Instant timestamp() {
                    return windowedValue.getTimestamp();
                }

                public PaneInfo pane() {
                    return windowedValue.getPane();
                }

                public void output(OutputT outputt) {
                    ProcessFn.this.outputWindowedValue.outputWindowedValue(outputt, windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane());
                    noteOutput();
                }

                public void outputWithTimestamp(OutputT outputt, Instant instant) {
                    ProcessFn.this.outputWindowedValue.outputWindowedValue(outputt, instant, windowedValue.getWindows(), windowedValue.getPane());
                    noteOutput();
                }

                // Counts one output and requests a checkpoint once the limit is reached,
                // unless a residual restriction has already been recorded.
                private void noteOutput() {
                    this.numOutputs++;
                    if (this.numOutputs >= ProcessFn.MAX_OUTPUTS_PER_BUNDLE && restrictiontArr[0] == null) {
                        restrictiontArr[0] = Preconditions.checkNotNull(trackert.checkpoint());
                    }
                }

                public <T> T sideInput(PCollectionView<T> pCollectionView) {
                    return processContext.sideInput(pCollectionView);
                }

                public PipelineOptions getPipelineOptions() {
                    return processContext.getPipelineOptions();
                }

                public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
                    ProcessFn.this.outputWindowedValue.sideOutputWindowedValue(tupleTag, t, windowedValue.getTimestamp(), windowedValue.getWindows(), windowedValue.getPane());
                    noteOutput();
                }

                public <T> void sideOutputWithTimestamp(TupleTag<T> tupleTag, T t, Instant instant) {
                    ProcessFn.this.outputWindowedValue.sideOutputWindowedValue(tupleTag, t, instant, windowedValue.getWindows(), windowedValue.getPane());
                    noteOutput();
                }

                protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(String str, Combine.CombineFn<AggInputT, ?, AggOutputT> combineFn) {
                    return ProcessFn.this.fn.createAggregator(str, combineFn);
                }
            };
        }

        /**
         * Builds the argument provider that gives the wrapped fn its process context and tracker.
         *
         * @param trackert the tracker to expose
         * @param processContext the process context to expose
         * @return an argument provider backed by the two arguments
         */
        private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapTracker(TrackerT trackert, DoFn<InputT, OutputT>.ProcessContext processContext) {
            final ArgumentProviderForTracker<InputT, OutputT, TrackerT> provider = new ArgumentProviderForTracker<>(trackert, processContext);
            return provider;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDo$RandomUniqueKeyFn.class */
    /**
     * Key function that ignores its input and returns a freshly generated random UUID string.
     */
    public static class RandomUniqueKeyFn<T> implements SerializableFunction<T, String> {
        private RandomUniqueKeyFn() {
        }

        /**
         * @param t the input value; not used
         * @return the string form of a new random {@link UUID}
         */
        public String apply(T t) {
            final UUID freshId = UUID.randomUUID();
            return freshId.toString();
        }

        /* Bridge for the erased apply(Object); the name m15apply appears to be a decompiler rename. */
        public /* bridge */ /* synthetic */ Object m15apply(Object obj) {
            return apply((RandomUniqueKeyFn<T>) obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/core/SplittableParDo$SplitRestrictionFn.class */
    /**
     * Splits the restriction of each element/restriction pair using the wrapped fn, and
     * outputs the element once for every restriction the fn emits.
     */
    public static class SplitRestrictionFn<InputT, RestrictionT> extends DoFn<ElementAndRestriction<InputT, RestrictionT>, ElementAndRestriction<InputT, RestrictionT>> {
        /** The wrapped fn that performs the split. */
        private final DoFn<InputT, ?> splittableFn;
        /** Invoker for {@link #splittableFn}; transient, so it is created in {@link #setup()}. */
        private transient DoFnInvoker<InputT, ?> invoker;

        /**
         * @param doFn the fn whose split-restriction method is invoked
         */
        SplitRestrictionFn(DoFn<InputT, ?> doFn) {
            splittableFn = doFn;
        }

        /** Creates the invoker for the wrapped fn. */
        @DoFn.Setup
        public void setup() {
            invoker = DoFnInvokers.invokerFor(splittableFn);
        }

        /**
         * Invokes the wrapped fn's split-restriction method for the current pair.
         *
         * @param processContext context supplying the pair and receiving the split pairs
         */
        @DoFn.ProcessElement
        public void processElement(final DoFn<ElementAndRestriction<InputT, RestrictionT>, ElementAndRestriction<InputT, RestrictionT>>.ProcessContext processContext) {
            final InputT element = processContext.element().element();
            final RestrictionT restriction = processContext.element().restriction();
            // Each restriction emitted by the fn is re-paired with the original element.
            final DoFn.OutputReceiver<RestrictionT> pairingReceiver = new DoFn.OutputReceiver<RestrictionT>() {
                public void output(RestrictionT restrictiont) {
                    processContext.output(ElementAndRestriction.of(element, restrictiont));
                }
            };
            invoker.invokeSplitRestriction(element, restriction, pairingReceiver);
        }
    }

    public SplittableParDo(ParDo.BoundMulti<InputT, OutputT> boundMulti) {
        Preconditions.checkNotNull(boundMulti, "parDo must not be null");
        this.parDo = boundMulti;
        Preconditions.checkArgument(DoFnSignatures.getSignature(boundMulti.getNewFn().getClass()).processElement().isSplittable(), "fn must be a splittable DoFn");
    }

    /**
     * Expands this transform over the given input.
     *
     * @param pCollection the input elements
     * @return the outputs of the wrapped {@code ParDo}
     */
    public PCollectionTuple expand(PCollection<InputT> pCollection) {
        final PCollectionTuple outputs = applyTyped(pCollection);
        return outputs;
    }

    /**
     * Builds the expansion: the input is first turned into keyed work items of
     * element/restriction pairs, then handed to a {@link ProcessElements} step named "Process".
     *
     * @param pCollection the input elements
     * @return the outputs declared by the {@link ProcessElements} step
     */
    private PCollectionTuple applyTyped(PCollection<InputT> pCollection) {
        final DoFn<InputT, OutputT> fn = this.parDo.getNewFn();
        // The restriction coder is obtained from the fn itself via its invoker.
        final Coder<RestrictionT> restrictionCoder = DoFnInvokers.invokerFor(fn).invokeGetRestrictionCoder(pCollection.getPipeline().getCoderRegistry());
        final PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems = applySplitIntoKeyedWorkItems(pCollection, fn, restrictionCoder);
        final ProcessElements<InputT, OutputT, RestrictionT> processStep = new ProcessElements<>(fn, pCollection.getCoder(), restrictionCoder, pCollection.getWindowingStrategy(), this.parDo.getSideInputs(), this.parDo.getMainOutputTag(), this.parDo.getSideOutputTags());
        return keyedWorkItems.apply("Process", processStep);
    }

    /**
     * Turns the input into keyed work items of element/restriction pairs.
     *
     * <p>Steps, in order: pair each element with its initial restriction, split the
     * restriction, assign a random unique key, and group into keyed work items.
     *
     * @param pCollection the input elements
     * @param doFn the splittable fn that supplies and splits restrictions
     * @param coder coder for the restrictions
     * @return the keyed work items
     * @throws IllegalArgumentException if the grouped collection is not globally windowed
     */
    private static <InputT, OutputT, RestrictionT> PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> applySplitIntoKeyedWorkItems(PCollection<InputT> pCollection, DoFn<InputT, OutputT> doFn, Coder<RestrictionT> coder) {
        final ElementAndRestrictionCoder<InputT, RestrictionT> pairCoder = ElementAndRestrictionCoder.of(pCollection.getCoder(), coder);

        final PCollection<ElementAndRestriction<InputT, RestrictionT>> paired = pCollection
                .apply("Pair with initial restriction", ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(doFn)))
                .setCoder(pairCoder);
        final PCollection<ElementAndRestriction<InputT, RestrictionT>> split = paired
                .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(doFn)))
                .setCoder(pairCoder);
        final PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>> keyed = split
                .apply("Assign unique key", WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()));
        final PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems = keyed
                .apply("Group by key", new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>())
                .setCoder(KeyedWorkItemCoder.of(StringUtf8Coder.of(), pairCoder, pCollection.getWindowingStrategy().getWindowFn().windowCoder()));

        // Sanity check on the windowing of the grouped collection.
        Preconditions.checkArgument(keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows, "GBKIntoKeyedWorkItems must produce a globally windowed collection, but windowing strategy was: %s", keyedWorkItems.getWindowingStrategy());
        return keyedWorkItems;
    }
}
