package org.apache.paimon.flink.action.cdc.kafka;

import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordSchemaBuilder;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.class */
public class KafkaSyncDatabaseAction extends ActionBase {
    private final Configuration kafkaConfig;
    private final String database;
    private final String tablePrefix;
    private final String tableSuffix;

    @Nullable
    private final Pattern includingPattern;

    @Nullable
    private final Pattern excludingPattern;
    private final Map<String, String> tableConfig;

    public KafkaSyncDatabaseAction(Map<String, String> map, String str, String str2, Map<String, String> map2, Map<String, String> map3) {
        this(map, str, str2, null, null, null, null, map2, map3);
    }

    public KafkaSyncDatabaseAction(Map<String, String> map, String str, String str2, @Nullable String str3, @Nullable String str4, @Nullable String str5, @Nullable String str6, Map<String, String> map2, Map<String, String> map3) {
        super(str, map2);
        this.kafkaConfig = Configuration.fromMap(map);
        this.database = str2;
        this.tablePrefix = str3 == null ? "" : str3;
        this.tableSuffix = str4 == null ? "" : str4;
        this.includingPattern = str5 == null ? null : Pattern.compile(str5);
        this.excludingPattern = str6 == null ? null : Pattern.compile(str6);
        this.tableConfig = map3;
    }

    public void build(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        boolean caseSensitive = this.catalog.caseSensitive();
        if (!caseSensitive) {
            validateCaseInsensitive();
        }
        this.catalog.createDatabase(this.database, true);
        TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, true, this.tablePrefix, this.tableSuffix);
        KafkaSource<String> buildKafkaSource = KafkaActionUtils.buildKafkaSource(this.kafkaConfig);
        RecordParser createParser = DataFormat.getDataFormat(this.kafkaConfig).createParser(caseSensitive, tableNameConverter, Collections.emptyList());
        RichCdcMultiplexRecordSchemaBuilder richCdcMultiplexRecordSchemaBuilder = new RichCdcMultiplexRecordSchemaBuilder(this.tableConfig);
        Pattern pattern = this.includingPattern;
        Pattern pattern2 = this.excludingPattern;
        FlinkCdcSyncDatabaseSinkBuilder withMode = new FlinkCdcSyncDatabaseSinkBuilder().withInput(streamExecutionEnvironment.fromSource(buildKafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").flatMap(createParser)).withParserFactory(() -> {
            return new RichCdcMultiplexRecordEventParser(richCdcMultiplexRecordSchemaBuilder, pattern, pattern2);
        }).withCatalogLoader(catalogLoader()).withDatabase(this.database).withMode(DatabaseSyncMode.COMBINED);
        String str = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (str != null) {
            withMode.withParallelism(Integer.valueOf(Integer.parseInt(str)));
        }
        withMode.build();
    }

    private void validateCaseInsensitive() {
        Preconditions.checkArgument(this.database.equals(this.database.toLowerCase()), String.format("Database name [%s] cannot contain upper case in case-insensitive catalog.", this.database));
        Preconditions.checkArgument(this.tablePrefix.equals(this.tablePrefix.toLowerCase()), String.format("Table prefix [%s] cannot contain upper case in case-insensitive catalog.", this.tablePrefix));
        Preconditions.checkArgument(this.tableSuffix.equals(this.tableSuffix.toLowerCase()), String.format("Table suffix [%s] cannot contain upper case in case-insensitive catalog.", this.tableSuffix));
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        build(executionEnvironment);
        executionEnvironment.execute(String.format("KAFKA-Paimon Database Sync: %s", this.database));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 603309346:
                if (implMethodName.equals("lambda$build$8ca810ab$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/EventParser$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/flink/sink/cdc/EventParser;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder;Ljava/util/regex/Pattern;Ljava/util/regex/Pattern;)Lorg/apache/paimon/flink/sink/cdc/EventParser;")) {
                    RichCdcMultiplexRecordSchemaBuilder richCdcMultiplexRecordSchemaBuilder = (RichCdcMultiplexRecordSchemaBuilder) serializedLambda.getCapturedArg(0);
                    Pattern pattern = (Pattern) serializedLambda.getCapturedArg(1);
                    Pattern pattern2 = (Pattern) serializedLambda.getCapturedArg(2);
                    return () -> {
                        return new RichCdcMultiplexRecordEventParser(richCdcMultiplexRecordSchemaBuilder, pattern, pattern2);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
