package org.apache.paimon.flink.action.cdc;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/SyncTableActionBase.class */
public abstract class SyncTableActionBase extends ActionBase {
    /** Class logger; used in {@code build()} to report the schema-retrieval fallback. */
    private static final Logger LOG = LoggerFactory.getLogger(SyncTableActionBase.class);
    /** Target database name; created if needed and used to build the table identifier in {@code build()}. */
    protected final String database;
    /** Target table name; combined with {@link #database} into the table identifier in {@code build()}. */
    protected final String table;
    /** CDC source options, built from the constructor's last map argument. Not read in this class. */
    protected final Configuration cdcSourceConfig;
    /** The target table; assigned in {@code build()} and null before that. */
    protected FileStoreTable fileStoreTable;
    /** User-specified partition keys; empty unless set through {@code withPartitionKeys}. */
    protected List<String> partitionKeys;
    /** User-specified primary keys; empty unless set through {@code withPrimaryKeys}. */
    protected List<String> primaryKeys;
    /** Table options applied to the table in {@code build()}; also read there for the sink parallelism. */
    protected Map<String, String> tableConfig;
    /** Unparsed computed-column arguments; turned into {@link #computedColumns} in {@code build()}. */
    protected List<String> computedColumnArgs;
    /** Type mapping; defaults to {@code TypeMapping.defaultMapping()}. Not read in this class. */
    protected TypeMapping typeMapping;
    /** Computed columns built in {@code build()} from {@link #computedColumnArgs}. */
    protected List<ComputedColumn> computedColumns;
    /** Metadata converters selected by {@code withMetadataColumns}; empty array by default. */
    protected CdcMetadataConverter[] metadataConverters;

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/SyncTableActionBase$SchemaRetrievalException.class */
    /**
     * Signals that the source schema could not be retrieved.
     *
     * <p>{@code build()} catches this exception when the target table already exists and falls
     * back to the existing table schema.
     */
    public static class SchemaRetrievalException extends Exception {
        /**
         * @param str detail message describing why the schema could not be retrieved
         */
        public SchemaRetrievalException(String str) {
            super(str);
        }

        /**
         * Variant that keeps the underlying failure attached, so it is not lost when the
         * exception is logged or rethrown.
         *
         * @param str detail message describing why the schema could not be retrieved
         * @param cause the underlying failure; may be {@code null}
         */
        public SchemaRetrievalException(String str, Throwable cause) {
            super(str, cause);
        }
    }

    /**
     * Creates the action with every optional setting initialised to an empty or default value.
     *
     * @param str first argument, forwarded unchanged to the superclass constructor
     * @param str2 name of the target database
     * @param str3 name of the target table
     * @param map second argument, forwarded unchanged to the superclass constructor
     * @param map2 CDC source options; converted with {@link Configuration#fromMap(Map)}
     */
    public SyncTableActionBase(String str, String str2, String str3, Map<String, String> map, Map<String, String> map2) {
        super(str, map);
        // Defaults for the optional settings; the with* methods replace them.
        this.partitionKeys = new ArrayList<>();
        this.primaryKeys = new ArrayList<>();
        this.tableConfig = new HashMap<>();
        this.computedColumnArgs = new ArrayList<>();
        this.typeMapping = TypeMapping.defaultMapping();
        this.computedColumns = new ArrayList<>();
        this.metadataConverters = new CdcMetadataConverter[0];
        this.database = str2;
        this.table = str3;
        this.cdcSourceConfig = Configuration.fromMap(map2);
    }

    /**
     * Varargs convenience for {@link #withPartitionKeys(List)}.
     *
     * @param strArr partition key names; wrapped with {@link Arrays#asList}, so the stored list
     *     is a fixed-size view of this array
     * @return this action, for chaining
     */
    public SyncTableActionBase withPartitionKeys(String... strArr) {
        final List<String> keys = Arrays.asList(strArr);
        return withPartitionKeys(keys);
    }

