package org.apache.paimon.flink.compact;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.AppendOnlyFileStoreTable;

/* loaded from: input_file:org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.class */
public class UnawareBucketCompactionTopoBuilder {
    private final transient StreamExecutionEnvironment env;
    private final String tableIdentifier;
    private final AppendOnlyFileStoreTable table;

    @Nullable
    private List<Map<String, String>> specifiedPartitions = null;
    private boolean isContinuous = false;

    public UnawareBucketCompactionTopoBuilder(StreamExecutionEnvironment streamExecutionEnvironment, String str, AppendOnlyFileStoreTable appendOnlyFileStoreTable) {
        this.env = streamExecutionEnvironment;
        this.tableIdentifier = str;
        this.table = appendOnlyFileStoreTable;
    }

    public void withContinuousMode(boolean z) {
        this.isContinuous = z;
    }

    public void withPartitions(List<Map<String, String>> list) {
        this.specifiedPartitions = list;
    }

    public void build() {
        sinkFromSource(buildSource());
    }

    public DataStream<Committable> fetchUncommitted(String str) {
        DataStream<AppendOnlyCompactionTask> rebalanceInput = rebalanceInput(buildSource());
        return new UnawareBucketCompactionSink(this.table).doWrite(rebalanceInput, str, Integer.valueOf(rebalanceInput.getParallelism()));
    }

    private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
        return BucketUnawareCompactSource.buildSource(this.env, new BucketUnawareCompactSource(this.table, this.isContinuous, this.table.coreOptions().continuousDiscoveryInterval().toMillis(), this.specifiedPartitions != null ? PredicateBuilder.partitions(this.specifiedPartitions, this.table.rowType()) : null), this.isContinuous, this.tableIdentifier);
    }

    private void sinkFromSource(DataStreamSource<AppendOnlyCompactionTask> dataStreamSource) {
        UnawareBucketCompactionSink.sink(this.table, rebalanceInput(dataStreamSource));
    }

    private DataStream<AppendOnlyCompactionTask> rebalanceInput(DataStreamSource<AppendOnlyCompactionTask> dataStreamSource) {
        Integer num = (Integer) Options.fromMap(this.table.options()).get(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM);
        PartitionTransformation partitionTransformation = new PartitionTransformation(dataStreamSource.getTransformation(), new RebalancePartitioner());
        if (num != null) {
            partitionTransformation.setParallelism(num.intValue());
        } else {
            partitionTransformation.setParallelism(this.env.getParallelism());
        }
        return new DataStream<>(this.env, partitionTransformation);
    }
}
