package org.apache.beam.runners.flink;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.Concatenate;
import org.apache.beam.runners.core.construction.ExecutableStageTranslation;
import org.apache.beam.runners.core.construction.NativeTransforms;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContextFactory;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.KvKeySelector;
import org.apache.beam.runners.flink.translation.utils.FlinkPortableRunnerUtils;
import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.UnsortedGrouping;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.class */
public class FlinkBatchPortablePipelineTranslator implements FlinkPortablePipelineTranslator<BatchTranslationContext> {
    private final Map<String, PTransformTranslator> urnToTransformTranslator;

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator$BatchTranslationContext.class */
    /**
     * Translation state for one batch job: the job info, the pipeline options, the Flink
     * {@link ExecutionEnvironment}, and every {@link DataSet} registered so far keyed by id.
     *
     * <p>Ids that have been registered but never requested through
     * {@link #getDataSetOrThrow(String)} are tracked separately and exposed via
     * {@link #getDanglingDataSets()}.
     */
    public static class BatchTranslationContext implements FlinkPortablePipelineTranslator.TranslationContext, FlinkPortablePipelineTranslator.Executor {
        private final JobInfo jobInfo;
        private final FlinkPipelineOptions options;
        private final ExecutionEnvironment executionEnvironment;
        /** All registered data sets, keyed by the id given to {@link #addDataSet}. */
        private final Map<String, DataSet<?>> dataSets;
        /** Ids that were registered and have not been requested since. */
        private final Set<String> danglingDataSets;

        private BatchTranslationContext(JobInfo jobInfo, FlinkPipelineOptions flinkPipelineOptions, ExecutionEnvironment executionEnvironment) {
            this.jobInfo = jobInfo;
            this.options = flinkPipelineOptions;
            this.executionEnvironment = executionEnvironment;
            this.dataSets = new HashMap<>();
            this.danglingDataSets = new HashSet<>();
        }

        @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator.TranslationContext
        public JobInfo getJobInfo() {
            return this.jobInfo;
        }

        @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator.TranslationContext
        public FlinkPipelineOptions getPipelineOptions() {
            return this.options;
        }

        /**
         * Runs the job built on the wrapped execution environment.
         *
         * @param str job name handed to {@link ExecutionEnvironment#execute(String)}
         * @return the result reported by the execution environment
         * @throws Exception whatever the execution environment throws
         */
        @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator.Executor
        public JobExecutionResult execute(String str) throws Exception {
            return getExecutionEnvironment().execute(str);
        }

        public ExecutionEnvironment getExecutionEnvironment() {
            return this.executionEnvironment;
        }

        /**
         * Registers a data set under an id and marks it as dangling.
         *
         * @param str id to register the data set under
         * @param dataSet the data set to register
         * @throws IllegalArgumentException if a data set is already registered under {@code str}
         */
        public <T> void addDataSet(String str, DataSet<T> dataSet) {
            Preconditions.checkArgument(!this.dataSets.containsKey(str), "A dataset is already registered for id %s.", str);
            this.dataSets.put(str, dataSet);
            this.danglingDataSets.add(str);
        }

        /**
         * Looks up a registered data set and stops tracking it as dangling.
         *
         * @param str id the data set was registered under
         * @return the registered data set, viewed with the element type the caller asks for
         * @throws IllegalArgumentException if nothing is registered under {@code str}
         */
        public <T> DataSet<T> getDataSetOrThrow(String str) {
            // The map stores DataSet<?>; the element type is chosen by the caller and is not
            // checked here, so this cast is unchecked by design.
            @SuppressWarnings("unchecked")
            DataSet<T> dataSet = (DataSet<T>) this.dataSets.get(str);
            if (dataSet == null) {
                throw new IllegalArgumentException(String.format("Unknown dataset for id %s.", str));
            }
            // Requesting a data set is treated as consuming it.
            this.danglingDataSets.remove(str);
            return dataSet;
        }

        /**
         * Returns the data sets that were registered but never requested.
         *
         * @return a new list holding the data set for each dangling id
         */
        public Collection<DataSet<?>> getDanglingDataSets() {
            return this.danglingDataSets.stream().map(this.dataSets::get).collect(Collectors.toList());
        }
    }

