package org.apache.paimon.flink.action.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.class */
public class CdcActionITCaseBase extends ActionITCaseBase {
    private static final Logger LOG = LoggerFactory.getLogger(CdcActionITCaseBase.class);
    protected StreamExecutionEnvironment env;

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/CdcActionITCaseBase$SyncDatabaseActionBuilder.class */
    protected abstract class SyncDatabaseActionBuilder<T extends SynchronizationActionBase> {
        private final Class<T> clazz;
        private final Map<String, String> sourceConfig;

        @Nullable
        private Boolean ignoreIncompatible;

        @Nullable
        private Boolean mergeShards;

        @Nullable
        private String tablePrefix;

        @Nullable
        private String tableSuffix;

        @Nullable
        private String includingTables;

        @Nullable
        private String excludingTables;

        @Nullable
        private String mode;
        private Map<String, String> catalogConfig = Collections.emptyMap();
        private Map<String, String> tableConfig = Collections.emptyMap();
        private final List<String> typeMappingModes = new ArrayList();
        private final List<String> metadataColumn = new ArrayList();

        public SyncDatabaseActionBuilder(Class<T> cls, Map<String, String> map) {
            this.clazz = cls;
            this.sourceConfig = map;
        }

        public SyncDatabaseActionBuilder<T> withCatalogConfig(Map<String, String> map) {
            this.catalogConfig = map;
            return this;
        }

        public SyncDatabaseActionBuilder<T> withTableConfig(Map<String, String> map) {
            this.tableConfig = map;
            return this;
        }

        public SyncDatabaseActionBuilder<T> ignoreIncompatible(boolean z) {
            this.ignoreIncompatible = Boolean.valueOf(z);
            return this;
        }

        public SyncDatabaseActionBuilder<T> mergeShards(boolean z) {
            this.mergeShards = Boolean.valueOf(z);
            return this;
        }

        public SyncDatabaseActionBuilder<T> withTablePrefix(String str) {
            this.tablePrefix = str;
            return this;
        }

        public SyncDatabaseActionBuilder<T> withTableSuffix(String str) {
            this.tableSuffix = str;
            return this;
        }

        public SyncDatabaseActionBuilder<T> includingTables(String str) {
            this.includingTables = str;
            return this;
        }

        public SyncDatabaseActionBuilder<T> excludingTables(String str) {
            this.excludingTables = str;
            return this;
        }

        public SyncDatabaseActionBuilder<T> withMode(String str) {
            this.mode = str;
            return this;
        }

        public SyncDatabaseActionBuilder<T> withTypeMappingModes(String... strArr) {
            this.typeMappingModes.addAll(Arrays.asList(strArr));
            return this;
        }

        public SyncDatabaseActionBuilder<T> withMetadataColumn(List<String> list) {
            this.metadataColumn.addAll(list);
            return this;
        }

