package org.apache.paimon.flink.sink;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.apache.paimon.table.Table;

/* loaded from: input_file:org/apache/paimon/flink/sink/FlinkTableSinkBase.class */
public abstract class FlinkTableSinkBase implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
    protected final ObjectIdentifier tableIdentifier;
    protected final DynamicTableFactory.Context context;

    @Nullable
    protected final LogStoreTableFactory logStoreTableFactory;
    protected final Table table;
    protected Map<String, String> staticPartitions = new HashMap();
    protected boolean overwrite = false;

    public FlinkTableSinkBase(ObjectIdentifier objectIdentifier, Table table, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) {
        this.tableIdentifier = objectIdentifier;
        this.table = table;
        this.context = context;
        this.logStoreTableFactory = logStoreTableFactory;
    }

    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        if (this.table instanceof AppendOnlyFileStoreTable) {
            return changelogMode;
        }
        if (!(this.table instanceof PrimaryKeyFileStoreTable)) {
            throw new UnsupportedOperationException("Unknown FileStoreTable subclass " + this.table.getClass().getName());
        }
        Options fromMap = Options.fromMap(this.table.options());
        if (fromMap.get(CoreOptions.CHANGELOG_PRODUCER) != CoreOptions.ChangelogProducer.INPUT && fromMap.get(CoreOptions.MERGE_ENGINE) != CoreOptions.MergeEngine.AGGREGATE && fromMap.get(CoreOptions.LOG_CHANGELOG_MODE) != CoreOptions.LogChangelogMode.ALL) {
            ChangelogMode.Builder newBuilder = ChangelogMode.newBuilder();
            for (RowKind rowKind : changelogMode.getContainedKinds()) {
                if (rowKind != RowKind.UPDATE_BEFORE) {
                    newBuilder.addContainedKind(rowKind);
                }
            }
            return newBuilder.build();
        }
        return changelogMode;
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        if (this.overwrite && !context.isBounded()) {
            throw new UnsupportedOperationException("Paimon doesn't support streaming INSERT OVERWRITE.");
        }
        LogSinkProvider logSinkProvider = null;
        if (this.logStoreTableFactory != null) {
            logSinkProvider = this.logStoreTableFactory.createSinkProvider(this.context, context);
        }
        Options fromMap = Options.fromMap(this.table.options());
        LogSinkFunction createSink = this.overwrite ? null : logSinkProvider == null ? null : logSinkProvider.createSink();
        return new PaimonDataStreamSinkProvider(dataStream -> {
            return new FlinkSinkBuilder((FileStoreTable) this.table).withInput(new DataStream<>(dataStream.getExecutionEnvironment(), dataStream.getTransformation())).withLogSinkFunction(createSink).withOverwritePartition(this.overwrite ? this.staticPartitions : null).withParallelism((Integer) fromMap.get(FlinkConnectorOptions.SINK_PARALLELISM)).build();
        });
    }

    public DynamicTableSink copy() {
        FlinkTableSink flinkTableSink = new FlinkTableSink(this.tableIdentifier, this.table, this.context, this.logStoreTableFactory);
        flinkTableSink.staticPartitions = new HashMap(this.staticPartitions);
        flinkTableSink.overwrite = this.overwrite;
        return flinkTableSink;
    }

    public String asSummaryString() {
        return "PaimonSink";
    }

    public void applyStaticPartition(Map<String, String> map) {
        this.table.partitionKeys().forEach(str -> {
            if (map.containsKey(str)) {
                this.staticPartitions.put(str, map.get(str));
            }
        });
    }

    public void applyOverwrite(boolean z) {
        this.overwrite = z;
    }
}