    /**
     * Replaces the partition keys.
     *
     * @param list partition key names; the reference is stored as-is, without copying
     * @return this action, for chaining
     */
    public SyncTableActionBase withPartitionKeys(List<String> list) {
        partitionKeys = list;
        return this;
    }

    /**
     * Varargs convenience for {@link #withPrimaryKeys(List)}.
     *
     * @param strArr primary key names; wrapped with {@link Arrays#asList}, so the stored list
     *     is a fixed-size view of this array
     * @return this action, for chaining
     */
    public SyncTableActionBase withPrimaryKeys(String... strArr) {
        final List<String> keys = Arrays.asList(strArr);
        return withPrimaryKeys(keys);
    }

    /**
     * Replaces the primary keys.
     *
     * @param list primary key names; the reference is stored as-is, without copying
     * @return this action, for chaining
     */
    public SyncTableActionBase withPrimaryKeys(List<String> list) {
        primaryKeys = list;
        return this;
    }

    /**
     * Replaces the table options.
     *
     * @param map table options; the reference is stored as-is, without copying
     * @return this action, for chaining
     */
    public SyncTableActionBase withTableConfig(Map<String, String> map) {
        tableConfig = map;
        return this;
    }

    /**
     * Replaces the computed-column arguments that {@code build()} later turns into computed
     * columns.
     *
     * @param list unparsed computed-column arguments; the reference is stored as-is
     * @return this action, for chaining
     */
    public SyncTableActionBase withComputedColumnArgs(List<String> list) {
        computedColumnArgs = list;
        return this;
    }

    /**
     * Replaces the type mapping (the constructor installs {@code TypeMapping.defaultMapping()}).
     *
     * @param typeMapping the mapping to store
     * @return this action, for chaining
     */
    public SyncTableActionBase withTypeMapping(TypeMapping typeMapping) {
        final SyncTableActionBase self = this;
        self.typeMapping = typeMapping;
        return self;
    }

    /**
     * Resolves each metadata column name through {@link #metadataConverter(String)} and stores
     * the converters that were found, in input order.
     *
     * <p>Names for which {@code metadataConverter} returns an empty {@link Optional} are dropped
     * without any error.
     *
     * @param list metadata column names
     * @return this action, for chaining
     */
    public SyncTableActionBase withMetadataColumns(List<String> list) {
        this.metadataConverters =
                list.stream()
                        .map(this::metadataConverter)
                        .filter(Optional::isPresent)
                        .map(Optional::get)
                        .toArray(CdcMetadataConverter[]::new);
        return this;
    }

    /**
     * Looks up the converter for one metadata column name.
     *
     * <p>This base implementation recognises no names and always returns an empty result;
     * subclasses may override it.
     *
     * @param str metadata column name
     * @return the matching converter, or empty if the name is not supported
     */
    protected Optional<CdcMetadataConverter<?>> metadataConverter(String str) {
        // No metadata columns are known at this level.
        return Optional.empty();
    }

    /**
     * Hook invoked first by {@code build()}, before any catalog access.
     *
     * <p>Does nothing in this base class; subclasses may override it.
     */
    protected void checkCdcSourceArgument() {
        // Intentionally empty.
    }

    /**
     * Retrieves the schema of the CDC source.
     *
     * <p>{@code build()} uses the returned schema's fields to build the computed columns and,
     * through {@code buildPaimonSchema}, to create the target table or check compatibility with
     * an existing one. When the target table already exists, {@code build()} catches a
     * {@code SchemaRetrievalException} thrown from here and falls back to the existing table
     * schema; any other exception propagates.
     *
     * @return the source schema
     * @throws Exception if the schema cannot be retrieved
     */
    protected abstract Schema retrieveSchema() throws Exception;

    /**
     * Builds the Paimon table schema for the given source schema by delegating to
     * {@code CdcActionCommonUtils.buildPaimonSchema} with this action's partition keys, primary
     * keys, computed columns, table options and metadata converters.
     *
     * @param schema the schema returned by {@code retrieveSchema()}
     * @return the schema produced by the helper
     */
    protected Schema buildPaimonSchema(Schema schema) {
        // NOTE(review): the trailing 'true' flag's meaning is defined by the helper - confirm there.
        final Schema paimonSchema =
                CdcActionCommonUtils.buildPaimonSchema(
                        this.partitionKeys,
                        this.primaryKeys,
                        this.computedColumns,
                        this.tableConfig,
                        schema,
                        this.metadataConverters,
                        true);
        return paimonSchema;
    }

