package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/sink/FlinkSinkBuilder.class */
public class FlinkSinkBuilder {
    private final FileStoreTable table;
    private DataStream<RowData> input;

    @Nullable
    private Map<String, String> overwritePartition;

    @Nullable
    private LogSinkFunction logSinkFunction;

    @Nullable
    private Integer parallelism;
    private boolean boundedInput = false;
    private boolean compactSink = false;

    public FlinkSinkBuilder(FileStoreTable fileStoreTable) {
        this.table = fileStoreTable;
    }

    public FlinkSinkBuilder withInput(DataStream<RowData> dataStream) {
        this.input = dataStream;
        return this;
    }

    public FlinkSinkBuilder withOverwritePartition(@Nullable Map<String, String> map) {
        this.overwritePartition = map;
        return this;
    }

    public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
        this.logSinkFunction = logSinkFunction;
        return this;
    }

    public FlinkSinkBuilder withParallelism(@Nullable Integer num) {
        this.parallelism = num;
        return this;
    }

    public FlinkSinkBuilder withBoundedInputStream(boolean z) {
        this.boundedInput = z;
        return this;
    }

    public FlinkSinkBuilder forCompact(boolean z) {
        this.compactSink = z;
        return this;
    }

    public DataStreamSink<?> build() {
        DataStream map = MapToInternalRow.map(this.input, this.table.rowType());
        if (this.table.coreOptions().localMergeEnabled() && this.table.schema().primaryKeys().size() > 0) {
            map = map.forward().transform("local merge", map.getType(), new LocalMergeOperator(this.table.schema())).setParallelism(map.getParallelism());
        }
        BucketMode bucketMode = this.table.bucketMode();
        switch (bucketMode) {
            case FIXED:
                return buildForFixedBucket(map);
            case DYNAMIC:
                return buildDynamicBucketSink(map, false);
            case GLOBAL_DYNAMIC:
                return buildDynamicBucketSink(map, true);
            case UNAWARE:
                return buildUnawareBucketSink(map);
            default:
                throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
        }
    }

    private DataStreamSink<?> buildDynamicBucketSink(DataStream<InternalRow> dataStream, boolean z) {
        Preconditions.checkArgument(this.logSinkFunction == null, "Dynamic bucket mode can not work with log system.");
        return (!this.compactSink || z) ? z ? new GlobalDynamicBucketSink(this.table, this.overwritePartition).build(dataStream, this.parallelism) : new RowDynamicBucketSink(this.table, this.overwritePartition).build(dataStream, this.parallelism) : new DynamicBucketCompactSink(this.table, this.overwritePartition).build(dataStream, this.parallelism);
    }

    private DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> dataStream) {
        return new FileStoreSink(this.table, this.overwritePartition, this.logSinkFunction).sinkFrom(FlinkStreamPartitioner.partition(dataStream, new RowDataChannelComputer(this.table.schema(), this.logSinkFunction != null), this.parallelism));
    }

    private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> dataStream) {
        Preconditions.checkArgument(this.table instanceof AppendOnlyFileStoreTable, "Unaware bucket mode only works with append-only table for now.");
        return new UnawareBucketWriteSink((AppendOnlyFileStoreTable) this.table, this.overwritePartition, this.logSinkFunction, this.parallelism, this.boundedInput).sinkFrom(dataStream);
    }
}
