/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.BatchCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.BatchGlobalCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommittableTypeInformation;
import org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StreamingCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StreamingGlobalCommitterOperatorFactory;
import org.apache.flink.streaming.util.graph.StreamGraphUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TransformationTranslator} for {@link SinkTransformation}s.
 *
 * <p>A sink transformation is expanded into up to three stream graph nodes that are chained by
 * edges: a writer (always added), a committer (added only when the sink creates one) and a global
 * committer (added only when the sink creates one). Both translation entry points return an empty
 * collection of node ids.
 *
 * @param <InputT> the type of the sink's input records
 * @param <CommT> the type of the committables exchanged between writer and committers
 * @param <WriterStateT> the type of the sink writer's state
 * @param <GlobalCommT> the type of the aggregated (global) committable
 */
@Internal
public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCommT>
        implements TransformationTranslator<
                Object, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>> {

    protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class);

    /** State name handed to the stateful writer factory as the "previous sink state name". */
    private static final String PREVIOUS_SINK_STATE_NAME = "bucket-states";

    /** Sentinel returned by {@link #addCommitter} when the sink does not create a committer. */
    private static final int NO_COMMITTER_ID = -1;

    /**
     * Parallelism value treated as "not set" on the transformation.
     *
     * <p>NOTE(review): presumably mirrors {@code ExecutionConfig.PARALLELISM_DEFAULT}; confirm
     * and reference that constant directly if so.
     */
    private static final int UNSET_PARALLELISM = -1;

    private static final String WRITER_PREFIX = "Sink Writer:";
    private static final String COMMITTER_PREFIX = "Sink Committer:";
    private static final String GLOBAL_COMMITTER_PREFIX = "Sink Global Committer:";

    private static final String TRANSLATION_ERROR_MESSAGE =
            "Could not add the Committer or GlobalCommitter to the stream graph.";

    /**
     * Translates the sink for batch execution: the committer runs with parallelism and max
     * parallelism of 1.
     *
     * @param transformation the sink transformation to translate
     * @param context the translation context providing the stream graph
     * @return an empty collection
     * @throws FlinkRuntimeException if adding the committer operators fails with an {@link
     *     IOException}
     */
    @Override
    public Collection<Integer> translateForBatch(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation,
            TransformationTranslator.Context context) {
        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), transformation);
        final int parallelism = getParallelism(transformation, context);
        try {
            internalTranslate(
                    transformation,
                    parallelism,
                    PREVIOUS_SINK_STATE_NAME,
                    new BatchCommitterOperatorFactory<CommT>(transformation.getSink()),
                    1,
                    1,
                    new BatchGlobalCommitterOperatorFactory<CommT, GlobalCommT>(
                            transformation.getSink()),
                    context);
        } catch (IOException e) {
            throw new FlinkRuntimeException(TRANSLATION_ERROR_MESSAGE, e);
        }
        return Collections.emptyList();
    }

    /**
     * Translates the sink for streaming execution: the committer uses the writer's parallelism
     * and the transformation's max parallelism.
     *
     * @param transformation the sink transformation to translate
     * @param context the translation context providing the stream graph
     * @return an empty collection
     * @throws FlinkRuntimeException if adding the committer operators fails with an {@link
     *     IOException}
     */
    @Override
    public Collection<Integer> translateForStreaming(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> transformation,
            TransformationTranslator.Context context) {
        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), transformation);
        final int parallelism = getParallelism(transformation, context);
        try {
            internalTranslate(
                    transformation,
                    parallelism,
                    PREVIOUS_SINK_STATE_NAME,
                    new StreamingCommitterOperatorFactory<CommT>(transformation.getSink()),
                    parallelism,
                    transformation.getMaxParallelism(),
                    new StreamingGlobalCommitterOperatorFactory<CommT, GlobalCommT>(
                            transformation.getSink()),
                    context);
        } catch (IOException e) {
            throw new FlinkRuntimeException(TRANSLATION_ERROR_MESSAGE, e);
        }
        return Collections.emptyList();
    }

    /**
     * Adds the writer, then the optional committer, then the optional global committer to the
     * stream graph. The global committer is connected to the committer when one was added,
     * otherwise directly to the writer.
     */
    private void internalTranslate(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            int writerParallelism,
            @Nullable String previousSinkStateName,
            OneInputStreamOperatorFactory<CommT, CommT> committerFactory,
            int committerParallelism,
            int committerMaxParallelism,
            OneInputStreamOperatorFactory<CommT, GlobalCommT> globalCommitterFactory,
            TransformationTranslator.Context context)
            throws IOException {
        // NOTE(review): both public entry points already perform this validation before
        // delegating here; kept so this method stays safe if it gains other callers.
        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);

        final int writerId =
                addWriter(sinkTransformation, writerParallelism, previousSinkStateName, context);
        final int committerId =
                addCommitter(
                        writerId,
                        sinkTransformation,
                        committerFactory,
                        committerParallelism,
                        committerMaxParallelism,
                        context);

        // Compare against the sentinel itself rather than relying on node ids being positive.
        final int globalCommitterInputId =
                committerId == NO_COMMITTER_ID ? writerId : committerId;
        addGlobalCommitter(
                globalCommitterInputId, sinkTransformation, globalCommitterFactory, context);
    }

    /**
     * Adds the sink writer operator and connects it to the stream nodes of the transformation's
     * single input.
     *
     * <p>A stateful writer factory is used when the sink provides a writer state serializer,
     * otherwise a stateless one. The writer keeps the transformation's own uid.
     *
     * @return the stream graph node id of the writer
     * @throws IllegalStateException if the transformation does not have exactly one input
     */
    private int addWriter(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            int parallelism,
            @Nullable String previousSinkStateName,
            TransformationTranslator.Context context) {
        final Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
        final boolean hasState = sink.getWriterStateSerializer().isPresent();

        Preconditions.checkState(
                sinkTransformation.getInputs().size() == 1,
                "A sink transformation must have exactly one input.");

        // The input's element type is not tracked in the transformation's signature, so the
        // cast cannot be checked by the compiler.
        @SuppressWarnings("unchecked")
        final Transformation<InputT> input =
                (Transformation<InputT>) sinkTransformation.getInputs().get(0);
        final TypeInformation<InputT> inputTypeInfo = input.getOutputType();

        final StreamOperatorFactory<CommT> writer;
        if (hasState) {
            writer =
                    new StatefulSinkWriterOperatorFactory<InputT, CommT, WriterStateT>(
                            sink, previousSinkStateName);
        } else {
            writer = new StatelessSinkWriterOperatorFactory<InputT, CommT>(sink);
        }

        final ChainingStrategy chainingStrategy = sinkTransformation.getChainingStrategy();
        if (chainingStrategy != null) {
            writer.setChainingStrategy(chainingStrategy);
        }

        return addOperatorToStreamGraph(
                writer,
                context.getStreamNodeIds(input),
                inputTypeInfo,
                extractCommittableTypeInformation(sink),
                label(WRITER_PREFIX, sinkTransformation.getName()),
                sinkTransformation.getUid(),
                parallelism,
                sinkTransformation.getMaxParallelism(),
                sinkTransformation,
                context);
    }

    /**
     * Adds the committer operator if the sink creates a committer.
     *
     * @param inputId the node id the committer reads from
     * @return the stream graph node id of the committer, or {@link #NO_COMMITTER_ID} if the sink
     *     does not create a committer
     * @throws IOException if the sink fails while creating the committer
     * @throws NullPointerException if the sink creates a committer but provides no committable
     *     serializer
     */
    private int addCommitter(
            int inputId,
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            OneInputStreamOperatorFactory<CommT, CommT> committerFactory,
            int parallelism,
            int maxParallelism,
            TransformationTranslator.Context context)
            throws IOException {
        final Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
        if (!sink.createCommitter().isPresent()) {
            return NO_COMMITTER_ID;
        }

        final CommittableTypeInformation<CommT> committableTypeInfo =
                Preconditions.checkNotNull(
                        extractCommittableTypeInformation(sink),
                        "The sink creates a committer but provides no committable serializer.");

        return addOperatorToStreamGraph(
                committerFactory,
                Collections.singletonList(inputId),
                committableTypeInfo,
                committableTypeInfo,
                label(COMMITTER_PREFIX, sinkTransformation.getName()),
                deriveUid(COMMITTER_PREFIX, sinkTransformation.getUid()),
                parallelism,
                maxParallelism,
                sinkTransformation,
                context);
    }

    /**
     * Adds the global committer operator, with parallelism and max parallelism of 1, if the sink
     * creates a global committer. No output type information is registered for it.
     *
     * @param inputId the node id the global committer reads from
     * @throws IOException if the sink fails while creating the global committer
     * @throws NullPointerException if the sink creates a global committer but provides no
     *     committable serializer
     */
    private void addGlobalCommitter(
            int inputId,
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            OneInputStreamOperatorFactory<CommT, GlobalCommT> globalCommitterFactory,
            TransformationTranslator.Context context)
            throws IOException {
        final Sink<InputT, CommT, WriterStateT, GlobalCommT> sink = sinkTransformation.getSink();
        if (!sink.createGlobalCommitter().isPresent()) {
            return;
        }

        final CommittableTypeInformation<CommT> committableTypeInfo =
                Preconditions.checkNotNull(
                        extractCommittableTypeInformation(sink),
                        "The sink creates a global committer but provides no committable"
                                + " serializer.");

        addOperatorToStreamGraph(
                globalCommitterFactory,
                Collections.singletonList(inputId),
                committableTypeInfo,
                null,
                label(GLOBAL_COMMITTER_PREFIX, sinkTransformation.getName()),
                deriveUid(GLOBAL_COMMITTER_PREFIX, sinkTransformation.getUid()),
                1,
                1,
                sinkTransformation,
                context);
    }

    /**
     * Returns the transformation's parallelism, falling back to the execution config's
     * parallelism when the transformation's value is {@link #UNSET_PARALLELISM}.
     */
    private int getParallelism(
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            TransformationTranslator.Context context) {
        final int configured = sinkTransformation.getParallelism();
        return configured != UNSET_PARALLELISM
                ? configured
                : context.getStreamGraph().getExecutionConfig().getParallelism();
    }

    /**
     * Registers an operator under a freshly allocated node id, configures its parallelism, max
     * parallelism, buffer timeout and optional uid, and adds an edge from every input node.
     *
     * @param operatorFactory the factory of the operator to add
     * @param inputs the node ids to connect to the new operator
     * @param inTypeInfo the operator's input type information
     * @param outTypInfo the operator's output type information; callers may pass {@code null}
     * @param name the operator name
     * @param uid the uid to set on the new node, or {@code null} to set none
     * @param parallelism the operator parallelism
     * @param maxParallelism the operator max parallelism
     * @param sinkTransformation supplies the co-location group key and buffer timeout settings
     * @param context the translation context
     * @return the node id allocated for the new operator
     */
    private <IN, OUT> int addOperatorToStreamGraph(
            StreamOperatorFactory<OUT> operatorFactory,
            Collection<Integer> inputs,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypInfo,
            String name,
            @Nullable String uid,
            int parallelism,
            int maxParallelism,
            SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
            TransformationTranslator.Context context) {
        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = Transformation.getNewNodeId();

        streamGraph.addOperator(
                transformationId,
                slotSharingGroup,
                sinkTransformation.getCoLocationGroupKey(),
                operatorFactory,
                inTypeInfo,
                outTypInfo,
                name);
        streamGraph.setParallelism(transformationId, parallelism);
        streamGraph.setMaxParallelism(transformationId, maxParallelism);
        StreamGraphUtils.configureBufferTimeout(
                streamGraph,
                transformationId,
                sinkTransformation,
                context.getDefaultBufferTimeout());

        if (uid != null) {
            streamGraph.setTransformationUID(transformationId, uid);
        }

        for (int input : inputs) {
            streamGraph.addEdge(input, transformationId, 0);
        }

        return transformationId;
    }

    /**
     * Builds the committable type information from the sink's second type parameter (index 1 of
     * {@link Sink}) and its committable serializer.
     *
     * @return the committable type information, or {@code null} if the sink provides no
     *     committable serializer
     */
    @Nullable
    private CommittableTypeInformation<CommT> extractCommittableTypeInformation(
            Sink<InputT, CommT, WriterStateT, GlobalCommT> sink) {
        if (!sink.getCommittableSerializer().isPresent()) {
            return null;
        }

        final Type committableType =
                TypeExtractor.getParameterType(Sink.class, sink.getClass(), 1);
        LOG.debug(
                "Extracted committable type [{}] from sink [{}].",
                committableType,
                sink.getClass().getCanonicalName());

        // The supplier captures the sink and re-queries the serializer each time it is invoked.
        return new CommittableTypeInformation<>(
                TypeExtractionUtils.typeToClass(committableType),
                () -> sink.getCommittableSerializer().get());
    }

    /** Joins an operator prefix and a name or uid as {@code "<prefix> <text>"}. */
    private static String label(String prefix, @Nullable String text) {
        return String.format("%s %s", prefix, text);
    }

    /** Derives a prefixed uid, or returns {@code null} when the transformation has no uid. */
    @Nullable
    private static String deriveUid(String prefix, @Nullable String uid) {
        return uid == null ? null : label(prefix, uid);
    }
}