    /**
     * Builds the Flink source that emits the raw records as strings.
     *
     * <p>{@code build()} applies {@code recordParse()} to this stream as a flat-map.
     *
     * @return the source stream
     * @throws Exception if the source cannot be created
     */
    protected abstract DataStreamSource<String> buildSource() throws Exception;

    /**
     * @return the name that {@code buildDataStreamSource} passes to the execution environment
     *     when registering the source
     */
    protected abstract String sourceName();

    /**
     * @return the function that turns each raw string record into
     *     {@code RichCdcMultiplexRecord}s; {@code build()} installs it as the operator named
     *     "Parse"
     */
    protected abstract FlatMapFunction<String, RichCdcMultiplexRecord> recordParse();

    /**
     * Prepares the target table and assembles the source, parse and sink pipeline.
     *
     * <ol>
     *   <li>Runs {@code checkCdcSourceArgument()}, creates the database and validates the
     *       configured names against the catalog's case sensitivity.
     *   <li>If the table exists: loads it with the table options applied, then checks that the
     *       schema derived from the source is compatible with it. If the source schema cannot be
     *       retrieved ({@code SchemaRetrievalException}), the computed columns are built from the
     *       existing table schema and only the partition/primary key constraints are checked.
     *   <li>If the table does not exist: creates it from the source schema.
     *   <li>Verifies the computed columns, then wires {@code buildSource()} through
     *       {@code recordParse()} into a {@code CdcSinkBuilder}. The sink parallelism is taken
     *       from the table options when present.
     * </ol>
     *
     * @throws Exception if schema retrieval, catalog access or pipeline construction fails
     * @throws NumberFormatException if the sink parallelism option is not a valid integer
     */
    public void build() throws Exception {
        checkCdcSourceArgument();
        // NOTE(review): 'true' is presumably ignore-if-exists - confirm against the catalog API.
        this.catalog.createDatabase(this.database, true);
        boolean caseSensitive = this.catalog.caseSensitive();
        validateCaseInsensitive(caseSensitive);
        Identifier identifier = new Identifier(this.database, this.table);
        if (this.catalog.tableExists(identifier)) {
            this.fileStoreTable = this.catalog.getTable(identifier).copy(this.tableConfig);
            try {
                Schema sourceSchema = retrieveSchema();
                this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, sourceSchema.fields());
                CdcActionCommonUtils.assertSchemaCompatible(this.fileStoreTable.schema(), buildPaimonSchema(sourceSchema).fields());
            } catch (SchemaRetrievalException e) {
                // Best-effort fallback; the exception is passed to the logger so the reason for
                // the failed retrieval stays visible.
                LOG.info("Failed to retrieve schema from record data but there exists specified Paimon table. Schema compatibility check will be skipped. If you have specified computed columns, here will use the existed Paimon table schema to build them. Please make sure the Paimon table has defined all the argument columns used for computed columns.", e);
                this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, this.fileStoreTable.schema().fields());
                checkConstraints();
            }
        } else {
            Schema sourceSchema = retrieveSchema();
            this.computedColumns = ComputedColumnUtils.buildComputedColumns(this.computedColumnArgs, sourceSchema.fields());
            // NOTE(review): 'false' is presumably "do not ignore if exists" - confirm against the catalog API.
            this.catalog.createTable(identifier, buildPaimonSchema(sourceSchema), false);
            this.fileStoreTable = this.catalog.getTable(identifier).copy(this.tableConfig);
        }
        checkComputedColumns(this.computedColumns);

        SingleOutputStreamOperator<RichCdcMultiplexRecord> parsed = buildSource().flatMap(recordParse()).name("Parse");
        // The lambda is kept inside build(): it captures only the boolean, not 'this'.
        CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
                new CdcSinkBuilder<RichCdcMultiplexRecord>()
                        .withInput(parsed)
                        .withParserFactory(() -> new RichCdcMultiplexRecordEventParser(caseSensitive))
                        .withTable(this.fileStoreTable)
                        .withIdentifier(identifier)
                        .withCatalogLoader(catalogLoader());
        String sinkParallelism = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.valueOf(sinkParallelism));
        }
        sinkBuilder.build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers the given source object with the execution environment under
     * {@code sourceName()}.
     *
     * <p>A {@link Source} is registered via {@code fromSource} with no watermarks; a
     * {@link SourceFunction} is registered via {@code addSource}. The {@code Source} check comes
     * first, so an object implementing both is treated as a {@code Source}.
     *
     * @param obj a {@link Source} or {@link SourceFunction} producing strings; the element type
     *     is not verified at runtime
     * @return the resulting source stream
     * @throws UnsupportedOperationException if {@code obj} is neither type (including
     *     {@code null})
     */
    public DataStreamSource<String> buildDataStreamSource(Object obj) {
        if (obj instanceof Source) {
            // Unchecked: the element type cannot be verified at runtime because of erasure.
            @SuppressWarnings("unchecked")
            Source<String, ?, ?> source = (Source<String, ?, ?>) obj;
            return this.env.fromSource(source, WatermarkStrategy.noWatermarks(), sourceName());
        }
        if (obj instanceof SourceFunction) {
            @SuppressWarnings("unchecked")
            SourceFunction<String> sourceFunction = (SourceFunction<String>) obj;
            return this.env.addSource(sourceFunction, sourceName());
        }
        throw new UnsupportedOperationException("Unrecognized source type");
    }

    /**
     * Passes the database name, table name, partition keys and primary keys, in that order, to
     * {@code AbstractCatalog.validateCaseInsensitive}.
     *
     * @param z the case-sensitivity flag; {@code build()} passes {@code catalog.caseSensitive()}
     */
    protected void validateCaseInsensitive(boolean z) {
        // NOTE(review): presumably rejects upper-case names on a case-insensitive catalog - confirm in AbstractCatalog.
        final String[] databaseName = {this.database};
        final String[] tableName = {this.table};
        AbstractCatalog.validateCaseInsensitive(z, "Database", databaseName);
        AbstractCatalog.validateCaseInsensitive(z, "Table", tableName);
        AbstractCatalog.validateCaseInsensitive(z, "Partition keys", this.partitionKeys);
        AbstractCatalog.validateCaseInsensitive(z, "Primary keys", this.primaryKeys);
    }

    /**
     * Verifies that every computed column exists as a field of the target table.
     *
     * <p>Returns immediately when there are no computed columns. Reads {@code fileStoreTable},
     * so it must run after the table has been loaded.
     *
     * @param list the computed columns to check
     * @throws IllegalArgumentException presumably thrown by {@code Preconditions.checkArgument}
     *     when a computed column is missing from the table - confirm against that helper
     */
    protected void checkComputedColumns(List<ComputedColumn> list) {
        if (list.isEmpty()) {
            return;
        }
        List<String> computedNames = list.stream().map(ComputedColumn::columnName).collect(Collectors.toList());
        List<String> fieldNames = this.fileStoreTable.schema().fieldNames();
        // Set lookup keeps the containment check linear in the number of names.
        Preconditions.checkArgument(new HashSet<>(fieldNames).containsAll(computedNames), " Exists Table should contain all computed columns %s, but are %s.", new Object[]{computedNames, fieldNames});
    }

    /**
     * Checks that explicitly specified partition and primary keys match those of the existing
     * table. An empty specified list skips the corresponding check.
     *
     * <p>Keys are compared without regard to order. Called from {@code build()} only when the
     * source schema could not be retrieved.
     *
     * @throws IllegalStateException presumably thrown by {@code Preconditions.checkState} on a
     *     mismatch - confirm against that helper
     */
    private void checkConstraints() {
        if (!this.partitionKeys.isEmpty()) {
            List<String> existingPartitionKeys = this.fileStoreTable.partitionKeys();
            Preconditions.checkState(hasSameKeys(existingPartitionKeys, this.partitionKeys), "Specified partition keys [%s] are not equal to the existed table partition keys [%s]. You should remove the --partition-keys argument or re-create the table if the partition keys are wrong.", new Object[]{String.join(",", this.partitionKeys), String.join(",", existingPartitionKeys)});
        }
        if (!this.primaryKeys.isEmpty()) {
            List<String> existingPrimaryKeys = this.fileStoreTable.primaryKeys();
            Preconditions.checkState(hasSameKeys(existingPrimaryKeys, this.primaryKeys), "Specified primary keys [%s] are not equal to the existed table primary keys [%s]. You should remove the --primary-keys argument or re-create the table if the primary keys are wrong.", new Object[]{String.join(",", this.primaryKeys), String.join(",", existingPrimaryKeys)});
        }
    }

    /**
     * Returns whether both lists have the same length and the same distinct elements.
     *
     * <p>Comparing as sets closes a gap in a plain {@code containsAll} check: a specified list
     * with duplicates (e.g. {@code [a, a]}) must not be accepted against {@code [a, b]}.
     */
    private static boolean hasSameKeys(List<String> existing, List<String> specified) {
        return existing.size() == specified.size()
                && new HashSet<>(existing).equals(new HashSet<>(specified));
    }

    /**
     * Test accessor for the table options.
     *
     * @return the stored map itself, not a copy
     */
    @VisibleForTesting
    public Map<String, String> tableConfig() {
        return tableConfig;
    }

    /**
     * Test accessor for the target table.
     *
     * @return the table assigned by {@code build()}, or {@code null} if it has not run yet
     */
    @VisibleForTesting
    public FileStoreTable fileStoreTable() {
        return fileStoreTable;
    }

    /**
     * @return the name that {@code run()} passes to {@code execute(...)}
     */
    protected abstract String jobName();

    /**
     * Builds the pipeline and then executes it under the name returned by {@code jobName()}.
     *
     * @throws Exception if building or executing the job fails
     */
    public void run() throws Exception {
        build();
        // jobName() is evaluated only after build() has completed.
        final String name = jobName();
        execute(name);
    }

    /**
     * Deserialization hook for the serializable lambda created in {@code build()}.
     *
     * <p>Checks the metadata of the serialized lambda (implementation method name, method kind,
     * functional interface, signatures and implementing class). On a match it rebuilds the
     * parser factory from the captured boolean; otherwise it throws.
     *
     * <p>NOTE(review): this method is marked synthetic, so it is presumably compiler-generated
     * code recovered by a decompiler rather than hand-written source - confirm before editing.
     * As written it is not valid Java ({@code boolean z = -1}, {@code switch} on a boolean, and a
     * lambda returned as {@code Object} without a target type).
     *
     * @param serializedLambda the serialized form of the lambda being restored
     * @return the restored factory lambda
     * @throws IllegalArgumentException if the serialized lambda does not match the expected one
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // NOTE(review): 'z' looks like an int selector rendered as boolean by the decompiler.
        boolean z = -1;
        // First stage: hash of the method name, confirmed with equals() against collisions.
        switch (implMethodName.hashCode()) {
            case 7481310:
                if (implMethodName.equals("lambda$build$faa2fd2f$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the remaining metadata before recreating the lambda.
        switch (z) {
            case false:
                // Method kind 6 is presumably MethodHandleInfo.REF_invokeStatic - confirm.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/EventParser$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/flink/sink/cdc/EventParser;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/action/cdc/SyncTableActionBase") && serializedLambda.getImplMethodSignature().equals("(Z)Lorg/apache/paimon/flink/sink/cdc/EventParser;")) {
                    // The only captured argument is the boolean handed to the parser constructor.
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(0)).booleanValue();
                    return () -> {
                        return new RichCdcMultiplexRecordEventParser(booleanValue);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
