/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.flink.CreateStreamingFlinkView;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator;
import org.apache.beam.runners.flink.FlinkStreamingTranslationContext;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BeamStoppableFunction;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

class FlinkStreamingTransformTranslators {
    private static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String,  @UnknownKeyFor @NonNull @Initialized FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<String, FlinkStreamingPipelineTranslator.StreamTransformTranslator>();

    FlinkStreamingTransformTranslators() {
    }

    public static /*
     * Issues handling annotations - annotations may be inaccurate
     */
     @UnknownKeyFor @NonNull @Initialized FlinkStreamingPipelineTranslator.StreamTransformTranslator<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> getTranslator(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> transform) {
        @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
        return urn == null ? null : TRANSLATORS.get(urn);
    }

    private static @UnknownKeyFor @NonNull @Initialized String getCurrentTransformName(@UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
        return context.getCurrentTransform().getFullName();
    }

    private static /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Tuple2<@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>>, @UnknownKeyFor @NonNull @Initialized DataStream<@UnknownKeyFor @NonNull @Initialized RawUnionValue>> transformSideInputs(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputs, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
        HashMap<TupleTag, Integer> tagToIntMapping = new HashMap<TupleTag, Integer>();
        HashMap intToViewMapping = new HashMap();
        int count = 0;
        for (PCollectionView<?> pCollectionView : sideInputs) {
            TupleTag tupleTag = pCollectionView.getTagInternal();
            intToViewMapping.put(count, pCollectionView);
            tagToIntMapping.put(tupleTag, count);
            ++count;
        }
        ArrayList inputCoders = new ArrayList();
        for (PCollectionView<?> pCollectionView : sideInputs) {
            DataStream sideInputStream = context.getInputDataStream((PValue)pCollectionView);
            TypeInformation tpe = sideInputStream.getType();
            if (!(tpe instanceof CoderTypeInformation)) {
                throw new IllegalStateException("Input Stream TypeInformation is no CoderTypeInformation.");
            }
            Coder coder = ((CoderTypeInformation)tpe).getCoder();
            inputCoders.add(coder);
        }
        UnionCoder unionCoder = UnionCoder.of(inputCoders);
        CoderTypeInformation coderTypeInformation = new CoderTypeInformation(unionCoder, context.getPipelineOptions());
        SingleOutputStreamOperator sideInputUnion = null;
        for (PCollectionView<?> sideInput : sideInputs) {
            TupleTag tag = sideInput.getTagInternal();
            int intTag = (Integer)tagToIntMapping.get(tag);
            DataStream sideInputStream = context.getInputDataStream((PValue)sideInput);
            SingleOutputStreamOperator unionValueStream = sideInputStream.map(new ToRawUnion(intTag, context.getPipelineOptions())).returns(coderTypeInformation);
            if (sideInputUnion == null) {
                sideInputUnion = unionValueStream;
                continue;
            }
            sideInputUnion = sideInputUnion.union(new DataStream[]{unionValueStream});
        }
        if (sideInputUnion == null) {
            throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
        }
        return new Tuple2(intToViewMapping, sideInputUnion);
    }

    static {
        TRANSLATORS.put("beam:transform:impulse:v1", new ImpulseTranslator());
        TRANSLATORS.put("beam:transform:read:v1", new ReadSourceTranslator());
        TRANSLATORS.put("beam:transform:pardo:v1", new ParDoStreamingTranslator());
        TRANSLATORS.put("beam:runners_core:transforms:splittable_process:v1", new SplittableProcessElementsStreamingTranslator());
        TRANSLATORS.put("beam:runners_core:transforms:splittable_gbkikwi:v1", new GBKIntoKeyedWorkItemsTranslator());
        TRANSLATORS.put("beam:transform:window_into:v1", new WindowAssignTranslator());
        TRANSLATORS.put("beam:transform:flatten:v1", new FlattenPCollectionTranslator());
        TRANSLATORS.put("beam:transform:flink:create-streaming-flink-view:v1", new CreateViewStreamingTranslator());
        TRANSLATORS.put("beam:transform:reshuffle:v1", new ReshuffleTranslatorStreaming());
        TRANSLATORS.put("beam:transform:group_by_key:v1", new GroupByKeyTranslator());
        TRANSLATORS.put("beam:transform:combine_per_key:v1", new CombinePerKeyTranslator());
        TRANSLATORS.put("beam:transform:teststream:v1", new TestStreamTranslator());
    }

    static class UnboundedSourceWrapperNoValueWithRecordId<@UnknownKeyFor OutputT, @UnknownKeyFor CheckpointMarkT extends // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized UnboundedSource.CheckpointMark>
    extends RichParallelSourceFunction<WindowedValue<OutputT>>
    implements ProcessingTimeCallback,
    BeamStoppableFunction,
    CheckpointListener,
    CheckpointedFunction {
        private final @UnknownKeyFor @NonNull @Initialized UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper;

        @VisibleForTesting
        @UnknownKeyFor @NonNull @Initialized UnboundedSourceWrapper<OutputT, CheckpointMarkT> getUnderlyingSource() {
            return this.unboundedSourceWrapper;
        }

        UnboundedSourceWrapperNoValueWithRecordId(@UnknownKeyFor @NonNull @Initialized UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper) {
            this.unboundedSourceWrapper = unboundedSourceWrapper;
        }

        public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.unboundedSourceWrapper.setRuntimeContext(this.getRuntimeContext());
            this.unboundedSourceWrapper.open(parameters);
        }

        public void run(// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized SourceFunction.SourceContext<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT>> ctx) throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.unboundedSourceWrapper.run(new SourceContextWrapper(ctx));
        }