        public T build() {
            ArrayList arrayList = new ArrayList(Arrays.asList(CdcActionITCaseBase.this.getActionName(this.clazz), "--warehouse", CdcActionITCaseBase.this.warehouse, "--database", CdcActionITCaseBase.this.database));
            arrayList.addAll(CdcActionITCaseBase.this.mapToArgs(CdcActionITCaseBase.this.getConfKey(this.clazz), this.sourceConfig));
            arrayList.addAll(CdcActionITCaseBase.this.mapToArgs("--catalog-conf", this.catalogConfig));
            arrayList.addAll(CdcActionITCaseBase.this.mapToArgs("--table-conf", this.tableConfig));
            arrayList.addAll(CdcActionITCaseBase.this.nullableToArgs("--ignore-incompatible", this.ignoreIncompatible));
            arrayList.addAll(CdcActionITCaseBase.this.nullableToArgs("--merge-shards", this.mergeShards));
            arrayList.addAll(CdcActionITCaseBase.this.nullableToArgs("--table-prefix", this.tablePrefix));
            arrayList.addAll(CdcActionITCaseBase.this.nullableToArgs("--table-suffix", this.tableSuffix));
            arrayList.addAll(CdcActionITCaseBase.this.nullableToArgs("--including-tables", this.includingTables));
            arrayList.addAll(CdcActionITCaseBase.this.nullableToArgs("--excluding-tables", this.excludingTables));
            arrayList.addAll(CdcActionITCaseBase.this.nullableToArgs("--mode", this.mode));
            arrayList.addAll(CdcActionITCaseBase.this.listToArgs("--type-mapping", this.typeMappingModes));
            arrayList.addAll(CdcActionITCaseBase.this.listToArgs("--metadata-column", this.metadataColumn));
            return CdcActionITCaseBase.this.createAction(this.clazz, arrayList);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/action/cdc/CdcActionITCaseBase$SyncTableActionBuilder.class */
    protected abstract class SyncTableActionBuilder<T extends SynchronizationActionBase> {
        private final Class<T> clazz;
        private final Map<String, String> sourceConfig;
        private Map<String, String> catalogConfig = Collections.emptyMap();
        private Map<String, String> tableConfig = Collections.emptyMap();
        private final List<String> partitionKeys = new ArrayList();
        private final List<String> primaryKeys = new ArrayList();
        private final List<String> computedColumnArgs = new ArrayList();
        private final List<String> typeMappingModes = new ArrayList();
        private final List<String> metadataColumns = new ArrayList();

        public SyncTableActionBuilder(Class<T> cls, Map<String, String> map) {
            this.clazz = cls;
            this.sourceConfig = map;
        }

        public SyncTableActionBuilder<T> withCatalogConfig(Map<String, String> map) {
            this.catalogConfig = map;
            return this;
        }

        public SyncTableActionBuilder<T> withTableConfig(Map<String, String> map) {
            this.tableConfig = map;
            return this;
        }

        public SyncTableActionBuilder<T> withPartitionKeys(String... strArr) {
            this.partitionKeys.addAll(Arrays.asList(strArr));
            return this;
        }

        public SyncTableActionBuilder<T> withPrimaryKeys(String... strArr) {
            this.primaryKeys.addAll(Arrays.asList(strArr));
            return this;
        }

        public SyncTableActionBuilder<T> withComputedColumnArgs(String... strArr) {
            return withComputedColumnArgs(Arrays.asList(strArr));
        }

        public SyncTableActionBuilder<T> withComputedColumnArgs(List<String> list) {
            this.computedColumnArgs.addAll(list);
            return this;
        }

        public SyncTableActionBuilder<T> withTypeMappingModes(String... strArr) {
            this.typeMappingModes.addAll(Arrays.asList(strArr));
            return this;
        }

        public SyncTableActionBuilder<T> withMetadataColumns(String... strArr) {
            this.metadataColumns.addAll(Arrays.asList(strArr));
            return this;
        }

        public T build() {
            ArrayList arrayList = new ArrayList(Arrays.asList(CdcActionITCaseBase.this.getActionName(this.clazz), "--warehouse", CdcActionITCaseBase.this.warehouse, "--database", CdcActionITCaseBase.this.database, "--table", CdcActionITCaseBase.this.tableName));
            arrayList.addAll(CdcActionITCaseBase.this.mapToArgs(CdcActionITCaseBase.this.getConfKey(this.clazz), this.sourceConfig));
            arrayList.addAll(CdcActionITCaseBase.this.mapToArgs("--catalog-conf", this.catalogConfig));
            arrayList.addAll(CdcActionITCaseBase.this.mapToArgs("--table-conf", this.tableConfig));
            arrayList.addAll(CdcActionITCaseBase.this.listToArgs("--partition-keys", this.partitionKeys));
            arrayList.addAll(CdcActionITCaseBase.this.listToArgs("--primary-keys", this.primaryKeys));
            arrayList.addAll(CdcActionITCaseBase.this.listToArgs("--type-mapping", this.typeMappingModes));
            arrayList.addAll(CdcActionITCaseBase.this.listToMultiArgs("--computed-column", this.computedColumnArgs));
            arrayList.addAll(CdcActionITCaseBase.this.listToMultiArgs("--metadata-column", this.metadataColumns));
            return CdcActionITCaseBase.this.createAction(this.clazz, arrayList);
        }
    }

    @BeforeEach
    public void setEnv() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setParallelism(2);
        this.env.enableCheckpointing(1000L);
        this.env.setRestartStrategy(RestartStrategies.noRestart());
    }

