package org.apache.paimon.flink.action.cdc.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.class */
public class KafkaSyncTableActionITCase extends KafkaActionITCaseBase {
    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts a table sync from the {@code schema_evolution} topic and then runs the shared
     * schema-evolution verification against the synced table.
     *
     * @param str directory segment substituted into the test-data resource path
     * @param str2 format name; substituted into the resource path and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if topic setup, the sync action, or the verification fails
     */
    public void runSingleTableSchemaEvolution(String str, String str2) throws Exception {
        final String topic = "schema_evolution";
        createTestTopic(topic, 1, 1);

        // Seed the topic with the first data file before the action starts.
        writeRecordsToKafka(topic, "kafka/%s/table/%s/%s-data-1.txt", str2, str, str2);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str2 + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        testSchemaEvolutionImpl(topic, str, str2);
    }

    /**
     * Verifies the synced table in three stages: the initial four-column rows, then the rows
     * after a second data file adds an {@code age} column, then the rows after a third data file
     * adds an {@code address} column.
     *
     * @param str Kafka topic the follow-up data files are written to
     * @param str2 directory segment substituted into the test-data resource path
     * @param str3 format name substituted into the test-data resource path
     * @throws Exception if writing records or waiting for the expected rows fails
     */
    private void testSchemaEvolutionImpl(String str, String str2, String str3) throws Exception {
        FileStoreTable table = getFileStoreTable(this.tableName);
        List<String> primaryKeys = Collections.singletonList("id");

        // Stage 1: data already written by the caller, four string columns.
        RowType initialType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> initialRows =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                        "+I[102, car battery, 12V car battery, 8.1]");
        waitForResult(initialRows, table, initialType, primaryKeys);

        // Stage 2: second data file; expected schema gains the "age" column.
        writeRecordsToKafka(str, "kafka/%s/table/%s/%s-data-2.txt", str3, str2, str3);
        RowType withAgeType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight", "age"});
        List<String> withAgeRows =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
                        "+I[102, car battery, 12V car battery, 8.1, NULL]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
        waitForResult(withAgeRows, table, withAgeType, primaryKeys);

        // Stage 3: third data file; expected schema gains the "address" column.
        writeRecordsToKafka(str, "kafka/%s/table/%s/%s-data-3.txt", str3, str2, str3);
        RowType withAddressType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight", "age", "address"});
        List<String> withAddressRows =
                Arrays.asList(
                        "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
                        "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
                        "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
        waitForResult(withAddressRows, table, withAddressType, primaryKeys);
    }

    /**
     * Builds a sync action whose value format is the unknown {@code togg-json} and asserts that
     * running it throws, checking the failure with {@code PaimonAssertions.anyCauseMatches}
     * against {@link UnsupportedOperationException} and the expected message.
     *
     * @param str format name substituted into the test-data resource path
     * @throws Exception if topic setup or writing the records fails
     */
    public void testNotSupportFormat(String str) throws Exception {
        final String topic = "not_support";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/schemaevolution/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), "togg-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();

