package org.apache.flink.streaming.api.runners.python.beam;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.Constants;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.class */
public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner {
    private static final String TRANSFORM_ID_PREFIX = "transform-";
    private static final String COLLECTION_PREFIX = "collection-";
    private static final String CODER_PREFIX = "coder-";

    @Nullable
    private final FlinkFnApi.CoderInfoDescriptor timerCoderDescriptor;
    private final String headOperatorFunctionUrn;
    private final List<FlinkFnApi.UserDefinedDataStreamFunction> userDefinedDataStreamFunctions;

    public BeamDataStreamPythonFunctionRunner(String str, PythonEnvironmentManager pythonEnvironmentManager, String str2, List<FlinkFnApi.UserDefinedDataStreamFunction> list, Map<String, String> map, @Nullable FlinkMetricContainer flinkMetricContainer, KeyedStateBackend<?> keyedStateBackend, TypeSerializer<?> typeSerializer, TypeSerializer<?> typeSerializer2, @Nullable TimerRegistration timerRegistration, MemoryManager memoryManager, double d, FlinkFnApi.CoderInfoDescriptor coderInfoDescriptor, FlinkFnApi.CoderInfoDescriptor coderInfoDescriptor2, FlinkFnApi.CoderInfoDescriptor coderInfoDescriptor3) {
        super(str, pythonEnvironmentManager, map, flinkMetricContainer, keyedStateBackend, typeSerializer, typeSerializer2, timerRegistration, memoryManager, d, coderInfoDescriptor, coderInfoDescriptor2);
        this.headOperatorFunctionUrn = (String) Preconditions.checkNotNull(str2);
        Preconditions.checkArgument(list != null && list.size() >= 1);
        this.userDefinedDataStreamFunctions = list;
        this.timerCoderDescriptor = coderInfoDescriptor3;
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner
    protected void buildTransforms(RunnerApi.Components.Builder builder) {
        int i = 0;
        while (i < this.userDefinedDataStreamFunctions.size() + 1) {
            RunnerApi.ParDoPayload.Builder doFn = RunnerApi.ParDoPayload.newBuilder().setDoFn(RunnerApi.FunctionSpec.newBuilder().setUrn(i == 0 ? this.headOperatorFunctionUrn : Constants.STATELESS_FUNCTION_URN).setPayload(ByteString.copyFrom((i < this.userDefinedDataStreamFunctions.size() ? this.userDefinedDataStreamFunctions.get(i) : ProtoUtils.createReviseOutputDataStreamFunctionProto()).toByteArray())).build());
            if (i == 0 && this.timerCoderDescriptor != null) {
                doFn.putTimerFamilySpecs("timer", RunnerApi.TimerFamilySpec.newBuilder().setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME).setTimerFamilyCoderId(Constants.WRAPPER_TIMER_CODER_ID).build());
            }
            String str = TRANSFORM_ID_PREFIX + i;
            RunnerApi.PTransform.Builder spec = RunnerApi.PTransform.newBuilder().setUniqueName(str).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(BeamUrns.getUrn(RunnerApi.StandardPTransforms.Primitives.PAR_DO)).setPayload(doFn.build().toByteString()).build());
            if (i == 0) {
                spec.putInputs("input", "input");
            } else {
                spec.putInputs("input", COLLECTION_PREFIX + (i - 1));
            }
            if (i == this.userDefinedDataStreamFunctions.size()) {
                spec.putOutputs("output", "output");
            } else {
                spec.putOutputs("output", COLLECTION_PREFIX + i);
                builder.putPcollections(COLLECTION_PREFIX + i, RunnerApi.PCollection.newBuilder().setWindowingStrategyId(Constants.WINDOW_STRATEGY).setCoderId(CODER_PREFIX + i).build()).putCoders(CODER_PREFIX + i, ProtoUtils.createCoderProto(this.inputCoderDescriptor));
            }
            builder.putTransforms(str, spec.build());
            i++;
        }
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner
    protected List<TimerReference> getTimers(RunnerApi.Components components) {
        return this.timerCoderDescriptor != null ? Collections.singletonList(TimerReference.fromTimerId(RunnerApi.ExecutableStagePayload.TimerId.newBuilder().setTransformId("transform-0").setLocalName("timer").build(), components)) : Collections.emptyList();
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner
    protected Optional<RunnerApi.Coder> getOptionalTimerCoderProto() {
        return this.timerCoderDescriptor != null ? Optional.of(ProtoUtils.createCoderProto(this.timerCoderDescriptor)) : Optional.empty();
    }
}
