package org.apache.beam.runners.flink.adapter;

import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.class */
/**
 * Applies Beam {@link PTransform}s to Flink {@link DataStream}s.
 *
 * <p>Each {@code apply*} method hands the supplied Flink streams to
 * {@code BeamAdapterUtils.applyBeamPTransformInternal}, which is given a pipeline-fragment
 * translator built on {@link FlinkStreamingPortablePipelineTranslator}. On the way in, every Flink
 * element is wrapped in a {@link WindowedValue} in the global window; on the way out the window
 * wrapper is stripped again and the Beam timestamp becomes the Flink record timestamp.
 *
 * <p>Single inputs are registered under the key {@code "input"} and single outputs under the key
 * {@code "output"}.
 */
public class BeamFlinkDataStreamAdapter {
    /** Map key used for the sole input of the single-input overloads. */
    private static final String INPUT = "input";

    /** Map key used for the sole output of the single-output overloads. */
    private static final String OUTPUT = "output";

    private final PipelineOptions pipelineOptions;
    private final CoderRegistry coderRegistry;

    /**
     * Stream operator that turns a {@code WindowedValue<T>} record into a plain {@code T} record,
     * using the windowed value's timestamp (in milliseconds) as the record timestamp.
     */
    private static class UnwrapWindowOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<WindowedValue<T>, T> {
        private UnwrapWindowOperator() {
        }

        /**
         * Emits the payload of the incoming windowed value, reusing the incoming record object.
         *
         * @param streamRecord record carrying the windowed value to unwrap
         */
        @Override
        public void processElement(StreamRecord<WindowedValue<T>> streamRecord) {
            // Read the windowed value once and keep it typed so that replace(...) yields a
            // StreamRecord<T> rather than a record of Object.
            WindowedValue<T> windowedValue = streamRecord.getValue();
            this.output.collect(streamRecord.replace(windowedValue.getValue(), windowedValue.getTimestamp().getMillis()));
        }
    }

    /** Creates an adapter that uses {@code PipelineOptionsFactory.create()} as its options. */
    public BeamFlinkDataStreamAdapter() {
        this(PipelineOptionsFactory.create());
    }

    /**
     * Creates an adapter with the given options and a default {@link CoderRegistry}.
     *
     * @param pipelineOptions options used when translating and when converting coders
     */
    public BeamFlinkDataStreamAdapter(PipelineOptions pipelineOptions) {
        this.coderRegistry = CoderRegistry.createDefault();
        this.pipelineOptions = pipelineOptions;
    }

    /**
     * Applies a single-input, single-output Beam transform to a Flink stream.
     *
     * @param dataStream the Flink input; its execution environment is used for the translation
     * @param pTransform the Beam transform to apply
     * @return the Flink stream registered under {@code "output"}
     * @throws IllegalStateException if the translation produced no {@code "output"} stream
     */
    @SuppressWarnings("unchecked") // Element types are erased while streams travel through the name-keyed maps.
    public <InputT, OutputT, CollectionT extends PCollection<? extends InputT>> DataStream<OutputT> applyBeamPTransform(DataStream<InputT> dataStream, PTransform<CollectionT, PCollection<OutputT>> pTransform) {
        Map<String, DataStream<?>> outputs = applyBeamPTransformInternal(
                ImmutableMap.of(INPUT, dataStream),
                (pipeline, inputs) -> (CollectionT) getNonNull(inputs, INPUT),
                collection -> ImmutableMap.of(OUTPUT, collection),
                pTransform,
                dataStream.getExecutionEnvironment());
        return (DataStream<OutputT>) getNonNull(outputs, OUTPUT);
    }

    /**
     * Applies a multi-input, single-output Beam transform to a set of named Flink streams.
     *
     * @param map the named Flink inputs; must contain at least one stream, whose execution
     *     environment is used for the translation
     * @param pTransform the Beam transform to apply
     * @return the Flink stream registered under {@code "output"}
     * @throws NoSuchElementException if {@code map} is empty
     * @throws IllegalStateException if the translation produced no {@code "output"} stream
     */
    @SuppressWarnings("unchecked") // Element types are erased while streams travel through the name-keyed maps.
    public <OutputT> DataStream<OutputT> applyBeamPTransform(Map<String, ? extends DataStream<?>> map, PTransform<PCollectionTuple, PCollection<OutputT>> pTransform) {
        Map<String, DataStream<?>> outputs = applyBeamPTransformInternal(
                map,
                BeamAdapterUtils::mapToTuple,
                collection -> ImmutableMap.of(OUTPUT, collection),
                pTransform,
                executionEnvironmentOf(map));
        return (DataStream<OutputT>) getNonNull(outputs, OUTPUT);
    }

