package org.apache.beam.runners.flink.translation.functions;

import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.utils.FlinkClassloading;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.class */
/**
 * Flink {@link RichMapPartitionFunction} that executes a Beam {@link DoFn} over one partition.
 *
 * <p>Every call to {@link #mapPartition} builds a fresh {@link DoFnRunner} and treats the whole
 * partition as a single bundle: {@code startBundle()}, one {@code processElement} per input
 * element, then {@code finishBundle()}. {@link #open} invokes the DoFn's setup and {@link #close}
 * invokes its teardown.
 *
 * <p>Although the declared Flink output type is {@code WindowedValue<OutputT>}, every element
 * handed to the Flink collector is wrapped in a {@link RawUnionValue} whose union tag identifies
 * the Beam output it belongs to (see the two output managers below).
 *
 * @param <InputT> element type consumed by the wrapped {@link DoFn}
 * @param <OutputT> main output element type of the wrapped {@link DoFn}
 */
public class FlinkDoFnFunction<InputT, OutputT> extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
    private final SerializablePipelineOptions serializedOptions;
    private final DoFn<InputT, OutputT> doFn;
    private final String stepName;
    private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final Map<TupleTag<?>, Integer> outputMap;
    private final TupleTag<OutputT> mainOutputTag;
    private final Coder<InputT> inputCoder;
    private final Map<TupleTag<?>, Coder<?>> outputCoderMap;

    /** Assigned in {@link #open}; remains {@code null} until then. */
    private transient DoFnInvoker<InputT, OutputT> doFnInvoker;

    /**
     * Output manager used when there is exactly one output: every value is emitted with union
     * tag {@code 0}, regardless of the {@link TupleTag} it was produced under.
     */
    static class DoFnOutputManager implements DoFnRunners.OutputManager {
        // Deliberately raw: the constructor accepts any Collector so that existing call sites
        // passing a differently parameterized collector keep compiling.
        @SuppressWarnings("rawtypes")
        private final Collector collector;

        /**
         * @param collector Flink collector that receives the wrapped output elements
         */
        @SuppressWarnings("rawtypes")
        public DoFnOutputManager(Collector collector) {
            this.collector = collector;
        }

        /**
         * Wraps the value in a {@link RawUnionValue} with union tag {@code 0}, keeping the
         * timestamp, windows and pane of {@code windowedValue}, and forwards it to the collector.
         *
         * @param tupleTag ignored; there is only a single output
         * @param windowedValue the element to emit
         */
        @Override // org.apache.beam.runners.core.DoFnRunners.OutputManager
        @SuppressWarnings("unchecked") // collector is raw, see field comment
        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            RawUnionValue unionValue = new RawUnionValue(0, windowedValue.getValue());
            this.collector.collect(
                    WindowedValue.of(
                            unionValue,
                            windowedValue.getTimestamp(),
                            windowedValue.getWindows(),
                            windowedValue.getPane()));
        }
    }

    /**
     * Output manager used when there is more than one output: each value is emitted with the
     * union tag that {@code outputMap} assigns to its {@link TupleTag}.
     */
    static class MultiDoFnOutputManager implements DoFnRunners.OutputManager {
        private final Collector<WindowedValue<RawUnionValue>> collector;
        private final Map<TupleTag<?>, Integer> outputMap;

        /**
         * @param collector Flink collector that receives the wrapped output elements
         * @param map mapping from output tag to the union tag used when wrapping its values
         */
        public MultiDoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector, Map<TupleTag<?>, Integer> map) {
            this.collector = collector;
            this.outputMap = map;
        }

        /**
         * Wraps the value in a {@link RawUnionValue} carrying the union tag registered for
         * {@code tupleTag}, keeping the timestamp, windows and pane of {@code windowedValue}.
         *
         * @param tupleTag the output the element was produced under
         * @param windowedValue the element to emit
         * @throws IllegalArgumentException if {@code tupleTag} has no entry in the output map
         */
        @Override // org.apache.beam.runners.core.DoFnRunners.OutputManager
        public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            Integer unionTag = this.outputMap.get(tupleTag);
            if (unionTag == null) {
                // Fail with a descriptive message instead of an anonymous NPE from unboxing.
                throw new IllegalArgumentException(
                        "Unknown output tag " + tupleTag + "; known tags: " + this.outputMap.keySet());
            }
            RawUnionValue unionValue = new RawUnionValue(unionTag.intValue(), windowedValue.getValue());
            this.collector.collect(
                    WindowedValue.of(
                            unionValue,
                            windowedValue.getTimestamp(),
                            windowedValue.getWindows(),
                            windowedValue.getPane()));
        }
    }

    /**
     * Creates the function. All arguments are stored as-is, except the pipeline options, which
     * are wrapped in a {@link SerializablePipelineOptions}.
     *
     * @param doFn the Beam function to execute
     * @param stepName name handed to the metrics wrapper when metrics are enabled
     * @param windowingStrategy windowing strategy passed to the {@link DoFnRunner}
     * @param sideInputs side input views and their windowing strategies, passed to the side
     *     input reader
     * @param options pipeline options; also consulted for {@code getEnableMetrics()}
     * @param outputMap mapping from each output tag to the union tag used for its elements; its
     *     size decides whether the single- or multi-output manager is used
     * @param mainOutputTag tag of the DoFn's main output
     * @param inputCoder coder for the input elements, passed to the {@link DoFnRunner}
     * @param outputCoderMap coders per output tag, passed to the {@link DoFnRunner}
     */
    public FlinkDoFnFunction(
            DoFn<InputT, OutputT> doFn,
            String stepName,
            WindowingStrategy<?, ?> windowingStrategy,
            Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
            PipelineOptions options,
            Map<TupleTag<?>, Integer> outputMap,
            TupleTag<OutputT> mainOutputTag,
            Coder<InputT> inputCoder,
            Map<TupleTag<?>, Coder<?>> outputCoderMap) {
        this.doFn = doFn;
        this.stepName = stepName;
        this.sideInputs = sideInputs;
        this.serializedOptions = new SerializablePipelineOptions(options);
        this.windowingStrategy = windowingStrategy;
        this.outputMap = outputMap;
        this.mainOutputTag = mainOutputTag;
        this.inputCoder = inputCoder;
        this.outputCoderMap = outputCoderMap;
    }

    /**
     * Runs the wrapped {@link DoFn} over all elements of the partition as one bundle.
     *
     * @param iterable the input elements of this partition
     * @param collector Flink collector receiving the {@link RawUnionValue}-wrapped outputs
     * @throws Exception anything thrown while building or driving the runner; in that case
     *     {@code finishBundle()} is not called
     */
    @Override // org.apache.flink.api.common.functions.RichMapPartitionFunction, org.apache.flink.api.common.functions.MapPartitionFunction
    public void mapPartition(Iterable<WindowedValue<InputT>> iterable, Collector<WindowedValue<OutputT>> collector) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();

        final DoFnRunners.OutputManager outputManager;
        if (this.outputMap.size() == 1) {
            outputManager = new DoFnOutputManager(collector);
        } else {
            // The output managers emit WindowedValue<RawUnionValue>, not WindowedValue<OutputT>,
            // so the collector has to be re-typed through a raw cast.
            // NOTE(review): presumably the translator types the surrounding Flink operator for
            // RawUnionValue - confirm against the caller.
            @SuppressWarnings({"unchecked", "rawtypes"})
            Collector<WindowedValue<RawUnionValue>> unionCollector = (Collector) collector;
            outputManager = new MultiDoFnOutputManager(unionCollector, this.outputMap);
        }

        DoFnRunner<InputT, OutputT> doFnRunner =
                DoFnRunners.simpleRunner(
                        this.serializedOptions.get(),
                        this.doFn,
                        new FlinkSideInputReader(this.sideInputs, runtimeContext),
                        outputManager,
                        this.mainOutputTag,
                        Lists.newArrayList(this.outputMap.keySet()),
                        new FlinkNoOpStepContext(),
                        this.inputCoder,
                        this.outputCoderMap,
                        this.windowingStrategy);

        FlinkPipelineOptions flinkOptions = this.serializedOptions.get().as(FlinkPipelineOptions.class);
        if (flinkOptions.getEnableMetrics().booleanValue()) {
            // Decorate the runner so metrics are reported under this step's name.
            doFnRunner = new DoFnRunnerWithMetricsUpdate<>(this.stepName, doFnRunner, runtimeContext);
        }

        doFnRunner.startBundle();
        for (WindowedValue<InputT> element : iterable) {
            doFnRunner.processElement(element);
        }
        doFnRunner.finishBundle();
    }

    /**
     * Creates the {@link DoFnInvoker} for the wrapped {@link DoFn} and invokes its setup.
     *
     * @param configuration unused
     * @throws Exception if creating the invoker or the DoFn's setup fails
     */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        this.doFnInvoker = DoFnInvokers.invokerFor(this.doFn);
        this.doFnInvoker.invokeSetup();
    }

    /**
     * Invokes the DoFn's teardown (if an invoker was created) and then always clears the static
     * caches via {@link FlinkClassloading#deleteStaticCaches()}.
     *
     * @throws Exception if the DoFn's teardown fails
     */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        try {
            // doFnInvoker is only assigned in open(); if close() runs without a successful
            // open() it is still null, and dereferencing it would throw an NPE that hides the
            // original failure.
            if (this.doFnInvoker != null) {
                this.doFnInvoker.invokeTeardown();
            }
        } finally {
            FlinkClassloading.deleteStaticCaches();
        }
    }
}
