package org.apache.paimon.flink.sink;

import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.table.AppendOnlyFileStoreTable;

/* loaded from: input_file:org/apache/paimon/flink/sink/UnawareBucketWriteSink.class */
public class UnawareBucketWriteSink extends FileStoreSink {
    private final boolean enableCompaction;
    private final AppendOnlyFileStoreTable table;
    private final Integer parallelism;

    public UnawareBucketWriteSink(AppendOnlyFileStoreTable appendOnlyFileStoreTable, Map<String, String> map, LogSinkFunction logSinkFunction, Integer num) {
        super(appendOnlyFileStoreTable, map, logSinkFunction);
        this.table = appendOnlyFileStoreTable;
        this.enableCompaction = !appendOnlyFileStoreTable.coreOptions().writeOnly();
        this.parallelism = num;
    }

    @Override // org.apache.paimon.flink.sink.FlinkSink
    public DataStreamSink<?> sinkFrom(DataStream<InternalRow> dataStream, String str) {
        DataStream doWrite = doWrite(dataStream, str, this.parallelism);
        boolean z = dataStream.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        if (this.enableCompaction && z) {
            UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder = new UnawareBucketCompactionTopoBuilder(dataStream.getExecutionEnvironment(), this.table.name(), this.table);
            unawareBucketCompactionTopoBuilder.withContinuousMode(true);
            doWrite = doWrite.union(new DataStream[]{unawareBucketCompactionTopoBuilder.fetchUncommitted(str)});
        }
        return doCommit(doWrite, str);
    }
}