    /**
     * Applies a root (no-input), single-output Beam transform in the given environment.
     *
     * @param streamExecutionEnvironment the Flink environment used for the translation
     * @param pTransform the Beam transform to apply to {@code PBegin}
     * @return the Flink stream registered under {@code "output"}
     * @throws IllegalStateException if the translation produced no {@code "output"} stream
     */
    @SuppressWarnings("unchecked") // Element types are erased while streams travel through the name-keyed maps.
    public <OutputT> DataStream<OutputT> applyBeamPTransform(StreamExecutionEnvironment streamExecutionEnvironment, PTransform<PBegin, PCollection<OutputT>> pTransform) {
        Map<String, DataStream<?>> outputs = applyBeamPTransformInternal(
                ImmutableMap.of(),
                (pipeline, inputs) -> PBegin.in(pipeline),
                collection -> ImmutableMap.of(OUTPUT, collection),
                pTransform,
                streamExecutionEnvironment);
        return (DataStream<OutputT>) getNonNull(outputs, OUTPUT);
    }

    /**
     * Applies a single-input, multi-output Beam transform to a Flink stream.
     *
     * @param dataStream the Flink input; its execution environment is used for the translation
     * @param pTransform the Beam transform to apply
     * @return the resulting Flink streams keyed by name, as produced by
     *     {@code BeamAdapterUtils.tupleToMap}
     */
    @SuppressWarnings("unchecked") // Element types are erased while streams travel through the name-keyed maps.
    public <InputT, CollectionT extends PCollection<? extends InputT>> Map<String, DataStream<?>> applyMultiOutputBeamPTransform(DataStream<InputT> dataStream, PTransform<CollectionT, PCollectionTuple> pTransform) {
        return applyBeamPTransformInternal(
                ImmutableMap.of(INPUT, dataStream),
                (pipeline, inputs) -> (CollectionT) getNonNull(inputs, INPUT),
                BeamAdapterUtils::tupleToMap,
                pTransform,
                dataStream.getExecutionEnvironment());
    }

    /**
     * Applies a multi-input, multi-output Beam transform to a set of named Flink streams.
     *
     * @param map the named Flink inputs; must contain at least one stream, whose execution
     *     environment is used for the translation
     * @param pTransform the Beam transform to apply
     * @return the resulting Flink streams keyed by name, as produced by
     *     {@code BeamAdapterUtils.tupleToMap}
     * @throws NoSuchElementException if {@code map} is empty
     */
    public Map<String, DataStream<?>> applyMultiOutputBeamPTransform(Map<String, ? extends DataStream<?>> map, PTransform<PCollectionTuple, PCollectionTuple> pTransform) {
        return applyBeamPTransformInternal(
                map,
                BeamAdapterUtils::mapToTuple,
                BeamAdapterUtils::tupleToMap,
                pTransform,
                executionEnvironmentOf(map));
    }

    /**
     * Applies a root (no-input), multi-output Beam transform in the given environment.
     *
     * @param streamExecutionEnvironment the Flink environment used for the translation
     * @param pTransform the Beam transform to apply to {@code PBegin}
     * @return the resulting Flink streams keyed by name, as produced by
     *     {@code BeamAdapterUtils.tupleToMap}
     */
    public Map<String, DataStream<?>> applyMultiOutputBeamPTransform(StreamExecutionEnvironment streamExecutionEnvironment, PTransform<PBegin, PCollectionTuple> pTransform) {
        return applyBeamPTransformInternal(
                ImmutableMap.of(),
                (pipeline, inputs) -> PBegin.in(pipeline),
                BeamAdapterUtils::tupleToMap,
                pTransform,
                streamExecutionEnvironment);
    }

    /**
     * Applies a single-input Beam transform that produces {@link PDone} to a Flink stream.
     *
     * @param dataStream the Flink input; its execution environment is used for the translation
     * @param pTransform the Beam transform to apply
     */
    @SuppressWarnings("unchecked") // Element types are erased while streams travel through the name-keyed maps.
    public <InputT, CollectionT extends PCollection<? extends InputT>> void applyNoOutputBeamPTransform(DataStream<InputT> dataStream, PTransform<CollectionT, PDone> pTransform) {
        applyBeamPTransformInternal(
                ImmutableMap.of(INPUT, dataStream),
                (pipeline, inputs) -> (CollectionT) getNonNull(inputs, INPUT),
                done -> ImmutableMap.of(),
                pTransform,
                dataStream.getExecutionEnvironment());
    }