        // The method reference null-checks its receiver on evaluation, so no separate probe of
        // the action is needed before handing it to AssertJ.
        Assertions.assertThatThrownBy(action::run)
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                UnsupportedOperationException.class,
                                "This format: togg-json is not supported."));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Pre-creates a Paimon table with fields {@code k} and {@code v1}, then asserts that running
     * a sync action keyed on {@code id} throws. The failure is checked with
     * {@code PaimonAssertions.anyCauseMatches} against {@link IllegalArgumentException} and the
     * schema-incompatibility message.
     *
     * @param str format name; substituted into the resource path and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if topic setup, writing records, or table creation fails
     */
    public void testAssertSchemaCompatible(String str) throws Exception {
        final String topic = "assert_schema_compatible";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/schemaevolution/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        // Existing table whose fields deliberately do not match the source records.
        RowType existingType =
                RowType.of(
                        new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
                        new String[] {"k", "v1"});
        createFileStoreTable(
                existingType,
                Collections.emptyList(),
                Collections.singletonList("k"),
                Collections.emptyMap());

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();

        // The method reference null-checks its receiver on evaluation, so no separate probe of
        // the action is needed before handing it to AssertJ.
        Assertions.assertThatThrownBy(action::run)
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                IllegalArgumentException.class,
                                "Paimon schema and source table schema are not compatible.\nPaimon fields are: [`k` STRING NOT NULL, `v1` STRING].\nSource table fields are: [`id` STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs a table sync with the {@code SPECIFIC_OFFSETS} startup mode pinned to
     * {@code partition:0,offset:1} and expects only record 102 in the table.
     *
     * @param str format name; substituted into the resource path and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup, the sync action, or waiting for the result fails
     */
    public void testStarUpOptionSpecific(String str) throws Exception {
        final String topic = "start_up_specific";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        kafkaConfig.put(
                KafkaConnectorOptions.SCAN_STARTUP_MODE.key(),
                KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS.toString());
        kafkaConfig.put(
                KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS.key(), "partition:0,offset:1");

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        FileStoreTable table = getFileStoreTable(this.tableName);
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> expected =
                Collections.singletonList("+I[102, car battery, 12V car battery, 8.1]");
        waitForResult(expected, table, rowType, Collections.singletonList("id"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs a table sync with the {@code LATEST_OFFSET} startup mode. A first data file is written
     * before the action starts, a second one five seconds after; only rows 103 and 104 are
     * expected in the table.
     *
     * @param str format name; substituted into the resource paths and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup, the sync action, or waiting for the result fails
     */
    public void testStarUpOptionLatest(String str) throws Exception {
        final String topic = "start_up_latest";
        createTestTopic(topic, 1, 1);
        // Written before the action starts; passes `true` to the boolean overload of the writer.
        writeRecordsToKafka(topic, true, "kafka/%s/table/startupmode/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        kafkaConfig.put(
                KafkaConnectorOptions.SCAN_STARTUP_MODE.key(),
                KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET.toString());

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        // Fixed pause before producing the second batch.
        Thread.sleep(5000L);

        FileStoreTable table = getFileStoreTable(this.tableName);
        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", str, str);

        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> expected =
                Arrays.asList(
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, Collections.singletonList("id"));
    }

    /**
     * Runs a table sync with the {@code TIMESTAMP} startup mode. The startup timestamp is taken
     * from {@link System#currentTimeMillis()} after the first data file has been written; a
     * second data file is written after the action starts and only rows 103 and 104 are expected.
     *
     * @param str format name; substituted into the resource paths and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup, the sync action, or waiting for the result fails
     */
    public void testStarUpOptionTimestamp(String str) throws Exception {
        final String topic = "start_up_timestamp";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, true, "kafka/%s/table/startupmode/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        kafkaConfig.put(
                KafkaConnectorOptions.SCAN_STARTUP_MODE.key(),
                KafkaConnectorOptions.ScanStartupMode.TIMESTAMP.toString());
        // Captured after the first batch was written.
        kafkaConfig.put(
                KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
                String.valueOf(System.currentTimeMillis()));

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", str, str);

        FileStoreTable table = getFileStoreTable(this.tableName);
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> expected =
                Arrays.asList(
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, Collections.singletonList("id"));
    }

    /**
     * Runs a table sync with the {@code EARLIEST_OFFSET} startup mode and expects the rows from
     * both data files (101 through 104) in the table.
     *
     * @param str format name; substituted into the resource paths and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup, the sync action, or waiting for the result fails
     */
    public void testStarUpOptionEarliest(String str) throws Exception {
        final String topic = "start_up_earliest";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        kafkaConfig.put(
                KafkaConnectorOptions.SCAN_STARTUP_MODE.key(),
                KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET.toString());

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", str, str);

        FileStoreTable table = getFileStoreTable(this.tableName);
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> expected =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                        "+I[102, car battery, 12V car battery, 8.1]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, Collections.singletonList("id"));
    }

    /**
     * Runs a table sync with the {@code GROUP_OFFSETS} startup mode and expects the rows from
     * both data files (101 through 104) in the table.
     *
     * @param str format name; substituted into the resource paths and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup, the sync action, or waiting for the result fails
     */
    public void testStarUpOptionGroup(String str) throws Exception {
        final String topic = "start_up_group";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        kafkaConfig.put(
                KafkaConnectorOptions.SCAN_STARTUP_MODE.key(),
                KafkaConnectorOptions.ScanStartupMode.GROUP_OFFSETS.toString());

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        writeRecordsToKafka(topic, "kafka/%s/table/startupmode/%s-data-2.txt", str, str);

        FileStoreTable table = getFileStoreTable(this.tableName);
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> expected =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                        "+I[102, car battery, 12V car battery, 8.1]",
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, Collections.singletonList("id"));
    }

    /**
     * Runs a table sync that declares the computed column {@code _year=year(_date)}, partitions
     * by {@code _year}, and keys on {@code _id} and {@code _year}; expects a single row whose
     * {@code _year} is 2023.
     *
     * @param str format name; substituted into the resource path and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup, the sync action, or waiting for the result fails
     */
    public void testComputedColumn(String str) throws Exception {
        final String topic = "computed_column";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/computedcolumn/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPartitionKeys("_year")
                        .withPrimaryKeys("_id", "_year")
                        .withComputedColumnArgs("_year=year(_date)")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        FileStoreTable table = getFileStoreTable(this.tableName);
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.INT().notNull()
                        },
                        new String[] {"_id", "_date", "_year"});
        List<String> expected = Collections.singletonList("+I[101, 2023-03-23, 2023]");
        waitForResult(expected, table, rowType, Arrays.asList("_id", "_year"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs a table sync against the {@code event} topic and checks the table contents after an
     * insert file, an update file, and a delete file have been written in turn.
     *
     * <p>The Kafka value format is derived from {@code str} (as {@code str + "-json"}), matching
     * the directory the data files are read from and the other tests in this class. For
     * {@code "ogg"} this yields the previously hard-coded {@code ogg-json}.
     *
     * @param str format name; substituted into the resource paths and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup, the sync action, or waiting for a result fails
     */
    public void testCDCOperations(String str) throws Exception {
        final String topic = "event";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/event/event-insert.txt", str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        // Keep the declared format in step with the directory the records come from.
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        FileStoreTable table = getFileStoreTable(this.tableName);
        List<String> primaryKeys = Collections.singletonList("id");
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});

        // After the insert file.
        List<String> afterInsert =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                        "+I[102, car battery, 12V car battery, 8.1]",
                        "+I[103, scooter, Big 2-wheel scooter , 5.1]");
        waitForResult(afterInsert, table, rowType, primaryKeys);

        // After the update file: row 103 carries the new weight.
        writeRecordsToKafka(topic, "kafka/%s/table/event/event-update.txt", str);
        List<String> afterUpdate =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                        "+I[102, car battery, 12V car battery, 8.1]",
                        "+I[103, scooter, Big 2-wheel scooter , 8.1]");
        waitForResult(afterUpdate, table, rowType, primaryKeys);

        // After the delete file: row 103 is gone.
        writeRecordsToKafka(topic, "kafka/%s/table/event/event-delete.txt", str);
        List<String> afterDelete =
                Arrays.asList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                        "+I[102, car battery, 12V car battery, 8.1]");
        waitForResult(afterDelete, table, rowType, primaryKeys);
    }

    /**
     * Derives a schema directly from the records in the {@code test_kafka_schema} topic via
     * {@code MessageQueueSchemaUtils.getSchema} and asserts that its fields are the four string
     * columns {@code id}, {@code name}, {@code description} and {@code weight}.
     *
     * @param str format name; substituted into the resource path and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if topic setup, writing records, or schema retrieval fails
     */
    public void testKafkaBuildSchemaWithDelete(String str) throws Exception {
        final String topic = "test_kafka_schema";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/schema/schemaevolution/%s-data-4.txt", str, str);

        Configuration kafkaConfig = Configuration.fromMap(getBasicKafkaConfig());
        kafkaConfig.setString(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.setString(KafkaConnectorOptions.TOPIC.key(), topic);

        Schema schema =
                MessageQueueSchemaUtils.getSchema(
                        KafkaActionUtils.getKafkaEarliestConsumer(kafkaConfig),
                        KafkaActionUtils.getDataFormat(kafkaConfig),
                        TypeMapping.defaultMapping());

        // Typed list instead of a raw ArrayList so the element type is checked at compile time.
        List<DataField> expectedFields = new ArrayList<>();
        expectedFields.add(new DataField(0, "id", DataTypes.STRING()));
        expectedFields.add(new DataField(1, "name", DataTypes.STRING()));
        expectedFields.add(new DataField(2, "description", DataTypes.STRING()));
        expectedFields.add(new DataField(3, "weight", DataTypes.STRING()));

        Assertions.assertThat(schema.fields()).isEqualTo(expectedFields);
    }

    /**
     * Runs a table sync with watermark-based tag creation and watermark alignment options set on
     * the table, then polls once per second until the latest snapshot carries a watermark other
     * than {@link Long#MIN_VALUE}.
     *
     * <p>A snapshot whose watermark is {@code null} is treated as "no watermark yet" and polling
     * continues, rather than failing with a {@link NullPointerException} on unboxing. The loop
     * itself has no upper bound; it returns only once a watermark is observed.
     *
     * @param str format name; substituted into the resource path and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup or the sync action fails, or the polling thread is interrupted
     */
    public void testWaterMarkSyncTable(String str) throws Exception {
        final String topic = "watermark";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/watermark/%s-data-1.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        Map<String, String> tableConfig = getBasicTableConfig();
        tableConfig.put("tag.automatic-creation", "watermark");
        tableConfig.put("tag.creation-period", "hourly");
        tableConfig.put("scan.watermark.alignment.group", "alignment-group-1");
        tableConfig.put("scan.watermark.alignment.max-drift", "20 s");
        tableConfig.put("scan.watermark.alignment.update-interval", "1 s");

        runActionWithDefaultEnv(syncTableActionBuilder(kafkaConfig).withTableConfig(tableConfig).build());

        FileStoreTable table = this.catalog.getTable(new Identifier(this.database, this.tableName));
        while (true) {
            if (table.snapshotManager().snapshotCount() > 0) {
                Long watermark = table.snapshotManager().latestSnapshot().watermark();
                // Guard the unboxing: a snapshot may not have a watermark recorded yet.
                if (watermark != null && watermark.longValue() != Long.MIN_VALUE) {
                    return;
                }
            }
            Thread.sleep(1000L);
        }
    }

    /**
     * Runs a table sync over the fixed Debezium "schema include" data file and expects a single
     * row typed as {@code INT NOT NULL, STRING, STRING, DOUBLE}.
     *
     * @param str format name; with a {@code -json} suffix it is used as the Kafka value format
     *     (the data file path itself is fixed and does not depend on it)
     * @throws Exception if setup, the sync action, or waiting for the result fails
     */
    public void testSchemaIncludeRecord(String str) throws Exception {
        final String topic = "schema_include";
        createTestTopic(topic, 1, 1);
        // The path has no placeholders, so the format arguments are an explicit empty array.
        writeRecordsToKafka(
                topic, "kafka/debezium/table/schema/include/debezium-data-1.txt", new Object[0]);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        FileStoreTable table = getFileStoreTable(this.tableName);
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.DOUBLE()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> expected =
                Collections.singletonList(
                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
        waitForResult(expected, table, rowType, Collections.singletonList("id"));
    }

    public void testAllTypesWithSchemaImpl(String str) throws Exception {
        createTestTopic("schema_include_all_type", 1, 1);
        writeRecordsToKafka("schema_include_all_type", "kafka/debezium/table/schema/alltype/debezium-data-1.txt", new Object[0]);
        Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
        basicKafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        basicKafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "schema_include_all_type");
        runActionWithDefaultEnv(syncTableActionBuilder(basicKafkaConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(getBasicTableConfig()).build());
        waitingTables(this.tableName);
        waitForResult(Collections.singletonList("+I[1, 1.1, " + String.format("true, %s, ", Arrays.toString(new byte[]{0, 0, 0, 0, 0, 0, 7, -57})) + "1, 1, 0, 1, 2, 3, 1000, 2000, 3000, 100000, 200000, 300000, 1000000, 2000000, 3000000, 10000000000, 20000000000, 30000000000, 40000000000, 1.5, 2.5, 3.5, 1.000001, 2.000002, 3.000003, 1.000011, 2.000022, 3.000033, 1.000111, 2.000222, 3.000333, 12345.110, 12345.220, 12345.330, 123456789876543212345678987654321.11, 123456789876543212345678987654321.22, 123456789876543212345678987654321.33, 11111, 22222, 33333, 2222222222222222300000001111.1234567890, 19439, 2023-03-23T14:30:05, 2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, 2023-03-24T14:30, 2023-03-24T14:30:05.120, 2023-03-23T22:00:10.123456, 2023-03-23T07:10, Paimon, Apache Paimon, Apache Paimon MySQL TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, [98, 121, 116, 101, 115, 0, 0, 0, 0, 0], [109, 111, 114, 101, 32, 98, 121, 116, 101, 115], [84, 73, 78, 89, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [77, 69, 68, 73, 85, 77, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], {\"a\": \"b\"}, value1, 2023, 36803000, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, 
{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, a,b]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL(2, 1).notNull(), DataTypes.BOOLEAN(), DataTypes.BINARY(8), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.DECIMAL(20, 0), DataTypes.DECIMAL(20, 0), DataTypes.DECIMAL(20, 0), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DECIMAL(8, 3), DataTypes.DECIMAL(8, 3), DataTypes.DECIMAL(8, 3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DECIMAL(8, 0), DataTypes.DECIMAL(8, 0), DataTypes.DECIMAL(8, 0), DataTypes.DECIMAL(38, 10), DataTypes.DATE(), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(6), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.TIME(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"_id", "pt", "_bit1", "_bit", "_tinyint1", "_boolean", "_bool", "_tinyint", "_tinyint_unsigned", "_tinyint_unsigned_zerofill", "_smallint", "_smallint_unsigned", "_smallint_unsigned_zerofill", "_mediumint", "_mediumint_unsigned", "_mediumint_unsigned_zerofill", "_int", "_int_unsigned", "_int_unsigned_zerofill", "_bigint", "_bigint_unsigned", "_bigint_unsigned_zerofill", "_serial", "_float", "_float_unsigned", "_float_unsigned_zerofill", "_real", "_real_unsigned", "_real_unsigned_zerofill", "_double", "_double_unsigned", "_double_unsigned_zerofill", "_double_precision", "_double_precision_unsigned", "_double_precision_unsigned_zerofill", "_numeric", "_numeric_unsigned", "_numeric_unsigned_zerofill", "_fixed", "_fixed_unsigned", "_fixed_unsigned_zerofill", "_decimal", "_decimal_unsigned", "_decimal_unsigned_zerofill", "_big_decimal", "_date", "_datetime", "_datetime3", "_datetime6", "_datetime_p", "_datetime_p2", "_timestamp", "_timestamp0", "_char", "_varchar", "_tinytext", "_text", "_mediumtext", "_longtext", "_bin", "_varbin", "_tinyblob", "_blob", "_mediumblob", "_longblob", "_json", "_enum", "_year", "_time", "_point", "_geometry", "_linestring", "_polygon", "_multipoint", "_multiline", "_multipolygon", "_geometrycollection", "_set"}), Arrays.asList("pt", "_id"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs a table sync with the {@code EARLIEST_OFFSET} startup mode over a data file in which
     * record 103 has no weight, and expects that row's last column to be rendered as
     * {@code null}.
     *
     * @param str format name; substituted into the resource path and, with a {@code -json}
     *     suffix, used as the Kafka value format
     * @throws Exception if setup, the sync action, or waiting for the result fails
     */
    public void testTableFiledValNull(String str) throws Exception {
        final String topic = "table_filed_val_null";
        createTestTopic(topic, 1, 1);
        writeRecordsToKafka(topic, "kafka/%s/table/schemaevolution/%s-data-4.txt", str, str);

        Map<String, String> kafkaConfig = getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), str + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), topic);
        kafkaConfig.put(
                KafkaConnectorOptions.SCAN_STARTUP_MODE.key(),
                KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET.toString());

        KafkaSyncTableAction action =
                syncTableActionBuilder(kafkaConfig)
                        .withPrimaryKeys("id")
                        .withTableConfig(getBasicTableConfig())
                        .build();
        runActionWithDefaultEnv(action);

        // Fixed pause before looking up the table and checking its contents.
        Thread.sleep(5000L);

        FileStoreTable table = getFileStoreTable(this.tableName);
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.STRING().notNull(),
                            DataTypes.STRING(),
                            DataTypes.STRING(),
                            DataTypes.STRING()
                        },
                        new String[] {"id", "name", "description", "weight"});
        List<String> expected =
                Arrays.asList(
                        "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, null]",
                        "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        waitForResult(expected, table, rowType, Collections.singletonList("id"));
    }
}
