package org.apache.paimon.flink.action.cdc;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for synchronization actions that write into one target table, identified by the
 * inherited {@code database} and this class's {@link #table}.
 *
 * <p>Before the source and sink are built, the target table is either created from the schema
 * returned by {@link #retrieveSchema()} (when it does not exist yet) or loaded and checked for
 * compatibility with that schema (when it already exists).
 *
 * <p>The {@code with...} methods return {@code this} so that configuration calls can be chained.
 */
public abstract class SyncTableActionBase extends SynchronizationActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(SyncTableActionBase.class);

    /** Name of the target table. */
    protected final String table;

    /** Target table handle; assigned by {@link #beforeBuildingSourceSink()}. */
    protected FileStoreTable fileStoreTable;

    /** Partition keys requested by the caller; empty when none were specified. */
    protected List<String> partitionKeys = new ArrayList<>();

    /** Primary keys requested by the caller; empty when none were specified. */
    protected List<String> primaryKeys = new ArrayList<>();

    /** Raw computed-column arguments, turned into {@link #computedColumns} later on. */
    protected List<String> computedColumnArgs = new ArrayList<>();

    /** Computed columns built from {@link #computedColumnArgs} once a schema is available. */
    protected List<ComputedColumn> computedColumns = new ArrayList<>();

    /** Signals that {@link #retrieveSchema()} could not obtain a schema. */
    public static class SchemaRetrievalException extends Exception {
        /**
         * @param message description of why the schema could not be retrieved
         */
        public SchemaRetrievalException(String message) {
            super(message);
        }
    }

    /**
     * Creates the action and the {@link SyncJobHandler} it hands to the superclass.
     *
     * <p>NOTE(review): the names {@code warehouse}, {@code database}, {@code catalogConfig} and
     * {@code cdcSourceConfig} are assumptions based on argument position; only {@code table} is
     * established by this class. Confirm against {@code SynchronizationActionBase} and
     * {@code SyncJobHandler}.
     *
     * @param warehouse first argument forwarded to the superclass constructor
     * @param database second argument forwarded to the superclass constructor; also passed to the
     *     {@link SyncJobHandler}
     * @param table name of the target table; also passed to the {@link SyncJobHandler}
     * @param catalogConfig third argument forwarded to the superclass constructor
     * @param cdcSourceConfig fourth argument forwarded to the superclass constructor; also passed
     *     to the {@link SyncJobHandler}
     * @param sourceType source type passed to the {@link SyncJobHandler}
     */
    public SyncTableActionBase(
            String warehouse,
            String database,
            String table,
            Map<String, String> catalogConfig,
            Map<String, String> cdcSourceConfig,
            SyncJobHandler.SourceType sourceType) {
        super(
                warehouse,
                database,
                catalogConfig,
                cdcSourceConfig,
                new SyncJobHandler(sourceType, cdcSourceConfig, database, table));
        this.table = table;
    }

    /**
     * Sets the partition keys from a varargs array.
     *
     * @param strArr partition key names
     * @return this action
     */
    public SyncTableActionBase withPartitionKeys(String... strArr) {
        return withPartitionKeys(Arrays.asList(strArr));
    }

    /**
     * Sets the partition keys. The given list is stored as-is (not copied).
     *
     * @param list partition key names
     * @return this action
     */
    public SyncTableActionBase withPartitionKeys(List<String> list) {
        this.partitionKeys = list;
        return this;
    }

    /**
     * Sets the primary keys from a varargs array.
     *
     * @param strArr primary key names
     * @return this action
     */
    public SyncTableActionBase withPrimaryKeys(String... strArr) {
        return withPrimaryKeys(Arrays.asList(strArr));
    }

    /**
     * Sets the primary keys. The given list is stored as-is (not copied).
     *
     * @param list primary key names
     * @return this action
     */
    public SyncTableActionBase withPrimaryKeys(List<String> list) {
        this.primaryKeys = list;
        return this;
    }

    /**
     * Sets the raw computed-column arguments. The given list is stored as-is (not copied).
     *
     * @param list computed-column arguments
     * @return this action
     */
    public SyncTableActionBase withComputedColumnArgs(List<String> list) {
        this.computedColumnArgs = list;
        return this;
    }

    /**
     * Retrieves the schema of the data to be synchronized.
     *
     * @return the retrieved schema
     * @throws Exception if retrieval fails; {@link #beforeBuildingSourceSink()} treats a {@link
     *     SchemaRetrievalException} as recoverable when the target table already exists
     */
    protected abstract Schema retrieveSchema() throws Exception;

    /**
     * Builds the Paimon schema for the target table from a retrieved schema and this action's
     * configuration (keys, computed columns, table config, metadata converters, case sensitivity).
     *
     * @param schema the retrieved schema
     * @return the schema produced by {@code CdcActionCommonUtils.buildPaimonSchema}
     */
    protected Schema buildPaimonSchema(Schema schema) {
        // NOTE(review): the meaning of the trailing `true` flag is defined by
        // CdcActionCommonUtils.buildPaimonSchema and is not visible here.
        return CdcActionCommonUtils.buildPaimonSchema(
                this.table,
                this.partitionKeys,
                this.primaryKeys,
                this.computedColumns,
                this.tableConfig,
                schema,
                this.metadataConverters,
                this.caseSensitive,
                true);
    }

    /** Validates the database name and the table name against the case-sensitivity setting. */
    @Override // org.apache.paimon.flink.action.cdc.SynchronizationActionBase
    protected void validateCaseSensitivity() {
        AbstractCatalog.validateCaseInsensitive(
                this.caseSensitive, "Database", new String[] {this.database});
        AbstractCatalog.validateCaseInsensitive(
                this.caseSensitive, "Table", new String[] {this.table});
    }

    /**
     * Prepares {@link #fileStoreTable} and {@link #computedColumns}.
     *
     * <ul>
     *   <li>Table missing: retrieve the schema, build computed columns, create the table, load it.
     *   <li>Table present: load it, then retrieve the schema and assert compatibility. If schema
     *       retrieval fails with {@link SchemaRetrievalException}, the compatibility check is
     *       skipped, computed columns are built from the existing table schema, and the
     *       user-specified keys are checked against the table's keys.
     * </ul>
     *
     * @throws Exception if schema retrieval (for a missing table), table creation or loading fails
     */
    @Override // org.apache.paimon.flink.action.cdc.SynchronizationActionBase
    protected void beforeBuildingSourceSink() throws Exception {
        Identifier identifier = new Identifier(this.database, this.table);

        if (!this.catalog.tableExists(identifier)) {
            Schema retrievedSchema = retrieveSchema();
            // Computed columns must be set before buildPaimonSchema, which reads them.
            this.computedColumns =
                    ComputedColumnUtils.buildComputedColumns(
                            this.computedColumnArgs, retrievedSchema.fields());
            this.catalog.createTable(identifier, buildPaimonSchema(retrievedSchema), false);
            this.fileStoreTable = this.catalog.getTable(identifier).copy(this.tableConfig);
            return;
        }

        this.fileStoreTable = this.catalog.getTable(identifier).copy(this.tableConfig);
        try {
            Schema retrievedSchema = retrieveSchema();
            this.computedColumns =
                    ComputedColumnUtils.buildComputedColumns(
                            this.computedColumnArgs, retrievedSchema.fields());
            CdcActionCommonUtils.assertSchemaCompatible(
                    this.fileStoreTable.schema(), buildPaimonSchema(retrievedSchema).fields());
        } catch (SchemaRetrievalException e) {
            // Deliberate best-effort: an existing table is enough to proceed without a
            // retrieved schema, so this is reported at info level rather than rethrown.
            LOG.info(
                    "Failed to retrieve schema from record data but there exists specified Paimon table. "
                            + "Schema compatibility check will be skipped. If you have specified computed columns, "
                            + "here will use the existed Paimon table schema to build them. Please make sure "
                            + "the Paimon table has defined all the argument columns used for computed columns.");
            this.computedColumns =
                    ComputedColumnUtils.buildComputedColumns(
                            this.computedColumnArgs,
                            this.fileStoreTable.schema().fields(),
                            this.caseSensitive);
            checkConstraints();
        }
    }

    /**
     * Obtains the record parser from the {@code syncJobHandler}, passing the case-sensitivity
     * flag, computed columns, type mapping and metadata converters.
     *
     * @return the parser function supplied by the handler
     */
    @Override // org.apache.paimon.flink.action.cdc.SynchronizationActionBase
    protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
        return this.syncJobHandler.provideRecordParser(
                this.caseSensitive, this.computedColumns, this.typeMapping, this.metadataConverters);
    }

    /**
     * Creates a factory producing {@link RichCdcMultiplexRecordEventParser} instances configured
     * with the current case-sensitivity flag.
     *
     * @return the event parser factory
     */
    @Override // org.apache.paimon.flink.action.cdc.SynchronizationActionBase
    protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
        // Copy the flag into a local so the lambda captures only a boolean and not `this`;
        // the factory is serialized, and capturing the whole action would drag it along.
        boolean caseSensitiveFlag = this.caseSensitive;
        return () -> new RichCdcMultiplexRecordEventParser(caseSensitiveFlag);
    }

    /**
     * Builds the CDC sink for {@link #fileStoreTable}. If the table config contains the
     * {@code FlinkConnectorOptions.SINK_PARALLELISM} key, its value is parsed as an integer and
     * applied as the sink parallelism.
     *
     * @param dataStream the parsed record stream
     * @param factory the event parser factory used by the sink
     * @throws NumberFormatException if the configured sink parallelism is not a valid integer
     */
    @Override // org.apache.paimon.flink.action.cdc.SynchronizationActionBase
    protected void buildSink(
            DataStream<RichCdcMultiplexRecord> dataStream,
            EventParser.Factory<RichCdcMultiplexRecord> factory) {
        CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
                new CdcSinkBuilder<RichCdcMultiplexRecord>()
                        .withInput(dataStream)
                        .withParserFactory(factory)
                        .withTable(this.fileStoreTable)
                        .withIdentifier(new Identifier(this.database, this.table))
                        .withCatalogLoader(catalogLoader());

        String sinkParallelism = this.tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        if (sinkParallelism != null) {
            sinkBuilder.withParallelism(Integer.valueOf(sinkParallelism));
        }
        sinkBuilder.build();
    }

    /**
     * Verifies that user-specified partition keys and primary keys, when non-empty, contain the
     * same elements as the existing table's keys (same size and full containment; order is not
     * compared).
     *
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when a
     *     specified key set differs from the table's
     */
    private void checkConstraints() {
        if (!this.partitionKeys.isEmpty()) {
            List<String> tablePartitionKeys = this.fileStoreTable.partitionKeys();
            Preconditions.checkState(
                    tablePartitionKeys.size() == this.partitionKeys.size()
                            && tablePartitionKeys.containsAll(this.partitionKeys),
                    "Specified partition keys [%s] are not equal to the existed table partition keys [%s]. "
                            + "You should remove the --partition-keys argument or re-create the table "
                            + "if the partition keys are wrong.",
                    String.join(",", this.partitionKeys),
                    String.join(",", tablePartitionKeys));
        }

        if (!this.primaryKeys.isEmpty()) {
            List<String> tablePrimaryKeys = this.fileStoreTable.primaryKeys();
            Preconditions.checkState(
                    tablePrimaryKeys.size() == this.primaryKeys.size()
                            && tablePrimaryKeys.containsAll(this.primaryKeys),
                    "Specified primary keys [%s] are not equal to the existed table primary keys [%s]. "
                            + "You should remove the --primary-keys argument or re-create the table "
                            + "if the primary keys are wrong.",
                    String.join(",", this.primaryKeys),
                    String.join(",", tablePrimaryKeys));
        }
    }

    /**
     * Exposes the target table handle for tests.
     *
     * @return the current {@link #fileStoreTable}, or {@code null} if it has not been assigned yet
     */
    @VisibleForTesting
    public FileStoreTable fileStoreTable() {
        return this.fileStoreTable;
    }

    // The decompiled `$deserializeLambda$` method was intentionally dropped: javac synthesizes it
    // for the serializable lambda in buildEventParserFactory(), and a hand-written copy is
    // neither valid Java nor allowed to coexist with the generated one.
}