    /**
     * Predicate that accepts a transform exactly when its URN is
     * {@code beam:transform:reshuffle:v1}.
     */
    @AutoService({NativeTransforms.IsNativeTransform.class})
    public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform {
        /**
         * @param pTransform transform whose URN is inspected
         * @return {@code true} if the URN resolved for the transform is the reshuffle URN;
         *     {@code false} otherwise, including when no URN is resolved
         */
        public boolean test(RunnerApi.PTransform pTransform) {
            String urn = PTransformTranslation.urnForTransformOrNull(pTransform);
            return urn != null && urn.equals("beam:transform:reshuffle:v1");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Strategy for translating a single transform node. Implementations in this file are the
     * static {@code translate*} methods, registered by URN in {@code createTranslator()}.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator$PTransformTranslator.class */
    public interface PTransformTranslator {
        /**
         * Translates one transform.
         *
         * @param pTransformNode the transform node to translate
         * @param pipeline the pipeline proto that contains the transform
         * @param batchTranslationContext context that data sets are read from and registered in
         */
        void translate(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, BatchTranslationContext batchTranslationContext);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Builds a fresh {@link BatchTranslationContext} backed by a newly created batch execution
     * environment.
     *
     * @param jobInfo job info stored in the context
     * @param flinkPipelineOptions options stored in the context and used to create the environment
     * @param str forwarded as the third argument of
     *     {@code FlinkExecutionEnvironments.createBatchExecutionEnvironment}
     * @param list forwarded as the second argument of the same call
     * @return the new translation context
     */
    @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator
    public BatchTranslationContext createTranslationContext(JobInfo jobInfo, FlinkPipelineOptions flinkPipelineOptions, String str, List<String> list) {
        ExecutionEnvironment batchEnvironment =
                FlinkExecutionEnvironments.createBatchExecutionEnvironment(flinkPipelineOptions, list, str);
        return new BatchTranslationContext(jobInfo, flinkPipelineOptions, batchEnvironment);
    }

    /**
     * Creates a translator that knows the flatten, group-by-key, impulse, executable-stage and
     * reshuffle URNs.
     *
     * @return a translator whose URN table maps each of those URNs to its static translate method
     */
    public static FlinkBatchPortablePipelineTranslator createTranslator() {
        // The builder must be parameterized: a raw builder gives the method references below no
        // functional-interface target type.
        ImmutableMap.Builder<String, PTransformTranslator> translators = ImmutableMap.builder();
        translators.put("beam:transform:flatten:v1", FlinkBatchPortablePipelineTranslator::translateFlatten);
        translators.put("beam:transform:group_by_key:v1", FlinkBatchPortablePipelineTranslator::translateGroupByKey);
        translators.put("beam:transform:impulse:v1", FlinkBatchPortablePipelineTranslator::translateImpulse);
        translators.put("beam:runner:executable_stage:v1", FlinkBatchPortablePipelineTranslator::translateExecutableStage);
        translators.put("beam:transform:reshuffle:v1", FlinkBatchPortablePipelineTranslator::translateReshuffle);
        return new FlinkBatchPortablePipelineTranslator(translators.build());
    }

    /**
     * Stores the URN-to-translator table as given; no copy is made.
     *
     * @param map translators keyed by transform URN
     */
    private FlinkBatchPortablePipelineTranslator(Map<String, PTransformTranslator> map) {
        urnToTransformTranslator = map;
    }

    /**
     * @return the key set of the URN-to-translator table, i.e. every URN this translator has a
     *     registered translation for
     */
    @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator
    public Set<String> knownUrns() {
        return urnToTransformTranslator.keySet();
    }

    /**
     * Translates every transform of the pipeline in topological order, then attaches a
     * discarding sink to each data set that is still dangling in the context.
     *
     * @param batchTranslationContext context that receives the translated data sets
     * @param pipeline pipeline proto to translate
     * @return the same context, returned as the executor for the translated job
     * @throws IllegalArgumentException if a transform has a URN with no registered translator
     */
    @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator
    public FlinkPortablePipelineTranslator.Executor translate(BatchTranslationContext batchTranslationContext, RunnerApi.Pipeline pipeline) {
        QueryablePipeline queryable =
                QueryablePipeline.forTransforms(pipeline.getRootTransformIdsList(), pipeline.getComponents());
        for (PipelineNode.PTransformNode node : queryable.getTopologicallyOrderedTransforms()) {
            String urn = node.getTransform().getSpec().getUrn();
            // Unknown URNs fall through to urnNotFound, which throws.
            PTransformTranslator translator =
                    this.urnToTransformTranslator.getOrDefault(urn, FlinkBatchPortablePipelineTranslator::urnNotFound);
            translator.translate(node, pipeline, batchTranslationContext);
        }
        for (DataSet<?> unconsumed : batchTranslationContext.getDanglingDataSets()) {
            unconsumed.output(new DiscardingOutputFormat<>()).name("DiscardingOutput");
        }
        return batchTranslationContext;
    }

    /**
     * Translates a reshuffle by rebalancing the single input data set and registering the
     * result under the single output id.
     *
     * @param pTransformNode the reshuffle transform; must have exactly one input and one output
     * @param pipeline the enclosing pipeline (not used here)
     * @param batchTranslationContext context the input is read from and the output is added to
     */
    private static <K, V> void translateReshuffle(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, BatchTranslationContext batchTranslationContext) {
        String outputId = Iterables.getOnlyElement(pTransformNode.getTransform().getOutputsMap().values());
        String inputId = Iterables.getOnlyElement(pTransformNode.getTransform().getInputsMap().values());
        DataSet<?> input = batchTranslationContext.getDataSetOrThrow(inputId);
        batchTranslationContext.addDataSet(outputId, input.rebalance());
    }

    /**
     * Translates an executable stage into a Flink operator that emits {@link RawUnionValue}s,
     * followed by one pruning operator per stage output.
     *
     * <p>Stages that declare user state or timers are grouped by key (and additionally sorted by
     * timestamp when {@code FlinkPortableRunnerUtils.requiresTimeSortedInput} says so) and run as
     * a group-reduce; all other stages run as a map-partition. Side inputs are attached as
     * broadcast sets named after their PCollection id.
     *
     * @param pTransformNode the executable-stage transform
     * @param pipeline the enclosing pipeline, used for its components
     * @param batchTranslationContext context the inputs are read from and outputs are added to
     * @throws RuntimeException if an output coder cannot be instantiated or the stage payload
     *     cannot be parsed
     * @throws IllegalStateException if the stage is stateful but its element coder is not a
     *     {@link KvCoder}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <InputT> void translateExecutableStage(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, BatchTranslationContext batchTranslationContext) {
        RunnerApi.Components components = pipeline.getComponents();
        Map<String, String> outputs = pTransformNode.getTransform().getOutputsMap();
        // Output PCollection id -> integer tag used to multiplex outputs in one union stream.
        BiMap<String, Integer> outputTags = PipelineTranslatorUtils.createOutputMap(outputs.values());

        List<Coder<?>> unionCoders = new ArrayList<>();
        Map<String, Coder<WindowedValue<?>>> outputCoders = new HashMap<>();
        // Iterate in ascending tag order so the position in unionCoders follows the tag.
        for (String collectionId : new TreeMap<>(outputTags.inverse()).values()) {
            Coder<WindowedValue<?>> coder;
            try {
                coder = (Coder) WireCoders.instantiateRunnerWireCoder(PipelineNode.pCollection(collectionId, components.getPcollectionsOrThrow(collectionId)), components);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            outputCoders.put(collectionId, coder);
            unionCoders.add(coder);
        }
        CoderTypeInformation<RawUnionValue> unionTypeInfo = new CoderTypeInformation<>(UnionCoder.of(unionCoders), batchTranslationContext.getPipelineOptions());

        RunnerApi.ExecutableStagePayload stagePayload;
        try {
            stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(pTransformNode.getTransform().getSpec().getPayload());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        String inputId = stagePayload.getInput();
        WindowedValue.FullWindowedValueCoder<InputT> inputCoder = PipelineTranslatorUtils.instantiateCoder(inputId, components);
        DataSet<WindowedValue<InputT>> input = batchTranslationContext.getDataSetOrThrow(inputId);
        FlinkExecutableStageFunction<InputT> stageFunction = new FlinkExecutableStageFunction<>(
                pTransformNode.getTransform().getUniqueName(),
                batchTranslationContext.getPipelineOptions(),
                stagePayload,
                batchTranslationContext.getJobInfo(),
                outputTags,
                FlinkExecutableStageContextFactory.getInstance(),
                PipelineTranslatorUtils.getWindowingStrategy(inputId, components).getWindowFn().windowCoder(),
                inputCoder);
        String operatorName = ExecutableStageTranslation.generateNameFromStagePayload(stagePayload);

        // Raw on purpose: the two branches produce different operator subclasses and the only
        // common supertype available here is the raw SingleInputUdfOperator.
        SingleInputUdfOperator taggedOutputs;
        if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) {
            // Typed as plain Coder so the instanceof guard below is a real check.
            Coder valueCoder = inputCoder.getValueCoder();
            if (!(valueCoder instanceof KvCoder)) {
                throw new IllegalStateException(String.format(Locale.ENGLISH, "The element coder for stateful DoFn '%s' must be KvCoder but is: %s", inputId, valueCoder.getClass().getSimpleName()));
            }
            Coder keyCoder = ((KvCoder) valueCoder).getKeyCoder();
            UnsortedGrouping<WindowedValue<InputT>> keyed = input.groupBy(new KvKeySelector<>(keyCoder));
            if (FlinkPortableRunnerUtils.requiresTimeSortedInput(stagePayload, false)) {
                SortedGrouping<WindowedValue<InputT>> timeSorted = keyed.sortGroup(WindowedValue::getTimestamp, Order.ASCENDING);
                taggedOutputs = new GroupReduceOperator<>(timeSorted, unionTypeInfo, stageFunction, operatorName);
            } else {
                taggedOutputs = new GroupReduceOperator<>(keyed, unionTypeInfo, stageFunction, operatorName);
            }
        } else {
            taggedOutputs = new MapPartitionOperator<>(input, unionTypeInfo, stageFunction, operatorName);
        }

        for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId : stagePayload.getSideInputsList()) {
            // Resolve the stage-local side input name to the PCollection id it reads.
            String sideInputCollectionId = stagePayload.getComponents().getTransformsOrThrow(sideInputId.getTransformId()).getInputsOrThrow(sideInputId.getLocalName());
            taggedOutputs.withBroadcastSet(batchTranslationContext.getDataSetOrThrow(sideInputCollectionId), sideInputCollectionId);
        }

        for (String collectionId : outputs.values()) {
            pruneOutput(taggedOutputs, batchTranslationContext, outputTags.get(collectionId), outputCoders.get(collectionId), collectionId);
        }
        if (outputs.isEmpty()) {
            // With no outputs nothing is registered in the context, so attach a sink here.
            taggedOutputs.output(new DiscardingOutputFormat<>()).name("DiscardingOutput");
        }
    }

    /**
     * Translates a flatten by unioning all input data sets and registering the result under the
     * single output id.
     *
     * <p>With no inputs, a one-element dummy source is piped through a flat-map that emits
     * nothing, which yields an empty data set of the expected type.
     *
     * @param pTransformNode the flatten transform; must have exactly one output
     * @param pipeline the enclosing pipeline (not used here)
     * @param batchTranslationContext context the inputs are read from and the output is added to
     */
    @SuppressWarnings("unchecked")
    private static <T> void translateFlatten(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, BatchTranslationContext batchTranslationContext) {
        Map<String, String> inputs = pTransformNode.getTransform().getInputsMap();
        DataSet<WindowedValue<T>> flattened = null;
        if (inputs.isEmpty()) {
            DataSource<String> dummySource = batchTranslationContext.getExecutionEnvironment().fromElements("dummy");
            flattened = dummySource
                    .<WindowedValue<T>>flatMap((element, collector) -> {
                        // Intentionally emits nothing.
                    })
                    // No element is ever produced, so the coder's value type is irrelevant;
                    // the unchecked cast only satisfies the declared element type.
                    .returns(new CoderTypeInformation<>(WindowedValue.getFullCoder((Coder<T>) VoidCoder.of(), GlobalWindow.Coder.INSTANCE), batchTranslationContext.getPipelineOptions()));
        } else {
            for (String inputId : inputs.values()) {
                DataSet<WindowedValue<T>> current = batchTranslationContext.getDataSetOrThrow(inputId);
                flattened = flattened == null ? current : flattened.union(current);
            }
        }
        String outputId = Iterables.getOnlyElement(pTransformNode.getTransform().getOutputsMap().values());
        // Pass-through filter; NOTE(review): going by the operator name this presumably works
        // around a Flink union issue — confirm before removing.
        batchTranslationContext.addDataSet(outputId, flattened.filter(value -> true).name("UnionFixFilter"));
    }

    /**
     * Translates a group-by-key into a partial group-combine followed by a group-reduce, both
     * keyed with a {@link KvKeySelector} and both using a {@link Concatenate} combine function.
     *
     * <p>Raw types are used deliberately: the fully parameterized form needs types this file
     * does not import.
     *
     * @param pTransformNode the group-by-key transform; must have exactly one input and output
     * @param pipeline the enclosing pipeline, used for its components
     * @param batchTranslationContext context the input is read from and the output is added to
     * @throws IllegalStateException if the input's windowing strategy cannot be rehydrated
     * @throws RuntimeException if the input wire coder cannot be instantiated
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <K, V> void translateGroupByKey(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, BatchTranslationContext batchTranslationContext) {
        RunnerApi.Components components = pipeline.getComponents();
        String inputId = Iterables.getOnlyElement(pTransformNode.getTransform().getInputsMap().values());
        PipelineNode.PCollectionNode inputCollection = PipelineNode.pCollection(inputId, components.getPcollectionsOrThrow(inputId));
        DataSet input = batchTranslationContext.getDataSetOrThrow(inputId);
        RunnerApi.WindowingStrategy strategyProto = components.getWindowingStrategiesOrThrow(components.getPcollectionsOrThrow(inputId).getWindowingStrategyId());

        WindowingStrategy windowingStrategy;
        try {
            windowingStrategy = WindowingStrategyTranslation.fromProto(strategyProto, RehydratedComponents.forComponents(components));
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalStateException(String.format("Unable to hydrate GroupByKey windowing strategy %s.", strategyProto), e);
        }

        // The wire coder is handed back as a plain Coder; getValueCoder() lives on
        // WindowedValueCoder, so the cast is required.
        WindowedValue.WindowedValueCoder inputCoder;
        try {
            inputCoder = (WindowedValue.WindowedValueCoder) WireCoders.instantiateRunnerWireCoder(inputCollection, components);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        KvCoder elementCoder = (KvCoder) inputCoder.getValueCoder();

        Concatenate combineFn = new Concatenate();
        Coder accumulatorCoder = combineFn.getAccumulatorCoder(CoderRegistry.createDefault(), elementCoder.getValueCoder());
        Coder outputCoder = WindowedValue.getFullCoder(KvCoder.of(elementCoder.getKeyCoder(), accumulatorCoder), windowingStrategy.getWindowFn().windowCoder());
        // The same type information describes both the partial and the final result.
        CoderTypeInformation outputTypeInfo = new CoderTypeInformation(outputCoder, batchTranslationContext.getPipelineOptions());

        String outputId = Iterables.getOnlyElement(pTransformNode.getTransform().getOutputsMap().values());
        String transformName = pTransformNode.getTransform().getUniqueName();

        GroupCombineOperator partialCombine = new GroupCombineOperator(
                input.groupBy(new KvKeySelector(elementCoder.getKeyCoder())),
                outputTypeInfo,
                new FlinkPartialReduceFunction(combineFn, windowingStrategy, Collections.emptyMap(), batchTranslationContext.getPipelineOptions()),
                "GroupCombine: " + transformName);
        GroupReduceOperator grouped = new GroupReduceOperator(
                partialCombine.groupBy(new KvKeySelector(elementCoder.getKeyCoder())),
                outputTypeInfo,
                new FlinkReduceFunction(combineFn, windowingStrategy, Collections.emptyMap(), batchTranslationContext.getPipelineOptions()),
                transformName);
        batchTranslationContext.addDataSet(outputId, grouped);
    }

    /**
     * Translates an impulse into a data source backed by {@link ImpulseInputFormat}, named
     * "Impulse", and registers it under the single output id.
     *
     * @param pTransformNode the impulse transform; must have exactly one output
     * @param pipeline the enclosing pipeline (not used here)
     * @param batchTranslationContext context supplying the environment and receiving the output
     */
    private static void translateImpulse(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, BatchTranslationContext batchTranslationContext) {
        String outputId = Iterables.getOnlyElement(pTransformNode.getTransform().getOutputsMap().values());
        DataSource<WindowedValue<byte[]>> impulseSource = new DataSource<>(
                batchTranslationContext.getExecutionEnvironment(),
                new ImpulseInputFormat(),
                new CoderTypeInformation<>(
                        WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE),
                        batchTranslationContext.getPipelineOptions()),
                pTransformNode.getTransform().getUniqueName());
        batchTranslationContext.addDataSet(outputId, impulseSource.name("Impulse"));
    }

    /**
     * Fallback translator for URNs without a registered translation; always throws.
     *
     * @param pTransformNode the transform whose URN is unknown
     * @param pipeline the enclosing pipeline (not used here)
     * @param batchTranslationContext the translation context (not used here)
     * @throws IllegalArgumentException always, naming the URN and the transform id
     */
    private static void urnNotFound(PipelineNode.PTransformNode pTransformNode, RunnerApi.Pipeline pipeline, BatchTranslationContext batchTranslationContext) {
        String urn = pTransformNode.getTransform().getSpec().getUrn();
        String transformId = pTransformNode.getId();
        String message = String.format("Unknown type of URN %s for PTransform with id %s.", urn, transformId);
        throw new IllegalArgumentException(message);
    }

    /**
     * Adds a flat-map over a union-valued data set, built from a
     * {@link FlinkExecutableStagePruningFunction} for one tag, and registers its result.
     *
     * @param dataSet the union-valued data set to read from
     * @param batchTranslationContext context supplying options and receiving the new data set
     * @param i tag handed to the pruning function and used in the operator name
     * @param coder coder describing the resulting data set's elements
     * @param str id the resulting data set is registered under
     */
    private static void pruneOutput(DataSet<RawUnionValue> dataSet, BatchTranslationContext batchTranslationContext, int i, Coder<WindowedValue<?>> coder, String str) {
        CoderTypeInformation<WindowedValue<?>> outputType =
                new CoderTypeInformation<>(coder, batchTranslationContext.getPipelineOptions());
        FlinkExecutableStagePruningFunction pruningFunction =
                new FlinkExecutableStagePruningFunction(i, batchTranslationContext.getPipelineOptions());
        String operatorName = String.format("ExtractOutput[%s]", i);
        FlatMapOperator<RawUnionValue, WindowedValue<?>> pruned =
                new FlatMapOperator<>(dataSet, outputType, pruningFunction, operatorName);
        batchTranslationContext.addDataSet(str, pruned);
    }

    /**
     * Raw-{@code List} variant that casts its last argument to {@code List<String>} and
     * delegates to the typed {@code createTranslationContext}.
     *
     * <p>NOTE(review): the inline bridge/synthetic markers indicate this is a compiler-generated
     * bridge surfaced by the decompiler. It has the same erasure as the typed method, so it is
     * presumably not meant to exist in hand-written source — confirm before keeping it.
     */
    @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator
    public /* bridge */ /* synthetic */ BatchTranslationContext createTranslationContext(JobInfo jobInfo, FlinkPipelineOptions flinkPipelineOptions, String str, List list) {
        return createTranslationContext(jobInfo, flinkPipelineOptions, str, (List<String>) list);
    }

    /**
     * Rebuilds one of this class's serializable lambdas from its serialized description.
     *
     * <p>The implementation method name selects a candidate; the candidate is returned only if
     * the method kind, functional interface, interface method and signature, implementing class
     * and implementation signature all match. Three lambdas are recognized: the always-true
     * filter and the emit-nothing flat-map from {@code translateFlatten}, and the
     * {@code getTimestamp} key selector.
     *
     * <p>NOTE(review): marked synthetic, so this is compiler-generated code surfaced by the
     * decompiler. The {@code boolean z} selector assigned {@code -1} and {@code 2}, with two
     * {@code case true} labels, looks like a decompiler artifact for an int selector and is not
     * valid Java as written — confirm whether this method should be in source at all.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the matching lambda instance
     * @throws IllegalArgumentException if the description matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First pass: map the implementation method name to a selector value
        // (hash lookup followed by an equals() check to rule out collisions).
        switch (implMethodName.hashCode()) {
            case -1661323511:
                if (implMethodName.equals("lambda$translateFlatten$e667b209$1")) {
                    z = true;
                    break;
                }
                break;
            case 45521504:
                if (implMethodName.equals("getTimestamp")) {
                    z = 2;
                    break;
                }
                break;
            case 1865502353:
                if (implMethodName.equals("lambda$translateFlatten$a7a5db8c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second pass: validate the full description for the selected candidate and rebuild it.
        switch (z) {
            case false:
                // FilterFunction lambda over WindowedValue that always returns true.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/util/WindowedValue;)Z")) {
                    return windowedValue -> {
                        return true;
                    };
                }
                break;
            case true:
                // FlatMapFunction lambda over String that emits nothing.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("flatMap") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/util/Collector;)V")) {
                    return (str, collector) -> {
                    };
                }
                break;
            case true:
                // KeySelector implemented by WindowedValue.getTimestamp (selector value 2 above).
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/util/WindowedValue") && serializedLambda.getImplMethodSignature().equals("()Lorg/joda/time/Instant;")) {
                    return (v0) -> {
                        return v0.getTimestamp();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