        public void initializeState(@UnknownKeyFor @NonNull @Initialized FunctionInitializationContext context) throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.unboundedSourceWrapper.initializeState(context);
        }

        public void snapshotState(@UnknownKeyFor @NonNull @Initialized FunctionSnapshotContext context) throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.unboundedSourceWrapper.snapshotState(context);
        }

        public void notifyCheckpointComplete(@UnknownKeyFor @NonNull @Initialized long checkpointId) throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.unboundedSourceWrapper.notifyCheckpointComplete(checkpointId);
        }

        @Override
        public void stop() {
            this.unboundedSourceWrapper.stop();
        }

        public void cancel() {
            this.unboundedSourceWrapper.cancel();
        }

        public void onProcessingTime(@UnknownKeyFor @NonNull @Initialized long timestamp) throws @UnknownKeyFor @NonNull @Initialized Exception {
            this.unboundedSourceWrapper.onProcessingTime(timestamp);
        }

        private final class SourceContextWrapper
        implements SourceFunction.SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> {
            private final // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized SourceFunction.SourceContext<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT>> ctx;

            private SourceContextWrapper(SourceFunction.SourceContext<WindowedValue<OutputT>> ctx) {
                this.ctx = ctx;
            }

            public void collect(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<OutputT>> element) {
                Object originalValue = ((ValueWithRecordId)element.getValue()).getValue();
                WindowedValue output = WindowedValue.of((Object)originalValue, (Instant)element.getTimestamp(), (Collection)element.getWindows(), (PaneInfo)element.getPane());
                this.ctx.collect((Object)output);
            }

            public void collectWithTimestamp(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<OutputT>> element, @UnknownKeyFor @NonNull @Initialized long timestamp) {
                Object originalValue = ((ValueWithRecordId)element.getValue()).getValue();
                WindowedValue output = WindowedValue.of((Object)originalValue, (Instant)element.getTimestamp(), (Collection)element.getWindows(), (PaneInfo)element.getPane());
                this.ctx.collectWithTimestamp((Object)output, timestamp);
            }

            public void emitWatermark(@UnknownKeyFor @NonNull @Initialized Watermark mark) {
                this.ctx.emitWatermark(mark);
            }

            public void markAsTemporarilyIdle() {
                this.ctx.markAsTemporarilyIdle();
            }

            public @UnknownKeyFor @NonNull @Initialized Object getCheckpointLock() {
                return this.ctx.getCheckpointLock();
            }

            public void close() {
                this.ctx.close();
            }
        }
    }

    private static class TestStreamTranslator<@UnknownKeyFor T>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TestStream<T>> {
        private TestStreamTranslator() {
        }

        @Override
        void translateNode(@UnknownKeyFor @NonNull @Initialized TestStream<T> testStream, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            byte[] payload;
            Coder valueCoder = testStream.getValueCoder();
            TestStream.TestStreamCoder testStreamCoder = TestStream.TestStreamCoder.of((Coder)valueCoder);
            try {
                payload = CoderUtils.encodeToByteArray((Coder)testStreamCoder, testStream);
            }
            catch (CoderException e) {
                throw new RuntimeException("Could not encode TestStream.", e);
            }
            SerializableFunction & Serializable testStreamDecoder = (SerializableFunction & Serializable)bytes -> {
                try {
                    return (TestStream)CoderUtils.decodeFromByteArray((Coder)TestStream.TestStreamCoder.of((Coder)valueCoder), (byte[])bytes);
                }
                catch (CoderException e) {
                    throw new RuntimeException("Can't decode TestStream payload.", e);
                }
            };
            WindowedValue.FullWindowedValueCoder elementCoder = WindowedValue.getFullCoder((Coder)valueCoder, (Coder)GlobalWindow.Coder.INSTANCE);
            DataStreamSource source = context.getExecutionEnvironment().addSource(new TestStreamSource(testStreamDecoder, payload), new CoderTypeInformation(elementCoder, context.getPipelineOptions()));
            context.setOutputDataStream((PValue)context.getOutput(testStream), (DataStream<?>)source);
        }
    }

    private static class CreateStreamingFlinkViewPayloadTranslator
    extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<CreateStreamingFlinkView.CreateFlinkPCollectionView<?, ?>> {
        private CreateStreamingFlinkViewPayloadTranslator() {
        }

        public @UnknownKeyFor @NonNull @Initialized String getUrn(/*
         * Issues handling annotations - annotations may be inaccurate
         */
         @UnknownKeyFor @NonNull @Initialized CreateStreamingFlinkView.CreateFlinkPCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> transform) {
            return "beam:transform:flink:create-streaming-flink-view:v1";
        }
    }

    public static class FlinkTransformsRegistrar
    implements TransformPayloadTranslatorRegistrar {
        public @UnknownKeyFor @NonNull @Initialized Map<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized PTransform>, @KeyForBottom @NonNull @Initialized ? extends // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized PTransformTranslation.TransformPayloadTranslator> getTransformPayloadTranslators() {
            return ImmutableMap.builder().put(CreateStreamingFlinkView.CreateFlinkPCollectionView.class, (Object)new CreateStreamingFlinkViewPayloadTranslator()).put(SplittableParDoViaKeyedWorkItems.ProcessElements.class, (Object)PTransformTranslation.TransformPayloadTranslator.NotSerializable.forUrn((String)"beam:runners_core:transforms:splittable_process:v1")).build();
        }
    }

    private static class SplittableParDoProcessElementsTranslator
    extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?>> {
        private SplittableParDoProcessElementsTranslator() {
        }

        public @UnknownKeyFor @NonNull @Initialized String getUrn(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized SplittableParDoViaKeyedWorkItems.ProcessElements<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> transform) {
            return "beam:runners_core:transforms:splittable_process:v1";
        }
    }

    static class ToGroupByKeyResult<@UnknownKeyFor KeyT, @UnknownKeyFor ValueT>
    extends RichFlatMapFunction<WindowedValue<KV<KeyT, Iterable<byte[]>>>, WindowedValue<KV<KeyT, Iterable<ValueT>>>> {
        private final @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions options;
        private final @UnknownKeyFor @NonNull @Initialized Coder<ValueT> valueCoder;

        ToGroupByKeyResult(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized Coder<ValueT> valueCoder) {
            this.options = new SerializablePipelineOptions(options);
            this.valueCoder = valueCoder;
        }

        public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) {
            FileSystems.setDefaultPipelineOptions((PipelineOptions)this.options.get());
        }

        public void flatMap(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<KeyT, @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>>> element, @UnknownKeyFor @NonNull @Initialized Collector<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<KeyT, @UnknownKeyFor @NonNull @Initialized Iterable<ValueT>>>> collector) throws @UnknownKeyFor @NonNull @Initialized CoderException {
            ArrayList<Object> result = new ArrayList<Object>();
            for (byte[] binaryValue : (Iterable)((KV)element.getValue()).getValue()) {
                result.add(CoderUtils.decodeFromByteArray(this.valueCoder, (byte[])binaryValue));
            }
            collector.collect((Object)element.withValue((Object)KV.of((Object)((KV)element.getValue()).getKey(), result)));
        }
    }

    static class ToBinaryKeyedWorkItem<@UnknownKeyFor K, @UnknownKeyFor InputT>
    extends RichFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KeyedWorkItem<K, byte[]>>> {
        private final @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions options;
        private final @UnknownKeyFor @NonNull @Initialized Coder<InputT> valueCoder;

        ToBinaryKeyedWorkItem(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized Coder<InputT> valueCoder) {
            this.options = new SerializablePipelineOptions(options);
            this.valueCoder = valueCoder;
        }

        public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) {
            FileSystems.setDefaultPipelineOptions((PipelineOptions)this.options.get());
        }

        public void flatMap(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>> inWithMultipleWindows, @UnknownKeyFor @NonNull @Initialized Collector<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []>>> out) throws @UnknownKeyFor @NonNull @Initialized CoderException {
            for (WindowedValue in : inWithMultipleWindows.explodeWindows()) {
                byte[] binaryValue = CoderUtils.encodeToByteArray(this.valueCoder, (Object)((KV)in.getValue()).getValue());
                SingletonKeyedWorkItem workItem = new SingletonKeyedWorkItem(((KV)in.getValue()).getKey(), in.withValue((Object)binaryValue));
                out.collect((Object)in.withValue(workItem));
            }
        }
    }

    static class ToKeyedWorkItem<@UnknownKeyFor K, @UnknownKeyFor InputT>
    extends RichFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KeyedWorkItem<K, InputT>>> {
        private final @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions options;

        ToKeyedWorkItem(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
            this.options = new SerializablePipelineOptions(options);
        }

        public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) {
            FileSystems.setDefaultPipelineOptions((PipelineOptions)this.options.get());
        }

        public void flatMap(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>> inWithMultipleWindows, @UnknownKeyFor @NonNull @Initialized Collector<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, InputT>>> out) {
            for (WindowedValue in : inWithMultipleWindows.explodeWindows()) {
                SingletonKeyedWorkItem workItem = new SingletonKeyedWorkItem(((KV)in.getValue()).getKey(), in.withValue(((KV)in.getValue()).getValue()));
                out.collect((Object)in.withValue(workItem));
            }
        }
    }

    private static class FlattenPCollectionTranslator<@UnknownKeyFor T>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
        private FlattenPCollectionTranslator() {
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<T>, @UnknownKeyFor @NonNull @Initialized PCollection<T>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            Map<TupleTag<PCollection<T>>, PCollection<PCollection<T>>> allInputs = context.getInputs(transform);
            if (allInputs.isEmpty()) {
                DataStreamSource dummySource = context.getExecutionEnvironment().fromElements((Object[])new String[]{"dummy"});
                SingleOutputStreamOperator result = dummySource.flatMap((FlatMapFunction & Serializable)(s, collector) -> {}).returns(new CoderTypeInformation(WindowedValue.getFullCoder((Coder)VoidCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE), context.getPipelineOptions()));
                context.setOutputDataStream((PValue)context.getOutput(transform), (DataStream<?>)result);
            } else {
                DataStream current;
                DataStream result = null;
                HashMap<SingleOutputStreamOperator, Integer> duplicates = new HashMap<SingleOutputStreamOperator, Integer>();
                for (PValue pValue : allInputs.values()) {
                    current = context.getInputDataStream(pValue);
                    Integer oldValue = duplicates.put((SingleOutputStreamOperator)current, 1);
                    if (oldValue == null) continue;
                    duplicates.put((SingleOutputStreamOperator)current, oldValue + 1);
                }
                for (PValue pValue : allInputs.values()) {
                    current = context.getInputDataStream(pValue);
                    Integer timesRequired = (Integer)duplicates.get(current);
                    if (timesRequired > 1) {
                        current = current.flatMap(new FlatMapFunction<T, T>(){
                            private static final @UnknownKeyFor @NonNull @Initialized long serialVersionUID = 1L;

                            public void flatMap(T t, @UnknownKeyFor @NonNull @Initialized Collector<T> collector) throws @UnknownKeyFor @NonNull @Initialized Exception {
                                collector.collect(t);
                            }
                        });
                    }
                    result = result == null ? current : result.union(new DataStream[]{current});
                }
                context.setOutputDataStream((PValue)context.getOutput(transform), result);
            }
        }
    }

    private static class ToKeyedWorkItemInGlobalWindow<@UnknownKeyFor K, @UnknownKeyFor InputT>
    extends RichFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KeyedWorkItem<K, InputT>>> {
        private final @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions options;

        ToKeyedWorkItemInGlobalWindow(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
            this.options = new SerializablePipelineOptions(options);
        }

        public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) {
            FileSystems.setDefaultPipelineOptions((PipelineOptions)this.options.get());
        }

        public void flatMap(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>> inWithMultipleWindows, @UnknownKeyFor @NonNull @Initialized Collector<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, InputT>>> out) throws @UnknownKeyFor @NonNull @Initialized Exception {
            for (WindowedValue in : inWithMultipleWindows.explodeWindows()) {
                SingletonKeyedWorkItem workItem = new SingletonKeyedWorkItem(((KV)in.getValue()).getKey(), in.withValue(((KV)in.getValue()).getValue()));
                out.collect((Object)WindowedValue.valueInGlobalWindow(workItem));
            }
        }
    }

    private static class GBKIntoKeyedWorkItemsTranslator<@UnknownKeyFor K, @UnknownKeyFor InputT>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KeyedWorkItem<K, InputT>>>> {
        private GBKIntoKeyedWorkItemsTranslator() {
        }

        @Override
        @UnknownKeyFor @NonNull @Initialized boolean canTranslate(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, InputT>>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            return true;
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, InputT>>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            PCollection<KV<K, InputT>> input = context.getInput(transform);
            KvCoder inputKvCoder = (KvCoder)input.getCoder();
            SingletonKeyedWorkItemCoder workItemCoder = SingletonKeyedWorkItemCoder.of(inputKvCoder.getKeyCoder(), inputKvCoder.getValueCoder(), (Coder<? extends BoundedWindow>)input.getWindowingStrategy().getWindowFn().windowCoder());
            WindowedValue.ValueOnlyWindowedValueCoder windowedWorkItemCoder = WindowedValue.getValueOnlyCoder(workItemCoder);
            CoderTypeInformation workItemTypeInfo = new CoderTypeInformation(windowedWorkItemCoder, context.getPipelineOptions());
            DataStream inputDataStream = context.getInputDataStream((PValue)input);
            SingleOutputStreamOperator workItemStream = inputDataStream.flatMap(new ToKeyedWorkItemInGlobalWindow(context.getPipelineOptions())).returns(workItemTypeInfo).name("ToKeyedWorkItem");
            KeyedStream keyedWorkItemStream = workItemStream.keyBy(new WorkItemKeySelector(inputKvCoder.getKeyCoder(), new SerializablePipelineOptions(context.getPipelineOptions())));
            context.setOutputDataStream((PValue)context.getOutput(transform), (DataStream<?>)keyedWorkItemStream);
        }
    }

    private static class CombinePerKeyTranslator<@UnknownKeyFor K, @UnknownKeyFor InputT, @UnknownKeyFor OutputT>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
        private CombinePerKeyTranslator() {
        }

        @Override
        @UnknownKeyFor @NonNull @Initialized boolean canTranslate(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, OutputT>>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            PCollection<KV<K, InputT>> input = context.getInput(transform);
            WindowingStrategy windowingStrategy = input.getWindowingStrategy();
            return windowingStrategy.getWindowFn().isNonMerging() || ((Combine.PerKey)transform).getSideInputs().isEmpty();
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, OutputT>>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            String fullName = FlinkStreamingTransformTranslators.getCurrentTransformName(context);
            PCollection<KV<K, InputT>> input = context.getInput(transform);
            WindowingStrategy windowingStrategy = input.getWindowingStrategy();
            KvCoder inputKvCoder = (KvCoder)input.getCoder();
            SingletonKeyedWorkItemCoder workItemCoder = SingletonKeyedWorkItemCoder.of(inputKvCoder.getKeyCoder(), inputKvCoder.getValueCoder(), (Coder<? extends BoundedWindow>)input.getWindowingStrategy().getWindowFn().windowCoder());
            DataStream inputDataStream = context.getInputDataStream((PValue)input);
            WindowedValue.FullWindowedValueCoder windowedWorkItemCoder = WindowedValue.getFullCoder(workItemCoder, (Coder)input.getWindowingStrategy().getWindowFn().windowCoder());
            CoderTypeInformation workItemTypeInfo = new CoderTypeInformation(windowedWorkItemCoder, context.getPipelineOptions());
            SingleOutputStreamOperator workItemStream = inputDataStream.flatMap(new ToKeyedWorkItem(context.getPipelineOptions())).returns(workItemTypeInfo).name("ToKeyedWorkItem");
            WorkItemKeySelector keySelector = new WorkItemKeySelector(inputKvCoder.getKeyCoder(), new SerializablePipelineOptions(context.getPipelineOptions()));
            KeyedStream keyedWorkItemStream = workItemStream.keyBy(keySelector);
            CombineFnBase.GlobalCombineFn combineFn = ((Combine.PerKey)transform).getFn();
            SystemReduceFn reduceFn = SystemReduceFn.combining((Coder)inputKvCoder.getKeyCoder(), (AppliedCombineFn)AppliedCombineFn.withInputCoder((CombineFnBase.GlobalCombineFn)combineFn, (CoderRegistry)input.getPipeline().getCoderRegistry(), (KvCoder)inputKvCoder));
            Coder<WindowedValue<KV<K, OutputT>>> outputCoder = context.getWindowedInputCoder(context.getOutput(transform));
            TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo = context.getTypeInfo(context.getOutput(transform));
            List sideInputs = ((Combine.PerKey)transform).getSideInputs();
            if (sideInputs.isEmpty()) {
                TupleTag mainTag = new TupleTag("main output");
                WindowDoFnOperator doFnOperator = new WindowDoFnOperator(reduceFn, fullName, windowedWorkItemCoder, mainTag, Collections.emptyList(), new DoFnOperator.MultiOutputOutputManagerFactory<KV<K, OutputT>>(mainTag, outputCoder, new SerializablePipelineOptions(context.getPipelineOptions())), (WindowingStrategy<?, ?>)windowingStrategy, new HashMap(), (Collection<PCollectionView<?>>)Collections.emptyList(), context.getPipelineOptions(), inputKvCoder.getKeyCoder(), keySelector);
                SingleOutputStreamOperator outDataStream = keyedWorkItemStream.transform(fullName, outputTypeInfo, doFnOperator).uid(fullName);
                context.setOutputDataStream((PValue)context.getOutput(transform), (DataStream<?>)outDataStream);
            } else {
                Tuple2 transformSideInputs = FlinkStreamingTransformTranslators.transformSideInputs(sideInputs, context);
                TupleTag mainTag = new TupleTag("main output");
                WindowDoFnOperator doFnOperator = new WindowDoFnOperator(reduceFn, fullName, windowedWorkItemCoder, mainTag, Collections.emptyList(), new DoFnOperator.MultiOutputOutputManagerFactory<KV<K, OutputT>>(mainTag, outputCoder, new SerializablePipelineOptions(context.getPipelineOptions())), (WindowingStrategy<?, ?>)windowingStrategy, (Map)transformSideInputs.f0, sideInputs, context.getPipelineOptions(), inputKvCoder.getKeyCoder(), keySelector);
                TwoInputTransformation rawFlinkTransform = new TwoInputTransformation(keyedWorkItemStream.getTransformation(), ((DataStream)transformSideInputs.f1).broadcast().getTransformation(), transform.getName(), doFnOperator, outputTypeInfo, keyedWorkItemStream.getParallelism());
                rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
                rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
                SingleOutputStreamOperator outDataStream = new SingleOutputStreamOperator(keyedWorkItemStream.getExecutionEnvironment(), (Transformation)rawFlinkTransform){};
                keyedWorkItemStream.getExecutionEnvironment().addOperator((Transformation)rawFlinkTransform);
                context.setOutputDataStream((PValue)context.getOutput(transform), (DataStream<?>)outDataStream);
            }
        }
    }

    private static class GroupByKeyTranslator<@UnknownKeyFor K, @UnknownKeyFor InputT>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
        private GroupByKeyTranslator() {
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<InputT>>>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            PCollection<KV<K, InputT>> input = context.getInput(transform);
            WindowingStrategy windowingStrategy = input.getWindowingStrategy();
            KvCoder inputKvCoder = (KvCoder)input.getCoder();
            SingletonKeyedWorkItemCoder workItemCoder = SingletonKeyedWorkItemCoder.of(inputKvCoder.getKeyCoder(), ByteArrayCoder.of(), (Coder<? extends BoundedWindow>)input.getWindowingStrategy().getWindowFn().windowCoder());
            DataStream inputDataStream = context.getInputDataStream((PValue)input);
            WindowedValue.FullWindowedValueCoder windowedWorkItemCoder = WindowedValue.getFullCoder(workItemCoder, (Coder)input.getWindowingStrategy().getWindowFn().windowCoder());
            CoderTypeInformation workItemTypeInfo = new CoderTypeInformation(windowedWorkItemCoder, context.getPipelineOptions());
            SingleOutputStreamOperator workItemStream = inputDataStream.flatMap(new ToBinaryKeyedWorkItem(context.getPipelineOptions(), inputKvCoder.getValueCoder())).returns(workItemTypeInfo).name("ToBinaryKeyedWorkItem");
            WorkItemKeySelector keySelector = new WorkItemKeySelector(inputKvCoder.getKeyCoder(), new SerializablePipelineOptions(context.getPipelineOptions()));
            KeyedStream keyedWorkItemStream = workItemStream.keyBy(keySelector);
            SystemReduceFn reduceFn = SystemReduceFn.buffering((Coder)ByteArrayCoder.of());
            WindowedValue.FullWindowedValueCoder outputCoder = WindowedValue.getFullCoder((Coder)KvCoder.of((Coder)inputKvCoder.getKeyCoder(), (Coder)IterableCoder.of((Coder)ByteArrayCoder.of())), (Coder)windowingStrategy.getWindowFn().windowCoder());
            CoderTypeInformation outputTypeInfo = new CoderTypeInformation(outputCoder, context.getPipelineOptions());
            TupleTag mainTag = new TupleTag("main output");
            String fullName = FlinkStreamingTransformTranslators.getCurrentTransformName(context);
            WindowDoFnOperator doFnOperator = new WindowDoFnOperator(reduceFn, fullName, windowedWorkItemCoder, mainTag, Collections.emptyList(), new DoFnOperator.MultiOutputOutputManagerFactory(mainTag, outputCoder, new SerializablePipelineOptions(context.getPipelineOptions())), (WindowingStrategy<?, ?>)windowingStrategy, new HashMap(), (Collection<PCollectionView<?>>)Collections.emptyList(), context.getPipelineOptions(), inputKvCoder.getKeyCoder(), keySelector);
            SingleOutputStreamOperator outDataStream = keyedWorkItemStream.transform(fullName, outputTypeInfo, doFnOperator).uid(fullName).flatMap(new ToGroupByKeyResult(context.getPipelineOptions(), inputKvCoder.getValueCoder())).returns(context.getTypeInfo(context.getOutput(transform))).name("ToGBKResult");
            context.setOutputDataStream((PValue)context.getOutput(transform), (DataStream<?>)outDataStream);
        }
    }

    private static class ReshuffleTranslatorStreaming<@UnknownKeyFor K, @UnknownKeyFor InputT>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, InputT>>>> {
        private ReshuffleTranslatorStreaming() {
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            DataStream inputDataSet = context.getInputDataStream((PValue)context.getInput(transform));
            context.setOutputDataStream((PValue)context.getOutput(transform), (DataStream<?>)inputDataSet.rebalance());
        }
    }

    private static class WindowAssignTranslator<@UnknownKeyFor T>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
        private WindowAssignTranslator() {
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<T>, @UnknownKeyFor @NonNull @Initialized PCollection<T>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            WindowingStrategy windowingStrategy = context.getOutput(transform).getWindowingStrategy();
            TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(context.getOutput(transform));
            DataStream inputDataStream = context.getInputDataStream((PValue)context.getInput(transform));
            WindowFn windowFn = windowingStrategy.getWindowFn();
            FlinkAssignWindows assignWindowsFunction = new FlinkAssignWindows(windowFn);
            String fullName = context.getOutput(transform).getName();
            SingleOutputStreamOperator outputDataStream = inputDataStream.flatMap(assignWindowsFunction).name(fullName).uid(fullName).returns(typeInfo);
            context.setOutputDataStream((PValue)context.getOutput(transform), (DataStream<?>)outputDataStream);
        }
    }

    private static class CreateViewStreamingTranslator<@UnknownKeyFor ElemT, @UnknownKeyFor ViewT>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT>> {
        private CreateViewStreamingTranslator() {
        }

        @Override
        public void translateNode( @UnknownKeyFor @NonNull @Initialized CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            DataStream inputDataSet = context.getInputDataStream((PValue)context.getInput(transform));
            PCollectionView<ViewT> view = transform.getView();
            context.setOutputDataStream((PValue)view, inputDataSet);
        }
    }

    private static class SplittableProcessElementsStreamingTranslator<@UnknownKeyFor InputT, @UnknownKeyFor OutputT, @UnknownKeyFor RestrictionT, @UnknownKeyFor PositionT, @UnknownKeyFor WatermarkEstimatorStateT>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>> {
        private SplittableProcessElementsStreamingTranslator() {
        }

        @Override
        public void translateNode(// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            ParDoTranslationHelper.translateParDo(FlinkStreamingTransformTranslators.getCurrentTransformName(context), transform.newProcessFn(transform.getFn()), (PCollection)context.getInput(transform), transform.getSideInputs(), context.getOutputs(transform), transform.getMainOutputTag(), transform.getAdditionalOutputTags().getAll(), DoFnSchemaInformation.create(), Collections.emptyMap(), context, (doFn, stepName, sideInputs, mainOutputTag, additionalOutputTags, context1, windowingStrategy, tagsToOutputTags, tagsToCoders, tagsToIds, windowedInputCoder, outputCoders1, keyCoder, keySelector, transformedSideInputs, doFnSchemaInformation, sideInputMapping) -> new SplittableDoFnOperator(doFn, stepName, windowedInputCoder, outputCoders1, mainOutputTag, additionalOutputTags, new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds, new SerializablePipelineOptions(context.getPipelineOptions())), windowingStrategy, transformedSideInputs, sideInputs, context1.getPipelineOptions(), keyCoder, keySelector));
        }
    }

    private static class ParDoStreamingTranslator<@UnknownKeyFor InputT, @UnknownKeyFor OutputT>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
        private ParDoStreamingTranslator() {
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PCollection<InputT>, @UnknownKeyFor @NonNull @Initialized PCollectionTuple> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            TupleTagList additionalOutputTags;
            List sideInputs;
            TupleTag mainOutputTag;
            DoFn doFn;
            try {
                doFn = ParDoTranslation.getDoFn(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            try {
                mainOutputTag = ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            try {
                sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            Map sideInputMapping = ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
            try {
                additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            DoFnSchemaInformation doFnSchemaInformation = ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
            ParDoTranslationHelper.translateParDo(FlinkStreamingTransformTranslators.getCurrentTransformName(context), doFn, context.getInput(transform), sideInputs, context.getOutputs(transform), mainOutputTag, additionalOutputTags.getAll(), doFnSchemaInformation, sideInputMapping, context, (doFn1, stepName, sideInputs1, mainOutputTag1, additionalOutputTags1, context1, windowingStrategy, tagsToOutputTags, tagsToCoders, tagsToIds, windowedInputCoder, outputCoders1, keyCoder, keySelector, transformedSideInputs, doFnSchemaInformation1, sideInputMapping1) -> new DoFnOperator(doFn1, stepName, windowedInputCoder, outputCoders1, mainOutputTag1, additionalOutputTags1, new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag1, tagsToOutputTags, tagsToCoders, tagsToIds, new SerializablePipelineOptions(context.getPipelineOptions())), windowingStrategy, transformedSideInputs, sideInputs1, context1.getPipelineOptions(), keyCoder, keySelector, doFnSchemaInformation1, sideInputMapping1));
        }
    }

    static class ParDoTranslationHelper {
        ParDoTranslationHelper() {
        }

        static <InputT, OutputT> void translateParDo(@UnknownKeyFor @NonNull @Initialized String transformName, @UnknownKeyFor @NonNull @Initialized DoFn<InputT, OutputT> doFn, @UnknownKeyFor @NonNull @Initialized PCollection<InputT> input, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputs, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputs, @UnknownKeyFor @NonNull @Initialized TupleTag<OutputT> mainOutputTag, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> additionalOutputTags, @UnknownKeyFor @NonNull @Initialized DoFnSchemaInformation doFnSchemaInformation, /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputMapping, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context, @UnknownKeyFor @NonNull @Initialized DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
            SingleOutputStreamOperator outputStream;
            WindowingStrategy windowingStrategy = input.getWindowingStrategy();
            HashMap tagsToOutputTags = Maps.newHashMap();
            HashMap tagsToCoders = Maps.newHashMap();
            HashMap tagsToIds = Maps.newHashMap();
            int idCount = 0;
            tagsToIds.put(mainOutputTag, idCount++);
            for (Map.Entry<TupleTag<?>, PCollection<?>> entry : outputs.entrySet()) {
                if (tagsToOutputTags.containsKey(entry.getKey())) continue;
                tagsToOutputTags.put(entry.getKey(), new OutputTag(entry.getKey().getId(), context.getTypeInfo(entry.getValue())));
                tagsToCoders.put(entry.getKey(), context.getWindowedInputCoder(entry.getValue()));
                tagsToIds.put(entry.getKey(), idCount++);
            }
            Coder<WindowedValue<InputT>> windowedInputCoder = context.getWindowedInputCoder(input);
            Map<TupleTag<?>, Coder<?>> outputCoders = context.getOutputCoders();
            KeyedStream inputDataStream = context.getInputDataStream((PValue)input);
            Coder keyCoder = null;
            Object keySelector = null;
            boolean stateful = false;
            DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
            if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) {
                keyCoder = ((KvCoder)input.getCoder()).getKeyCoder();
                keySelector = new KvToByteBufferKeySelector(keyCoder, new SerializablePipelineOptions(context.getPipelineOptions()));
                PTransform<?, PCollection<InputT>> producer = context.getProducer(input);
                String previousUrn = producer != null ? PTransformTranslation.urnForTransformOrNull(context.getProducer(input)) : null;
                inputDataStream = "beam:transform:combine_per_key:v1".equals(previousUrn) || "beam:transform:group_by_key:v1".equals(previousUrn) ? DataStreamUtils.reinterpretAsKeyedStream(inputDataStream, keySelector) : inputDataStream.keyBy(keySelector);
                stateful = true;
            } else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
                keyCoder = ByteArrayCoder.of();
                keySelector = new WorkItemKeySelector(keyCoder, new SerializablePipelineOptions(context.getPipelineOptions()));
                stateful = true;
            }
            CoderTypeInformation outputTypeInformation = new CoderTypeInformation(context.getWindowedInputCoder(outputs.get(mainOutputTag)), context.getPipelineOptions());
            if (sideInputs.isEmpty()) {
                DoFnOperator<InputT, OutputT> doFnOperator = doFnOperatorFactory.createDoFnOperator(doFn, FlinkStreamingTransformTranslators.getCurrentTransformName(context), sideInputs, mainOutputTag, additionalOutputTags, context, (WindowingStrategy<?, ?>)windowingStrategy, tagsToOutputTags, tagsToCoders, tagsToIds, windowedInputCoder, outputCoders, keyCoder, (KeySelector<WindowedValue<InputT>, ?>)keySelector, new HashMap(), doFnSchemaInformation, sideInputMapping);
                outputStream = inputDataStream.transform(transformName, outputTypeInformation, doFnOperator);
            } else {
                Tuple2 transformedSideInputs = FlinkStreamingTransformTranslators.transformSideInputs(sideInputs, context);
                DoFnOperator<InputT, OutputT> doFnOperator = doFnOperatorFactory.createDoFnOperator(doFn, FlinkStreamingTransformTranslators.getCurrentTransformName(context), sideInputs, mainOutputTag, additionalOutputTags, context, (WindowingStrategy<?, ?>)windowingStrategy, tagsToOutputTags, tagsToCoders, tagsToIds, windowedInputCoder, outputCoders, keyCoder, (KeySelector<WindowedValue<InputT>, ?>)keySelector, (Map)transformedSideInputs.f0, doFnSchemaInformation, sideInputMapping);
                if (stateful) {
                    KeyedStream keyedStream = inputDataStream;
                    TwoInputTransformation rawFlinkTransform = new TwoInputTransformation(keyedStream.getTransformation(), ((DataStream)transformedSideInputs.f1).broadcast().getTransformation(), transformName, doFnOperator, outputTypeInformation, keyedStream.getParallelism());
                    rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
                    rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
                    outputStream = new SingleOutputStreamOperator(keyedStream.getExecutionEnvironment(), (Transformation)rawFlinkTransform){};
                    keyedStream.getExecutionEnvironment().addOperator((Transformation)rawFlinkTransform);
                } else {
                    outputStream = inputDataStream.connect(((DataStream)transformedSideInputs.f1).broadcast()).transform(transformName, outputTypeInformation, doFnOperator);
                }
            }
            outputStream.uid(transformName);
            context.setOutputDataStream((PValue)outputs.get(mainOutputTag), (DataStream<?>)outputStream);
            for (Map.Entry<TupleTag<?>, PCollection<?>> entry : outputs.entrySet()) {
                if (entry.getKey().equals(mainOutputTag)) continue;
                context.setOutputDataStream((PValue)entry.getValue(), outputStream.getSideOutput((OutputTag)tagsToOutputTags.get(entry.getKey())));
            }
        }

        static interface DoFnOperatorFactory<@UnknownKeyFor InputT, @UnknownKeyFor OutputT> {
            public @UnknownKeyFor @NonNull @Initialized DoFnOperator<InputT, OutputT> createDoFnOperator(@UnknownKeyFor @NonNull @Initialized DoFn<InputT, OutputT> var1, @UnknownKeyFor @NonNull @Initialized String var2, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> var3, @UnknownKeyFor @NonNull @Initialized TupleTag<OutputT> var4, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> var5, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext var6, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> var7, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized OutputTag<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>>> var8, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>>> var9, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Integer> var10, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized WindowedValue<InputT>> var11, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> var12, @UnknownKeyFor @NonNull @Initialized Coder var13, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized KeySelector<@UnknownKeyFor @NonNull @Initialized WindowedValue<InputT>, @UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> var14, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> var15, @UnknownKeyFor @NonNull @Initialized DoFnSchemaInformation var16, /*
             * Issues handling annotations - annotations may be inaccurate
             */
            @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> var17);
        }
    }

    public static class ToRawUnion<@UnknownKeyFor T>
    extends RichMapFunction<T, RawUnionValue> {
        private final @UnknownKeyFor @NonNull @Initialized int intTag;
        private final @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions options;

        ToRawUnion(@UnknownKeyFor @NonNull @Initialized int intTag, @UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions) {
            this.intTag = intTag;
            this.options = new SerializablePipelineOptions(pipelineOptions);
        }

        public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) {
            FileSystems.setDefaultPipelineOptions((PipelineOptions)this.options.get());
        }

        public @UnknownKeyFor @NonNull @Initialized RawUnionValue map(T o) throws @UnknownKeyFor @NonNull @Initialized Exception {
            return new RawUnionValue(this.intTag, o);
        }
    }

    private static class BoundedReadSourceTranslator<@UnknownKeyFor T>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PBegin, PCollection<T>>> {
        private BoundedReadSourceTranslator() {
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<T>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            SingleOutputStreamOperator source;
            BoundedSource rawSource;
            PCollection<T> output = context.getOutput(transform);
            TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(context.getOutput(transform));
            try {
                rawSource = ReadTranslation.boundedSourceFromTransform(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            String fullName = FlinkStreamingTransformTranslators.getCurrentTransformName(context);
            UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter adaptedRawSource = new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(rawSource);
            try {
                int parallelism = context.getExecutionEnvironment().getMaxParallelism() > 0 ? context.getExecutionEnvironment().getMaxParallelism() : context.getExecutionEnvironment().getParallelism();
                UnboundedSourceWrapperNoValueWithRecordId sourceWrapper = new UnboundedSourceWrapperNoValueWithRecordId(new UnboundedSourceWrapper(fullName, context.getPipelineOptions(), adaptedRawSource, parallelism));
                source = context.getExecutionEnvironment().addSource(sourceWrapper).name(fullName).uid(fullName).returns(outputTypeInfo);
            }
            catch (Exception e) {
                throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
            }
            context.setOutputDataStream((PValue)output, (DataStream<?>)source);
        }
    }

    private static class ReadSourceTranslator<@UnknownKeyFor T>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PBegin, PCollection<T>>> {
        private final @UnknownKeyFor @NonNull @Initialized BoundedReadSourceTranslator<T> boundedTranslator = new BoundedReadSourceTranslator();
        private final @UnknownKeyFor @NonNull @Initialized UnboundedReadSourceTranslator<T> unboundedTranslator = new UnboundedReadSourceTranslator();

        private ReadSourceTranslator() {
        }

        @Override
        void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<T>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            if (context.getOutput(transform).isBounded().equals((Object)PCollection.IsBounded.BOUNDED)) {
                this.boundedTranslator.translateNode(transform, context);
            } else {
                this.unboundedTranslator.translateNode(transform, context);
            }
        }
    }

    private static class ImpulseTranslator<@UnknownKeyFor T>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Impulse> {
        private ImpulseTranslator() {
        }

        @Override
        void translateNode(@UnknownKeyFor @NonNull @Initialized Impulse transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            CoderTypeInformation typeInfo = new CoderTypeInformation(WindowedValue.getFullCoder((Coder)ByteArrayCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE), context.getPipelineOptions());
            long shutdownAfterIdleSourcesMs = ((FlinkPipelineOptions)context.getPipelineOptions().as(FlinkPipelineOptions.class)).getShutdownSourcesAfterIdleMs();
            SingleOutputStreamOperator source = context.getExecutionEnvironment().addSource((SourceFunction)new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse").returns(typeInfo);
            context.setOutputDataStream((PValue)context.getOutput(transform), (DataStream<?>)source);
        }
    }

    public static class StripIdsMap<@UnknownKeyFor T>
    extends RichFlatMapFunction<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>> {
        private final @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions options;

        StripIdsMap(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
            this.options = new SerializablePipelineOptions(options);
        }

        public void open(@UnknownKeyFor @NonNull @Initialized Configuration parameters) {
            FileSystems.setDefaultPipelineOptions((PipelineOptions)this.options.get());
        }

        public void flatMap(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<T>> value, @UnknownKeyFor @NonNull @Initialized Collector<@UnknownKeyFor @NonNull @Initialized WindowedValue<T>> collector) throws @UnknownKeyFor @NonNull @Initialized Exception {
            collector.collect((Object)value.withValue(((ValueWithRecordId)value.getValue()).getValue()));
        }
    }

    static class ValueWithRecordIdKeySelector<@UnknownKeyFor T>
    implements KeySelector<WindowedValue<ValueWithRecordId<T>>, ByteBuffer>,
    ResultTypeQueryable<ByteBuffer> {
        ValueWithRecordIdKeySelector() {
        }

        public @UnknownKeyFor @NonNull @Initialized ByteBuffer getKey(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized ValueWithRecordId<T>> value) throws @UnknownKeyFor @NonNull @Initialized Exception {
            return ByteBuffer.wrap(((ValueWithRecordId)value.getValue()).getId());
        }

        public @UnknownKeyFor @NonNull @Initialized TypeInformation<@UnknownKeyFor @NonNull @Initialized ByteBuffer> getProducedType() {
            return new GenericTypeInfo(ByteBuffer.class);
        }
    }

    private static class UnboundedReadSourceTranslator<@UnknownKeyFor T>
    extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PBegin, PCollection<T>>> {
        private UnboundedReadSourceTranslator() {
        }

        @Override
        public void translateNode(@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<T>> transform, @UnknownKeyFor @NonNull @Initialized FlinkStreamingTranslationContext context) {
            SingleOutputStreamOperator source;
            UnboundedSource rawSource;
            PCollection<T> output = context.getOutput(transform);
            TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(context.getOutput(transform));
            Coder coder = context.getOutput(transform).getCoder();
            CoderTypeInformation withIdTypeInfo = new CoderTypeInformation(WindowedValue.getFullCoder((Coder)ValueWithRecordId.ValueWithRecordIdCoder.of((Coder)coder), (Coder)output.getWindowingStrategy().getWindowFn().windowCoder()), context.getPipelineOptions());
            try {
                rawSource = ReadTranslation.unboundedSourceFromTransform(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            String fullName = FlinkStreamingTransformTranslators.getCurrentTransformName(context);
            try {
                int parallelism = context.getExecutionEnvironment().getMaxParallelism() > 0 ? context.getExecutionEnvironment().getMaxParallelism() : context.getExecutionEnvironment().getParallelism();
                UnboundedSourceWrapper sourceWrapper = new UnboundedSourceWrapper(fullName, context.getPipelineOptions(), rawSource, parallelism);
                SingleOutputStreamOperator nonDedupSource = context.getExecutionEnvironment().addSource(sourceWrapper).name(fullName).uid(fullName).returns(withIdTypeInfo);
                source = rawSource.requiresDeduping() ? nonDedupSource.keyBy(new ValueWithRecordIdKeySelector()).transform("deduping", outputTypeInfo, new DedupingOperator(context.getPipelineOptions())).uid(String.format("%s/__deduplicated__", fullName)) : nonDedupSource.flatMap(new StripIdsMap(context.getPipelineOptions())).returns(outputTypeInfo);
            }
            catch (Exception e) {
                throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
            }
            context.setOutputDataStream((PValue)output, (DataStream<?>)source);
        }
    }
}

