package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.utils.ManagedMemoryUtils;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableSupplier;

/* loaded from: input_file:org/apache/paimon/flink/sink/FlinkSink.class */
/**
 * Abstract base for sinks that write a {@link DataStream} into a {@link FileStoreTable}.
 *
 * <p>The topology built by {@link #sinkFrom(DataStream, String)} consists of a writer operator
 * (see {@link #doWrite}), followed by a single-parallelism "Global Committer" operator and a
 * terminal discarding sink (see {@link #doCommit}). Subclasses supply the concrete writer
 * operator, committer factory and committable state manager.
 *
 * <p>The class is {@link Serializable} because the write providers and suppliers created here
 * are serializable lambdas, some of which capture {@code this}.
 *
 * @param <T> type of the records in the input stream
 */
public abstract class FlinkSink<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final String WRITER_NAME = "Writer";
    private static final String WRITER_WRITE_ONLY_NAME = "Writer(write-only)";
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";

    /** Target table; also the source of all sink options read by this class. */
    protected final FileStoreTable table;

    /** Forwarded unchanged to every {@code StoreSinkWrite} created by the write provider. */
    private final boolean ignorePreviousFiles;

    /**
     * @param table the table to write into
     * @param ignorePreviousFiles flag handed to the created {@code StoreSinkWrite} instances
     */
    public FlinkSink(FileStoreTable table, boolean ignorePreviousFiles) {
        this.table = table;
        this.ignorePreviousFiles = ignorePreviousFiles;
    }

    /**
     * Chooses the {@link StoreSinkWrite.Provider} matching the table options.
     *
     * <p>For a write-only table a plain {@code StoreSinkWriteImpl} provider is returned.
     * Otherwise a {@code GlobalFullCompactionSinkWrite} provider is returned when the changelog
     * producer is {@code FULL_COMPACTION} or a non-negative delta-commit count was configured
     * (directly, or derived from the trigger interval divided by the checkpoint interval); in
     * every other case the plain {@code StoreSinkWriteImpl} provider is used.
     *
     * @param checkpointConfig checkpoint configuration used to derive the delta-commit count
     * @param isStreaming whether the job runs in streaming runtime mode
     * @return a serializable provider of {@code StoreSinkWrite} instances
     */
    private StoreSinkWrite.Provider createWriteProvider(
            CheckpointConfig checkpointConfig, boolean isStreaming) {
        // Blank final: assigned exactly once on each path so the lambdas below can capture it.
        final boolean lookupWait;
        if (table.coreOptions().writeOnly()) {
            lookupWait = false;
        } else {
            Options options = table.coreOptions().toConfiguration();
            CoreOptions.ChangelogProducer changelogProducer =
                    table.coreOptions().changelogProducer();
            lookupWait =
                    changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
                            && options.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT);

            // -1 means "not configured"; an explicit delta-commit count wins over the interval.
            int deltaCommits = -1;
            if (options.contains(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)) {
                deltaCommits = options.get(CoreOptions.FULL_COMPACTION_DELTA_COMMITS);
            } else if (options.contains(
                    FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
                long triggerIntervalMs =
                        options.get(
                                        FlinkConnectorOptions
                                                .CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)
                                .toMillis();
                // NOTE(review): a negative checkpoint interval yields a negative count, which the
                // check below treats like "not configured" - confirm this is intended.
                deltaCommits =
                        (int) (triggerIntervalMs / checkpointConfig.getCheckpointInterval());
            }

            if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION
                    || deltaCommits >= 0) {
                // Always compact at least every commit once full compaction is active.
                int effectiveDeltaCommits = Math.max(deltaCommits, 1);
                return (fileStoreTable, commitUser, state, ioManager, memoryPool, metricGroup) ->
                        new GlobalFullCompactionSinkWrite(
                                fileStoreTable,
                                commitUser,
                                state,
                                ioManager,
                                ignorePreviousFiles,
                                lookupWait,
                                effectiveDeltaCommits,
                                isStreaming,
                                memoryPool,
                                metricGroup);
            }
        }

        return (fileStoreTable, commitUser, state, ioManager, memoryPool, metricGroup) ->
                new StoreSinkWriteImpl(
                        fileStoreTable,
                        commitUser,
                        state,
                        ioManager,
                        ignorePreviousFiles,
                        lookupWait,
                        isStreaming,
                        memoryPool,
                        metricGroup);
    }

    /**
     * Builds the sink topology using a freshly generated random UUID as the commit user.
     *
     * @param input the stream to write
     * @return the terminal sink of the topology
     */
    public DataStreamSink<?> sinkFrom(DataStream<T> input) {
        return sinkFrom(input, UUID.randomUUID().toString());
    }

    /**
     * Builds the sink topology: writer (at the input's parallelism) followed by the committer.
     *
     * @param input the stream to write
     * @param commitUser identifier passed to both the writer and the committer operator
     * @return the terminal sink of the topology
     */
    public DataStreamSink<?> sinkFrom(DataStream<T> input, String commitUser) {
        SingleOutputStreamOperator<Committable> written =
                doWrite(input, commitUser, input.getParallelism());
        return doCommit(written, commitUser);
    }

    /**
     * Appends the writer operator to {@code input}.
     *
     * <p>In non-streaming runtime mode the batch configuration is validated after the operator
     * is created. When {@code sink.use-managed-memory} style option
     * ({@link FlinkConnectorOptions#SINK_USE_MANAGED_MEMORY}) is set, managed memory is declared
     * for the operator.
     *
     * @param input the stream to write
     * @param commitUser identifier handed to the writer operator
     * @param parallelism writer parallelism, or {@code null} to reuse the input's parallelism
     * @return the stream of committables produced by the writer
     */
    public SingleOutputStreamOperator<Committable> doWrite(
            DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
        StreamExecutionEnvironment env = input.getExecutionEnvironment();
        boolean isStreaming =
                StreamExecutionEnvironmentUtils.getConfiguration(env)
                                .get(ExecutionOptions.RUNTIME_MODE)
                        == RuntimeExecutionMode.STREAMING;

        String writerName =
                table.coreOptions().writeOnly() ? WRITER_WRITE_ONLY_NAME : WRITER_NAME;
        SingleOutputStreamOperator<Committable> written =
                input.transform(
                                writerName + " : " + table.name(),
                                new CommittableTypeInfo(),
                                createWriteOperator(
                                        createWriteProvider(env.getCheckpointConfig(), isStreaming),
                                        commitUser))
                        .setParallelism(parallelism == null ? input.getParallelism() : parallelism);

        if (!isStreaming) {
            assertBatchConfiguration(env, written.getParallelism());
        }

        Options options = Options.fromMap(table.options());
        if (options.get(FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY)) {
            ManagedMemoryUtils.declareManagedMemory(
                    written, options.get(FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY));
        }
        return written;
    }

    /**
     * Appends the single-parallelism committer operator and a terminal discarding sink.
     *
     * <p>The streaming configuration is validated only when the job runs in streaming mode with
     * checkpointing enabled. When {@link FlinkConnectorOptions#SINK_AUTO_TAG_FOR_SAVEPOINT} is
     * set, the committer is wrapped in an {@code AutoTagForSavepointCommitterOperator}.
     *
     * @param written the committables produced by the writer
     * @param commitUser identifier handed to the committer operator
     * @return the terminal sink, named {@code "end"}
     */
    public DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
        StreamExecutionEnvironment env = written.getExecutionEnvironment();
        ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
        boolean isStreaming =
                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        boolean streamingCheckpointEnabled =
                isStreaming && env.getCheckpointConfig().isCheckpointingEnabled();
        if (streamingCheckpointEnabled) {
            assertStreamingConfiguration(env);
        }

        CommitterOperator<Committable, ManifestCommittable> committer =
                new CommitterOperator<>(
                        streamingCheckpointEnabled,
                        commitUser,
                        createCommitterFactory(streamingCheckpointEnabled),
                        createCommittableStateManager());

        Options options = Options.fromMap(table.options());
        OneInputStreamOperator<Committable, Committable> committerOperator = committer;
        if (options.get(FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT)) {
            committerOperator =
                    new AutoTagForSavepointCommitterOperator<>(
                            committer,
                            table::snapshotManager,
                            table::tagManager,
                            () -> table.store().newTagDeletion());
        }

        // The committer is global: parallelism and max parallelism are both pinned to 1.
        SingleOutputStreamOperator<Committable> committed =
                written.transform(
                                GLOBAL_COMMITTER_NAME + " : " + table.name(),
                                new CommittableTypeInfo(),
                                committerOperator)
                        .setParallelism(1)
                        .setMaxParallelism(1);

        configureGlobalCommitter(
                committed,
                options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU),
                options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY),
                conf);
        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
    }

    /**
     * Assigns a dedicated slot sharing group with explicit CPU and task-heap resources to the
     * committer operator.
     *
     * <p>Does nothing when {@code memory} is {@code null}; in that case {@code cpuCores} is
     * ignored as well.
     *
     * @param committed the committer operator to configure
     * @param cpuCores CPU cores for the slot sharing group
     * @param memory task heap memory for the slot sharing group, or {@code null} to skip
     * @param conf configuration consulted for the fine-grained resource management switch
     * @throws RuntimeException if {@code memory} is set but fine-grained resource management is
     *     not enabled in {@code conf}
     */
    public static void configureGlobalCommitter(
            SingleOutputStreamOperator<?> committed,
            double cpuCores,
            @Nullable MemorySize memory,
            ReadableConfig conf) {
        if (memory == null) {
            return;
        }

        if (!conf.get(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT)) {
            throw new RuntimeException("To support the 'sink.committer-cpu' and 'sink.committer-memory' configurations, you must enable fine-grained resource management. Please set 'cluster.fine-grained-resource-management.enabled' to 'true' in your Flink configuration.");
        }

        SlotSharingGroup slotSharingGroup =
                SlotSharingGroup.newBuilder(committed.getName())
                        .setCpuCores(cpuCores)
                        .setTaskHeapMemory(
                                new org.apache.flink.configuration.MemorySize(memory.getBytes()))
                        .build();
        committed.slotSharingGroup(slotSharingGroup);
    }

    /**
     * Verifies that the environment uses aligned, exactly-once checkpoints.
     *
     * @param env the execution environment to validate
     * @throws IllegalArgumentException presumably (thrown by {@code Preconditions.checkArgument})
     *     when unaligned checkpoints are enabled or the mode is not {@code EXACTLY_ONCE}
     */
    public static void assertStreamingConfiguration(StreamExecutionEnvironment env) {
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        Preconditions.checkArgument(
                !checkpointConfig.isUnalignedCheckpointsEnabled(),
                "Paimon sink currently does not support unaligned checkpoints. Please set "
                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
                        + " to false.");
        Preconditions.checkArgument(
                checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
                "Paimon sink currently only supports EXACTLY_ONCE checkpoint mode. Please set "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                        + " to exactly-once");
    }

    /**
     * Rejects batch jobs whose sink parallelism is unset ({@code -1}) while adaptive parallelism
     * is enabled.
     *
     * @param env the execution environment to inspect
     * @param sinkParallelism parallelism of the writer operator
     */
    private void assertBatchConfiguration(StreamExecutionEnvironment env, int sinkParallelism) {
        try {
            // Short-circuit keeps AdaptiveParallelism untouched unless the parallelism is unset.
            Preconditions.checkArgument(
                    sinkParallelism != -1 || !AdaptiveParallelism.isEnabled(env),
                    "Paimon Sink does not support Flink's Adaptive Parallelism mode. Please manually turn it off or set Paimon `sink.parallelism` manually.");
        } catch (NoClassDefFoundError ignored) {
            // The adaptive-parallelism check could not be loaded, so there is nothing to
            // validate. NOTE(review): presumably a Flink version without that feature - confirm.
        }
    }

    /**
     * Creates the operator that turns input records into {@link Committable}s.
     *
     * @param writeProvider provider of the {@code StoreSinkWrite} the operator should use
     * @param commitUser identifier passed through from {@link #doWrite}
     */
    protected abstract OneInputStreamOperator<T, Committable> createWriteOperator(
            StoreSinkWrite.Provider writeProvider, String commitUser);

    /**
     * Creates the factory for the committer used by the global committer operator.
     *
     * @param streamingCheckpointEnabled {@code true} when the job is streaming with
     *     checkpointing enabled
     */
    protected abstract Committer.Factory<Committable, ManifestCommittable> createCommitterFactory(
            boolean streamingCheckpointEnabled);

    /** Creates the state manager handed to the global committer operator. */
    protected abstract CommittableStateManager<ManifestCommittable>
            createCommittableStateManager();

    // NOTE: there is intentionally no hand-written $deserializeLambda$ method. The Java compiler
    // synthesizes it for the serializable lambdas and method references declared above.
}
