package org.apache.hudi.table;

import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter;
import org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.DataModificationInfos;

/* loaded from: input_file:org/apache/hudi/table/HoodieTableSink.class */
public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite, SupportsRowLevelDeleteAdapter, SupportsRowLevelUpdateAdapter {
    private final Configuration conf;
    private final ResolvedSchema schema;
    private boolean overwrite;

    public HoodieTableSink(Configuration configuration, ResolvedSchema resolvedSchema) {
        this.overwrite = false;
        this.conf = configuration;
        this.schema = resolvedSchema;
    }

    public HoodieTableSink(Configuration configuration, ResolvedSchema resolvedSchema, boolean z) {
        this.overwrite = false;
        this.conf = configuration;
        this.schema = resolvedSchema;
        this.overwrite = z;
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        return dataStream -> {
            this.conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, dataStream.getExecutionEnvironment().getCheckpointConfig().getCheckpointTimeout());
            OptionsInference.setupSinkTasks(this.conf, dataStream.getExecutionConfig().getParallelism());
            OptionsInference.setupClientId(this.conf);
            RowType logicalType = this.schema.toSinkRowDataType().notNull().getLogicalType();
            if (OptionsResolver.isBulkInsertOperation(this.conf)) {
                if (context.isBounded()) {
                    return Pipelines.bulkInsert(this.conf, logicalType, dataStream);
                }
                throw new HoodieException("The bulk insert should be run in batch execution mode.");
            }
            if (OptionsResolver.isAppendMode(this.conf)) {
                DataStream<Object> append = Pipelines.append(this.conf, logicalType, dataStream);
                return OptionsResolver.needsAsyncClustering(this.conf) ? Pipelines.cluster(this.conf, logicalType, append) : OptionsResolver.isLazyFailedWritesCleanPolicy(this.conf) ? Pipelines.clean(this.conf, append) : Pipelines.dummySink(append);
            }
            DataStream<Object> hoodieStreamWrite = Pipelines.hoodieStreamWrite(this.conf, Pipelines.bootstrap(this.conf, logicalType, dataStream, context.isBounded(), this.overwrite));
            if (!OptionsResolver.needsAsyncCompaction(this.conf)) {
                return Pipelines.clean(this.conf, hoodieStreamWrite);
            }
            if (context.isBounded()) {
                this.conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
            }
            return Pipelines.compact(this.conf, hoodieStreamWrite);
        };
    }

    @VisibleForTesting
    public Configuration getConf() {
        return this.conf;
    }

    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED) ? ChangelogModes.FULL : ChangelogModes.UPSERT;
    }

    public DynamicTableSink copy() {
        return new HoodieTableSink(this.conf, this.schema, this.overwrite);
    }

    public String asSummaryString() {
        return "HoodieTableSink";
    }

    public void applyStaticPartition(Map<String, String> map) {
        if (!this.overwrite || map.isEmpty()) {
            return;
        }
        this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
    }

    public void applyOverwrite(boolean z) {
        this.overwrite = z;
        if (OptionsResolver.overwriteDynamicPartition(this.conf)) {
            this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value());
        } else {
            this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value());
        }
    }

    @Override // org.apache.hudi.adapter.SupportsRowLevelDeleteAdapter
    public SupportsRowLevelDeleteAdapter.RowLevelDeleteInfoAdapter applyRowLevelDelete() {
        this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.DELETE.value());
        return DataModificationInfos.DEFAULT_DELETE_INFO;
    }

    @Override // org.apache.hudi.adapter.SupportsRowLevelUpdateAdapter
    public SupportsRowLevelUpdateAdapter.RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> list) {
        this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.UPSERT.value());
        return DataModificationInfos.DEFAULT_UPDATE_INFO;
    }
}
