package org.apache.flink.cdc.composer.flink.translator;

import java.lang.invoke.SerializedLambda;
import java.lang.reflect.InvocationTargetException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.FlinkSinkFunctionProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.class */
public class DataSinkTranslator {
    private static final String SINK_WRITER_PREFIX = "Sink Writer: ";
    private static final String SINK_COMMITTER_PREFIX = "Sink Committer: ";

    public void translate(SinkDef sinkDef, DataStream<Event> dataStream, DataSink dataSink, OperatorID operatorID) {
        FlinkSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
        String generateSinkName = generateSinkName(sinkDef);
        if (eventSinkProvider instanceof FlinkSinkProvider) {
            sinkTo(dataStream, eventSinkProvider.getSink(), generateSinkName, operatorID);
        } else if (eventSinkProvider instanceof FlinkSinkFunctionProvider) {
            sinkTo(dataStream, ((FlinkSinkFunctionProvider) eventSinkProvider).getSinkFunction(), generateSinkName, operatorID);
        }
    }

    @VisibleForTesting
    void sinkTo(DataStream<Event> dataStream, Sink<Event> sink, String str, OperatorID operatorID) {
        DataStream<Event> dataStream2 = dataStream;
        if (sink instanceof WithPreWriteTopology) {
            dataStream2 = ((WithPreWriteTopology) sink).addPreWriteTopology(dataStream2);
        }
        if (sink instanceof TwoPhaseCommittingSink) {
            addCommittingTopology(sink, dataStream2, str, operatorID);
        } else {
            dataStream2.transform(SINK_WRITER_PREFIX + str, CommittableMessageTypeInfo.noOutput(), new DataSinkWriterOperatorFactory(sink, operatorID));
        }
    }

    private void sinkTo(DataStream<Event> dataStream, SinkFunction<Event> sinkFunction, String str, OperatorID operatorID) {
        DataSinkFunctionOperator dataSinkFunctionOperator = new DataSinkFunctionOperator(sinkFunction, operatorID);
        StreamExecutionEnvironment executionEnvironment = dataStream.getExecutionEnvironment();
        executionEnvironment.addOperator(new LegacySinkTransformation(dataStream.getTransformation(), SINK_WRITER_PREFIX + str, dataSinkFunctionOperator, executionEnvironment.getParallelism(), false));
    }

    private <CommT> void addCommittingTopology(Sink<Event> sink, DataStream<Event> dataStream, String str, OperatorID operatorID) {
        TypeInformation of = CommittableMessageTypeInfo.of(() -> {
            return getCommittableSerializer(sink);
        });
        DataStream transform = dataStream.transform(SINK_WRITER_PREFIX + str, of, new DataSinkWriterOperatorFactory(sink, operatorID));
        DataStream dataStream2 = transform;
        if (sink instanceof WithPreCommitTopology) {
            dataStream2 = ((WithPreCommitTopology) sink).addPreCommitTopology(transform);
        }
        SingleOutputStreamOperator transform2 = dataStream2.transform(SINK_COMMITTER_PREFIX + str, of, getCommitterOperatorFactory(sink, false, true));
        if (sink instanceof WithPostCommitTopology) {
            ((WithPostCommitTopology) sink).addPostCommitTopology(transform2);
        }
    }

    private String generateSinkName(SinkDef sinkDef) {
        return sinkDef.getName().orElse(String.format("Flink CDC Event Sink: %s", sinkDef.getType()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer(Object obj) {
        try {
            return (SimpleVersionedSerializer) obj.getClass().getDeclaredMethod("getCommittableSerializer", new Class[0]).invoke(obj, new Object[0]);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Failed to get CommittableSerializer", e);
        }
    }

    private static <CommT> OneInputStreamOperatorFactory<CommittableMessage<CommT>, CommittableMessage<CommT>> getCommitterOperatorFactory(Sink<Event> sink, boolean z, boolean z2) {
        try {
            return (OneInputStreamOperatorFactory) Class.forName("org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory").getDeclaredConstructors()[0].newInstance(sink, Boolean.valueOf(z), Boolean.valueOf(z2));
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
            throw new RuntimeException("Failed to create CommitterOperatorFactory", e);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1947122925:
                if (implMethodName.equals("lambda$addCommittingTopology$43bb1172$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/connector/sink2/Sink;)Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                    Sink sink = (Sink) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return getCommittableSerializer(sink);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