    /**
     * Applies a multi-input Beam transform that produces {@link PDone} to named Flink streams.
     *
     * @param map the named Flink inputs; must contain at least one stream, whose execution
     *     environment is used for the translation
     * @param pTransform the Beam transform to apply
     * @throws NoSuchElementException if {@code map} is empty
     */
    public void applyNoOutputBeamPTransform(Map<String, ? extends DataStream<?>> map, PTransform<PCollectionTuple, PDone> pTransform) {
        applyBeamPTransformInternal(
                map,
                BeamAdapterUtils::mapToTuple,
                done -> ImmutableMap.of(),
                pTransform,
                executionEnvironmentOf(map));
    }

    /**
     * Applies a root (no-input) Beam transform that produces {@link PDone}.
     *
     * @param streamExecutionEnvironment the Flink environment used for the translation
     * @param pTransform the Beam transform to apply to {@code PBegin}
     */
    public void applyNoOutputBeamPTransform(StreamExecutionEnvironment streamExecutionEnvironment, PTransform<PBegin, PDone> pTransform) {
        applyBeamPTransformInternal(
                ImmutableMap.of(),
                (pipeline, inputs) -> PBegin.in(pipeline),
                done -> ImmutableMap.of(),
                pTransform,
                streamExecutionEnvironment);
    }

    /**
     * Delegates to {@code BeamAdapterUtils.applyBeamPTransformInternal}, supplying a translator
     * that runs the pipeline proto through {@link FlinkStreamingPortablePipelineTranslator} with
     * custom handlers for the {@code FlinkInput.URN} and {@code FlinkOutput.URN} transforms.
     *
     * @param map the named Flink inputs
     * @param biFunction builds the Beam input from the pipeline and the named input collections
     * @param function converts the Beam output into named collections
     * @param pTransform the Beam transform to apply
     * @param streamExecutionEnvironment the Flink environment used for the translation
     * @return the Flink streams recorded by the output translator, keyed by name
     */
    private <BeamInputT extends PInput, BeamOutputT extends POutput> Map<String, DataStream<?>> applyBeamPTransformInternal(Map<String, ? extends DataStream<?>> map, BiFunction<Pipeline, Map<String, PCollection<?>>, BeamInputT> biFunction, Function<BeamOutputT, Map<String, PCollection<?>>> function, PTransform<? super BeamInputT, BeamOutputT> pTransform, StreamExecutionEnvironment streamExecutionEnvironment) {
        return BeamAdapterUtils.applyBeamPTransformInternal(
                map,
                biFunction,
                function,
                pTransform,
                streamExecutionEnvironment,
                // NOTE(review): presumably the "is bounded" flag (false = streaming); confirm
                // against BeamAdapterUtils.applyBeamPTransformInternal.
                false,
                dataStream -> dataStream.getType(),
                this.pipelineOptions,
                this.coderRegistry,
                (flinkInputs, pipelineProto, environment) -> {
                    // Filled in by the output translator while the pipeline is being translated.
                    Map<String, DataStream<?>> flinkOutputs = new HashMap<>();
                    FlinkStreamingPortablePipelineTranslator translator = new FlinkStreamingPortablePipelineTranslator(
                            ImmutableMap.of(
                                    FlinkInput.URN, flinkInputTranslator(flinkInputs),
                                    FlinkOutput.URN, flinkOutputTranslator(flinkOutputs)));
                    JobInfo jobInfo = JobInfo.create("unusedJobId", "unusedJobName", "unusedRetrievalToken", PipelineOptionsTranslation.toProto(this.pipelineOptions));
                    translator.translate(
                            translator.createTranslationContext(jobInfo, this.pipelineOptions.as(FlinkPipelineOptions.class), environment),
                            translator.prepareForTranslation(pipelineProto));
                    return flinkOutputs;
                });
    }