    @AfterEach
    public void closeEnv() throws Exception {
        this.env.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitingTables(String... strArr) throws Exception {
        waitingTables(Arrays.asList(strArr));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitingTables(List<String> list) throws Exception {
        LOG.info("Waiting for tables '{}'", list);
        while (!this.catalog.listTables(this.database).containsAll(list)) {
            Thread.sleep(100L);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertExactlyExistTables(List<String> list) throws Exception {
        assertExactlyExistTables((String[]) list.toArray(new String[0]));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertExactlyExistTables(String... strArr) throws Exception {
        Assertions.assertThat(this.catalog.listTables(this.database)).containsExactlyInAnyOrder(strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertTableNotExists(List<String> list) throws Exception {
        assertTableNotExists((String[]) list.toArray(new String[0]));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assertTableNotExists(String... strArr) throws Exception {
        Assertions.assertThat(this.catalog.listTables(this.database)).doesNotContain(strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForResult(List<String> list, FileStoreTable fileStoreTable, RowType rowType, List<String> list2) throws Exception {
        Assertions.assertThat(fileStoreTable.schema().primaryKeys()).isEqualTo(list2);
        while (true) {
            if (rowType.getFieldCount() == fileStoreTable.schema().fields().size()) {
                int i = 0;
                for (int i2 = 0; i2 < fileStoreTable.schema().fields().size(); i2++) {
                    DataField dataField = (DataField) fileStoreTable.schema().fields().get(i2);
                    boolean equals = dataField.name().equals(rowType.getFieldNames().get(i2));
                    boolean equals2 = dataField.type().equals(rowType.getFieldTypes().get(i2));
                    if (equals && equals2) {
                        i++;
                    }
                }
                if (i == rowType.getFieldCount()) {
                    break;
                }
            }
            fileStoreTable = fileStoreTable.copyWithLatestSchema();
            Thread.sleep(1000L);
        }
        ArrayList arrayList = new ArrayList(list);
        Collections.sort(arrayList);
        while (true) {
            ReadBuilder newReadBuilder = fileStoreTable.newReadBuilder();
            TableScan.Plan plan = newReadBuilder.newScan().plan();
            ArrayList arrayList2 = new ArrayList(getResult(newReadBuilder.newRead(), plan == null ? Collections.emptyList() : plan.splits(), rowType));
            Collections.sort(arrayList2);
            if (arrayList.equals(arrayList2)) {
                return;
            } else {
                Thread.sleep(1000L);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, String> getBasicTableConfig() {
        HashMap hashMap = new HashMap();
        ThreadLocalRandom current = ThreadLocalRandom.current();
        hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
        hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
        return hashMap;
    }

    protected List<String> mapToArgs(String str, Map<String, String> map) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            arrayList.add(str);
            arrayList.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
        }
        return arrayList;
    }

    protected List<String> listToArgs(String str, List<String> list) {
        return list.isEmpty() ? Collections.emptyList() : Arrays.asList(str, String.join(",", list));
    }

    protected List<String> listToMultiArgs(String str, List<String> list) {
        ArrayList arrayList = new ArrayList();
        for (String str2 : list) {
            arrayList.add(str);
            arrayList.add(str2);
        }
        return arrayList;
    }

    protected <T> List<String> nullableToArgs(String str, @Nullable T t) {
        return t == null ? Collections.emptyList() : Arrays.asList(str, t.toString());
    }

    public JobClient runActionWithDefaultEnv(ActionBase actionBase) throws Exception {
        actionBase.withStreamExecutionEnvironment(this.env).build();
        JobClient executeAsync = this.env.executeAsync();
        waitJobRunning(executeAsync);
        return executeAsync;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitJobRunning(JobClient jobClient) throws Exception {
        while (((JobStatus) jobClient.getJobStatus().get()) != JobStatus.RUNNING) {
            Thread.sleep(1000L);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> String getActionName(Class<T> cls) {
        String simpleName = cls.getSimpleName();
        boolean z = -1;
        switch (simpleName.hashCode()) {
            case -1996599554:
                if (simpleName.equals("MongoDBSyncDatabaseAction")) {
                    z = 5;
                    break;
                }
                break;
            case -1896830804:
                if (simpleName.equals("PulsarSyncTableAction")) {
                    z = 6;
                    break;
                }
                break;
            case -1324386473:
                if (simpleName.equals("MongoDBSyncTableAction")) {
                    z = 4;
                    break;
                }
                break;
            case -370531959:
                if (simpleName.equals("PulsarSyncDatabaseAction")) {
                    z = 7;
                    break;
                }
                break;
            case -171393870:
                if (simpleName.equals("KafkaSyncDatabaseAction")) {
                    z = 3;
                    break;
                }
                break;
            case -120650205:
                if (simpleName.equals("KafkaSyncTableAction")) {
                    z = 2;
                    break;
                }
                break;
            case 242733383:
                if (simpleName.equals("MySqlSyncTableAction")) {
                    z = false;
                    break;
                }
                break;
            case 1057471792:
                if (simpleName.equals("PostgresSyncTableAction")) {
                    z = 8;
                    break;
                }
                break;
            case 2071490318:
                if (simpleName.equals("MySqlSyncDatabaseAction")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return "mysql_sync_table";
            case true:
                return "mysql_sync_database";
            case true:
                return "kafka_sync_table";
            case true:
                return "kafka_sync_database";
            case true:
                return "mongodb_sync_table";
            case true:
                return "mongodb_sync_database";
            case true:
                return "pulsar_sync_table";
            case true:
                return "pulsar_sync_database";
            case true:
                return "postgres_sync_table";
            default:
                throw new UnsupportedOperationException("Unknown sync action: " + cls.getSimpleName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> String getConfKey(Class<T> cls) {
        String simpleName = cls.getSimpleName();
        boolean z = -1;
        switch (simpleName.hashCode()) {
            case -1996599554:
                if (simpleName.equals("MongoDBSyncDatabaseAction")) {
                    z = 5;
                    break;
                }
                break;
            case -1896830804:
                if (simpleName.equals("PulsarSyncTableAction")) {
                    z = 6;
                    break;
                }
                break;
            case -1324386473:
                if (simpleName.equals("MongoDBSyncTableAction")) {
                    z = 4;
                    break;
                }
                break;
            case -370531959:
                if (simpleName.equals("PulsarSyncDatabaseAction")) {
                    z = 7;
                    break;
                }
                break;
            case -171393870:
                if (simpleName.equals("KafkaSyncDatabaseAction")) {
                    z = 3;
                    break;
                }
                break;
            case -120650205:
                if (simpleName.equals("KafkaSyncTableAction")) {
                    z = 2;
                    break;
                }
                break;
            case 242733383:
                if (simpleName.equals("MySqlSyncTableAction")) {
                    z = false;
                    break;
                }
                break;
            case 1057471792:
                if (simpleName.equals("PostgresSyncTableAction")) {
                    z = 8;
                    break;
                }
                break;
            case 2071490318:
                if (simpleName.equals("MySqlSyncDatabaseAction")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
            case true:
                return "--mysql_conf";
            case true:
            case true:
                return "--kafka_conf";
            case true:
            case true:
                return "--mongodb_conf";
            case true:
            case true:
                return "--pulsar_conf";
            case true:
                return "--postgres_conf";
            default:
                throw new UnsupportedOperationException("Unknown sync action: " + cls.getSimpleName());
        }
    }
}
