package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/**
 * Builder that wires a CDC input stream into Paimon sinks for a whole database.
 *
 * <p>{@link #build()} dispatches on the configured {@link MultiTablesSinkMode}: in {@code
 * COMBINED} mode a single multi-table sink is created for all records, otherwise one sink is
 * created per table supplied through {@link #withTables(List)}.
 *
 * @param <T> element type of the CDC input stream
 */
public class FlinkCdcSyncDatabaseSinkBuilder<T> {
    private DataStream<T> input;
    private EventParser.Factory<T> parserFactory;
    // Tables handled in divided mode; empty unless replaced through withTables.
    private List<FileStoreTable> tables = new ArrayList<>();

    // Sink parallelism read from the table options; may be absent.
    @Nullable
    private Integer parallelism;
    private double committerCpu;

    @Nullable
    private MemorySize committerMemory;
    // Only populated by withTableOptions; stays null if that method is never called.
    private Map<String, String> dynamicOptions;
    private Catalog.Loader catalogLoader;
    private String database;
    private MultiTablesSinkMode mode;

    /** Sets the CDC input stream; required by {@link #build()}. */
    public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> dataStream) {
        this.input = dataStream;
        return this;
    }

    /** Sets the factory of event parsers applied to the input; required by {@link #build()}. */
    public FlinkCdcSyncDatabaseSinkBuilder<T> withParserFactory(EventParser.Factory<T> factory) {
        this.parserFactory = factory;
        return this;
    }

    /**
     * Replaces the list of tables that get their own sink in divided mode. The list is stored
     * as-is (no copy) and must be non-null when the divided sink is built.
     */
    public FlinkCdcSyncDatabaseSinkBuilder<T> withTables(List<FileStoreTable> list) {
        this.tables = list;
        return this;
    }

    /** Convenience overload that converts the map with {@link Options#fromMap(Map)}. */
    public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Map<String, String> map) {
        return withTableOptions(Options.fromMap(map));
    }

    /**
     * Stores the options as dynamic options and extracts the sink parallelism, committer CPU and
     * committer memory settings from them.
     */
    public FlinkCdcSyncDatabaseSinkBuilder<T> withTableOptions(Options options) {
        this.dynamicOptions = options.toMap();
        this.parallelism = (Integer) options.get(FlinkConnectorOptions.SINK_PARALLELISM);
        this.committerCpu = ((Double) options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU)).doubleValue();
        this.committerMemory = (MemorySize) options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
        return this;
    }

    /** Sets the target database name; required by {@link #build()}. */
    public FlinkCdcSyncDatabaseSinkBuilder<T> withDatabase(String str) {
        this.database = str;
        return this;
    }

    /** Sets the catalog loader handed to the sink operators; required by {@link #build()}. */
    public FlinkCdcSyncDatabaseSinkBuilder<T> withCatalogLoader(Catalog.Loader loader) {
        this.catalogLoader = loader;
        return this;
    }

    /** Selects the sink topology; any value other than {@code COMBINED} builds divided sinks. */
    public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MultiTablesSinkMode multiTablesSinkMode) {
        this.mode = multiTablesSinkMode;
        return this;
    }

    /**
     * Checks that input, parser factory, database and catalog loader are set, then attaches the
     * sink operators to the input stream according to the configured mode.
     */
    public void build() {
        Preconditions.checkNotNull(this.input);
        Preconditions.checkNotNull(this.parserFactory);
        Preconditions.checkNotNull(this.database);
        Preconditions.checkNotNull(this.catalogLoader);
        if (this.mode == MultiTablesSinkMode.COMBINED) {
            buildCombinedCdcSink();
        } else {
            buildDividedCdcSink();
        }
    }

    /**
     * Builds one multi-table sink: the input is parsed once, schema changes are routed to a
     * "Schema Evolution" operator and records are partitioned into a single
     * {@link FlinkCdcMultiTableSink}.
     */
    // NOTE(review): raw types are kept because the generic signatures of the parsing function and
    // of the side-output helpers are not visible from this file; restore type arguments once
    // confirmed.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void buildCombinedCdcSink() {
        SingleOutputStreamOperator parsed =
                this.input
                        .forward()
                        .process(
                                new CdcDynamicTableParsingProcessFunction(
                                        this.database, this.catalogLoader, this.parserFactory))
                        .name("Side Output")
                        .setParallelism(this.input.getParallelism());

        // Record side output, consumed by the multi-table sink below.
        DataStream records =
                SingleOutputStreamOperatorUtils.getSideOutput(
                        parsed, CdcDynamicTableParsingProcessFunction.DYNAMIC_OUTPUT_TAG);

        // Schema-change side output, applied by a dedicated operator.
        SingleOutputStreamOperatorUtils.getSideOutput(
                        parsed,
                        CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
                .process(new MultiTableUpdatedDataFieldsProcessFunction(this.catalogLoader))
                .name("Schema Evolution");

        FlinkCdcMultiTableSink sink =
                new FlinkCdcMultiTableSink(
                        this.catalogLoader, this.committerCpu, this.committerMemory);
        sink.sinkFrom(
                FlinkStreamPartitioner.partition(
                        records,
                        new CdcMultiplexRecordChannelComputer(
                                this.catalogLoader, this.dynamicOptions),
                        this.parallelism),
                this.dynamicOptions);
    }

    /** Partitions the records by the table's schema and attaches a {@link FlinkCdcSink}. */
    private void buildForFixedBucket(FileStoreTable fileStoreTable, DataStream<CdcRecord> dataStream) {
        new FlinkCdcSink(fileStoreTable)
                .sinkFrom(
                        FlinkStreamPartitioner.partition(
                                dataStream,
                                new CdcRecordChannelComputer(fileStoreTable.schema()),
                                this.parallelism));
    }

    /**
     * Builds one sink per configured table: the input is parsed once and, for every table, its
     * schema-change side output and its record side output are wired separately.
     *
     * @throws UnsupportedOperationException if a table's bucket mode is neither {@code FIXED} nor
     *     {@code DYNAMIC}
     */
    // NOTE(review): raw types are kept because the generic signatures of the parsing function and
    // of the side-output helpers are not visible from this file; restore type arguments once
    // confirmed.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void buildDividedCdcSink() {
        Preconditions.checkNotNull(this.tables);
        SingleOutputStreamOperator parsed =
                this.input
                        .forward()
                        .process(new CdcMultiTableParsingProcessFunction(this.parserFactory))
                        .setParallelism(this.input.getParallelism());

        for (FileStoreTable table : this.tables) {
            SingleOutputStreamOperator schemaChanges =
                    SingleOutputStreamOperatorUtils.getSideOutput(
                                    parsed,
                                    CdcMultiTableParsingProcessFunction
                                            .createUpdatedDataFieldsOutputTag(table.name()))
                            .process(
                                    new UpdatedDataFieldsProcessFunction(
                                            new SchemaManager(table.fileIO(), table.location()),
                                            Identifier.create(this.database, table.name()),
                                            this.catalogLoader));
            // The schema-change operator is pinned to a single subtask (presumably so that schema
            // updates of one table are applied sequentially -- confirm).
            schemaChanges.getTransformation().setParallelism(1);
            schemaChanges.getTransformation().setMaxParallelism(1);

            DataStream<CdcRecord> records =
                    SingleOutputStreamOperatorUtils.getSideOutput(
                            parsed,
                            CdcMultiTableParsingProcessFunction.createRecordOutputTag(
                                    table.name()));

            BucketMode bucketMode = table.bucketMode();
            // Switch on the enum itself rather than on an ordinal-indexed lookup table.
            switch (bucketMode) {
                case FIXED:
                    buildForFixedBucket(table, records);
                    break;
                case DYNAMIC:
                    new CdcDynamicBucketSink(table).build(records, this.parallelism);
                    break;
                case GLOBAL_DYNAMIC:
                case UNAWARE:
                default:
                    throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
            }
        }
    }
}