    /**
     * Builds the translator for the Flink-input transform. The transform's payload is read as a
     * UTF-8 string naming an entry of {@code map}; that stream is wrapped into global-window
     * {@link WindowedValue}s and registered under the transform's only output collection id.
     *
     * @param map the named Flink inputs that the pipeline proto may reference
     * @return a translator that throws {@link IllegalStateException} when the referenced input is
     *     missing from {@code map}
     */
    private <InputT> FlinkStreamingPortablePipelineTranslator.PTransformTranslator<FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext> flinkInputTranslator(Map<String, ? extends DataStream<?>> map) {
        return (transformId, pipelineProto, translationContext) -> {
            RunnerApi.PTransform transform = pipelineProto.getComponents().getTransformsOrThrow(transformId);
            String inputName = transform.getSpec().getPayload().toStringUtf8();
            // The map only knows DataStream<?>; the element type is re-asserted here.
            @SuppressWarnings("unchecked")
            DataStream<InputT> flinkInput = Preconditions.checkStateNotNull((DataStream<InputT>) map.get(inputName), "missing input referenced in proto: " + inputName);
            translationContext.addDataStream(
                    Iterables.getOnlyElement(transform.getOutputsMap().values()),
                    flinkInput.process(
                            new ProcessFunction<InputT, WindowedValue<InputT>>() {
                                @Override
                                public void processElement(InputT element, ProcessFunction<InputT, WindowedValue<InputT>>.Context ctx, Collector<WindowedValue<InputT>> out) throws Exception {
                                    // Records without a Flink timestamp get Beam's minimum timestamp.
                                    Long timestampMillis = ctx.timestamp();
                                    Instant timestamp = timestampMillis == null ? BoundedWindow.TIMESTAMP_MIN_VALUE : Instant.ofEpochMilli(timestampMillis);
                                    out.collect(WindowedValue.timestampedValueInGlobalWindow(element, timestamp));
                                }
                            },
                            BeamAdapterCoderUtils.coderToTypeInformation(
                                    WindowedValue.getFullCoder(
                                            BeamAdapterCoderUtils.typeInformationToCoder(flinkInput.getType(), this.coderRegistry),
                                            GlobalWindow.Coder.INSTANCE),
                                    this.pipelineOptions)));
        };
    }

    /**
     * Builds the translator for the Flink-output transform. The transform's payload is read as a
     * UTF-8 string used as the output name; the stream of the transform's only input collection is
     * passed through a "StripWindows" {@link UnwrapWindowOperator} and stored in {@code map}.
     *
     * @param map receives one entry per translated output transform
     * @return a translator that records unwrapped output streams into {@code map}
     */
    // The operator is instantiated raw on purpose: the static element type of the looked-up coder
    // is not visible from this class, so the stream's type is carried by the TypeInformation only.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private <InputT> FlinkStreamingPortablePipelineTranslator.PTransformTranslator<FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext> flinkOutputTranslator(Map<String, DataStream<?>> map) {
        return (transformId, pipelineProto, translationContext) -> {
            RunnerApi.PTransform transform = pipelineProto.getComponents().getTransformsOrThrow(transformId);
            String outputName = transform.getSpec().getPayload().toStringUtf8();
            String inputCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values());
            map.put(
                    outputName,
                    translationContext.getDataStreamOrThrow(inputCollectionId)
                            .transform(
                                    "StripWindows",
                                    BeamAdapterCoderUtils.coderToTypeInformation(BeamAdapterCoderUtils.lookupCoder(pipelineProto, inputCollectionId), this.pipelineOptions),
                                    new UnwrapWindowOperator()));
        };
    }

    /**
     * Returns the execution environment of an arbitrary stream in {@code inputs}.
     *
     * @param inputs the named Flink inputs
     * @return the execution environment of one of the streams
     * @throws NoSuchElementException if {@code inputs} is empty, since there is then no stream to
     *     take the environment from
     */
    private static StreamExecutionEnvironment executionEnvironmentOf(Map<String, ? extends DataStream<?>> inputs) {
        return inputs.values().stream()
                .findAny()
                .orElseThrow(() -> new NoSuchElementException(
                        "At least one input DataStream is required to determine the execution environment; "
                                + "use the StreamExecutionEnvironment overload for transforms without inputs"))
                .getExecutionEnvironment();
    }

    /**
     * Looks up {@code k} in {@code map} and insists that both the key and the value are present.
     *
     * @param map the map to read from
     * @param k the key to look up; must not be null
     * @return the non-null value mapped to {@code k}
     * @throws IllegalArgumentException if {@code k} is null
     * @throws IllegalStateException if {@code map} has no non-null value for {@code k}
     */
    private static <K, V> V getNonNull(Map<K, V> map, K k) {
        K key = Preconditions.checkArgumentNotNull(k);
        return Preconditions.checkStateNotNull(map.get(key), "no value present for key: " + key);
    }
}
