package org.apache.flink.table.store.connector.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.class */
public class FlinkSinkBuilder {
    private final ObjectIdentifier tableIdentifier;
    private final FileStoreTable table;
    private final Configuration conf;
    private DataStream<RowData> input;

    @Nullable
    private CatalogLock.Factory lockFactory;

    @Nullable
    private Map<String, String> overwritePartition;

    @Nullable
    private LogSinkFunction logSinkFunction;

    @Nullable
    private Integer parallelism;

    public FlinkSinkBuilder(ObjectIdentifier objectIdentifier, FileStoreTable fileStoreTable) {
        this.tableIdentifier = objectIdentifier;
        this.table = fileStoreTable;
        this.conf = Configuration.fromMap(fileStoreTable.schema().options());
    }

    public FlinkSinkBuilder withInput(DataStream<RowData> dataStream) {
        this.input = dataStream;
        return this;
    }

    public FlinkSinkBuilder withLockFactory(CatalogLock.Factory factory) {
        this.lockFactory = factory;
        return this;
    }

    public FlinkSinkBuilder withOverwritePartition(Map<String, String> map) {
        this.overwritePartition = map;
        return this;
    }

    public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
        this.logSinkFunction = logSinkFunction;
        return this;
    }

    public FlinkSinkBuilder withParallelism(@Nullable Integer num) {
        this.parallelism = num;
        return this;
    }

    @Nullable
    private Map<String, String> getCompactPartSpec() {
        String str = (String) this.conf.get(FlinkConnectorOptions.COMPACTION_PARTITION_SPEC);
        if (str == null) {
            return null;
        }
        return (Map) JsonSerdeUtil.fromJson(str, Map.class);
    }

    public DataStreamSink<?> build() {
        PartitionTransformation partitionTransformation = new PartitionTransformation(this.input.getTransformation(), new BucketStreamPartitioner(((Integer) this.conf.get(CoreOptions.BUCKET)).intValue(), this.table.schema()));
        if (this.parallelism != null) {
            partitionTransformation.setParallelism(this.parallelism.intValue());
        }
        return new StoreSink(this.tableIdentifier, this.table, ((Boolean) this.conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED)).booleanValue(), getCompactPartSpec(), this.lockFactory, this.overwritePartition, this.logSinkFunction).sinkTo(new DataStream<>(this.input.getExecutionEnvironment(), partitionTransformation));
    }
}
