package org.apache.flink.cdc.connectors.oracle.table;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.lifecycle.Startables;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.class */
public class OracleConnectorITCase {
    // Not referenced in the portion of the file reviewed here; presumably a row count used by a later test — TODO confirm.
    private static final int RECORDS_COUNT = 10000;
    // Job parallelism applied by before() when parallelismSnapshot is true.
    private static final int WORKERS_COUNT = 4;
    private static final Logger LOG = LoggerFactory.getLogger(OracleConnectorITCase.class);
    // Execution environment whose parallelism and checkpointing are configured in before().
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Streaming-mode table environment built on env; the tests submit their DDL and INSERT jobs through it.
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create(this.env, EnvironmentSettings.newInstance().inStreamingMode().build());
    // Test parameter: selects the parallelism in before() and is passed as the
    // 'scan.incremental.snapshot.enabled' option of the source tables.
    private final boolean parallelismSnapshot;

    /**
     * Creates one parameterized instance of the test case.
     *
     * @param parallelismSnapshot value supplied by {@link #parameters()}; stored and later used to
     *     choose the job parallelism and the {@code 'scan.incremental.snapshot.enabled'} option
     */
    public OracleConnectorITCase(boolean parallelismSnapshot) {
        this.parallelismSnapshot = parallelismSnapshot;
    }

    /**
     * Supplies the parameter sets for the {@link Parameterized} runner.
     *
     * @return two single-element rows, {@code true} first and {@code false} second, each passed as
     *     the {@code parallelismSnapshot} constructor argument
     */
    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
    public static Object[] parameters() {
        Object[] snapshotEnabled = {true};
        Object[] snapshotDisabled = {false};
        return new Object[]{snapshotEnabled, snapshotDisabled};
    }

    /**
     * Prepares each test: starts the Oracle container, clears any data left in the values table
     * factory, and configures the execution environment for the current parameter.
     *
     * <p>With {@code parallelismSnapshot} enabled the job runs with {@code WORKERS_COUNT} parallel
     * tasks and checkpointing is enabled with an interval argument of 200; otherwise the
     * parallelism is 1 and checkpointing is left untouched.
     *
     * @throws Exception if container start-up or environment configuration fails
     */
    @Before
    public void before() throws Exception {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(OracleSourceTestBase.ORACLE_CONTAINER)).join();
        LOG.info("Containers are started.");

        TestValuesTableFactory.clearAllData();

        if (this.parallelismSnapshot) {
            this.env.setParallelism(WORKERS_COUNT);
            this.env.enableCheckpointing(200L);
        } else {
            this.env.setParallelism(1);
        }
    }

    /** Stops the Oracle container after each test; {@code before()} starts it again for the next one. */
    @After
    public void teardown() {
        OracleSourceTestBase.ORACLE_CONTAINER.stop();
    }

    /**
     * Reads {@code debezium.products} through an {@code oracle-cdc} source, aggregates
     * {@code SUM(WEIGHT)} per {@code NAME} into a {@code values} sink, and verifies the final sink
     * contents after a column is added to the table and a series of updates, inserts and a delete
     * are applied.
     *
     * <p>The test first waits for {@code waitForSinkSize("sink", 9)} before changing the table
     * (presumably the point at which the initial data from {@code product.sql} has been
     * consumed — TODO confirm), and then for {@code waitForSinkSize("sink", 20)} before asserting.
     *
     * @throws Exception if database setup, job submission, JDBC access or the final job
     *     cancellation fails
     */
    @Test
    public void testConsumingAllEvents() throws Exception {
        OracleSourceTestBase.createAndInitialize("product.sql");

        String sourceDDL =
                String.format(
                        "CREATE TABLE debezium_source ("
                                + " ID INT NOT NULL,"
                                + " NAME STRING,"
                                + " DESCRIPTION STRING,"
                                + " WEIGHT DECIMAL(10,3)"
                                + ") WITH ("
                                + " 'connector' = 'oracle-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s',"
                                + " 'debezium.log.mining.strategy' = 'online_catalog',"
                                + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
                                + " 'scan.incremental.snapshot.chunk.size' = '2',"
                                + " 'database-name' = 'ORCLCDB',"
                                + " 'schema-name' = '%s',"
                                + " 'table-name' = '%s'"
                                + ")",
                        OracleSourceTestBase.ORACLE_CONTAINER.getHost(),
                        OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(),
                        OracleSourceTestBase.CONNECTOR_USER,
                        "dbz",
                        this.parallelismSnapshot,
                        OracleSourceTestBase.TEST_USER,
                        "products");
        String sinkDDL =
                "CREATE TABLE sink ("
                        + " name STRING,"
                        + " weightSum DECIMAL(10,3),"
                        + " PRIMARY KEY (name) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false',"
                        + " 'sink-expected-messages-num' = '20'"
                        + ")";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);

        // executeSql returns a TableResult for the INSERT job; its JobClient is used at the end
        // to cancel the job.
        TableResult result =
                this.tEnv.executeSql(
                        "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");

        waitForSinkSize("sink", 9);

        // try-with-resources closes the statement and then the connection on every path,
        // including failures, without the hand-rolled null/suppression bookkeeping.
        try (Connection connection = OracleSourceTestBase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            // The added column is why the two INSERTs below carry a fifth value.
            statement.execute("ALTER TABLE debezium.products ADD volume FLOAT");
            statement.execute("UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
            statement.execute("INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2, 1.2)");
            statement.execute("INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18, 2.2)");
            statement.execute("UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
            statement.execute("DELETE FROM debezium.products WHERE ID=112");
        }

        waitForSinkSize("sink", 20);

        OracleSourceTestBase.assertEqualsInAnyOrder(
                Arrays.asList(
                        "+I[scooter, 3.140]",
                        "+I[car battery, 8.100]",
                        "+I[12-pack drill bits, 0.800]",
                        "+I[hammer, 2.625]",
                        "+I[rocks, 5.100]",
                        "+I[jacket, 0.600]",
                        "+I[spare tire, 22.200]"),
                TestValuesTableFactory.getResults("sink"));

        result.getJobClient().get().cancel().get();
    }

    /* JADX WARN: Failed to calculate best type for var: r10v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r10v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r11v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r11v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r12v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r12v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r9v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r9v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 10, insn: 0x0153: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r10 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:152:0x0153 */
    /* JADX WARN: Not initialized variable reg: 11, insn: 0x00c7: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r11 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:134:0x00c7 */
    /* JADX WARN: Not initialized variable reg: 12, insn: 0x00cc: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r12 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:136:0x00cc */
    /* JADX WARN: Not initialized variable reg: 9, insn: 0x014f: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r9 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:150:0x014f */
    /* JADX WARN: Type inference failed for: r10v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r11v1, types: [java.sql.Connection] */
    /* JADX WARN: Type inference failed for: r12v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r9v1, types: [java.sql.Statement] */
    @Test
    public void testSkipNestedTables() throws Exception {
        ?? r9;
        ?? r10;
        ?? r11;
        ?? r12;
        TableResult executeSql;
        Connection jdbcConnection;
        Throwable th;
        OracleSourceTestBase.createAndInitialize("product.sql");
        Connection jdbcConnection2 = OracleSourceTestBase.getJdbcConnection();
        Throwable th2 = null;
        try {
            try {
                Statement createStatement = jdbcConnection2.createStatement();
                Throwable th3 = null;
                try {
                    Connection jdbcConnectionAsDBA = OracleSourceTestBase.getJdbcConnectionAsDBA();
                    Throwable th4 = null;
                    Statement createStatement2 = jdbcConnectionAsDBA.createStatement();
                    Throwable th5 = null;
                    try {
                        try {
                            createStatement2.execute("GRANT CREATE ANY TYPE TO debezium");
                            if (createStatement2 != null) {
                                if (0 != 0) {
                                    try {
                                        createStatement2.close();
                                    } catch (Throwable th6) {
                                        th5.addSuppressed(th6);
                                    }
                                } else {
                                    createStatement2.close();
                                }
                            }
                            if (jdbcConnectionAsDBA != null) {
                                if (0 != 0) {
                                    try {
                                        jdbcConnectionAsDBA.close();
                                    } catch (Throwable th7) {
                                        th4.addSuppressed(th7);
                                    }
                                } else {
                                    jdbcConnectionAsDBA.close();
                                }
                            }
                            createStatement.execute("CREATE OR REPLACE TYPE debezium.embedding_table AS TABLE OF VARCHAR2(128);");
                            createStatement.execute("create table debezium.products_nested_table  ( id numeric(9,0) not null,  c1 int,  c2 debezium.embedding_table ,  primary key (id))  nested table c2 store as nested_embedding_table");
                            createStatement.execute("INSERT INTO debezium.products_nested_table VALUES (1, 25, debezium.embedding_table('test1'))");
                            createStatement.execute("INSERT INTO debezium.products_nested_table VALUES (2, 50, debezium.embedding_table('test2'))");
                            if (createStatement != null) {
                                if (0 != 0) {
                                    try {
                                        createStatement.close();
                                    } catch (Throwable th8) {
                                        th3.addSuppressed(th8);
                                    }
                                } else {
                                    createStatement.close();
                                }
                            }
                            this.tEnv.executeSql(String.format("CREATE TABLE debezium_source ( ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10,3)) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'debezium.log.mining.strategy' = 'online_catalog', 'scan.incremental.snapshot.chunk.size' = '2', 'database-name' = 'ORCLCDB', 'schema-name' = '%s', 'table-name' = '%s')", OracleSourceTestBase.ORACLE_CONTAINER.getHost(), OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(), OracleSourceTestBase.CONNECTOR_USER, "dbz", Boolean.valueOf(this.parallelismSnapshot), OracleSourceTestBase.TEST_USER, "(products|products_nested_table)"));
                            this.tEnv.executeSql("CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')");
                            executeSql = this.tEnv.executeSql("INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
                            waitForSinkSize("sink", 9);
                            jdbcConnection = OracleSourceTestBase.getJdbcConnection();
                            th = null;
                        } finally {
                        }
                        try {
                            createStatement2 = jdbcConnection.createStatement();
                            Throwable th9 = null;
                            try {
                                try {
                                    createStatement2.execute("UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
                                    createStatement2.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
                                    createStatement2.execute("INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)");
                                    createStatement2.execute("INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
                                    createStatement2.execute("UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
                                    createStatement2.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
                                    createStatement2.execute("DELETE FROM debezium.products WHERE ID=112");
                                    if (createStatement2 != null) {
                                        if (0 != 0) {
                                            try {
                                                createStatement2.close();
                                            } catch (Throwable th10) {
                                                th9.addSuppressed(th10);
                                            }
                                        } else {
                                            createStatement2.close();
                                        }
                                    }
                                    waitForSinkSize("sink", 20);
                                    OracleSourceTestBase.assertEqualsInAnyOrder(Arrays.asList("+I[scooter, 3.140]", "+I[car battery, 8.100]", "+I[12-pack drill bits, 0.800]", "+I[hammer, 2.625]", "+I[rocks, 5.100]", "+I[jacket, 0.600]", "+I[spare tire, 22.200]"), TestValuesTableFactory.getResults("sink"));
                                    ((JobClient) executeSql.getJobClient().get()).cancel().get();
                                } finally {
                                }
                            } finally {
                            }
                        } finally {
                            if (jdbcConnection != null) {
                                if (0 != 0) {
                                    try {
                                        jdbcConnection.close();
                                    } catch (Throwable th11) {
                                        th.addSuppressed(th11);
                                    }
                                } else {
                                    jdbcConnection.close();
                                }
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th12) {
                    if (r11 != 0) {
                        if (r12 != 0) {
                            try {
                                r11.close();
                            } catch (Throwable th13) {
                                r12.addSuppressed(th13);
                            }
                        } else {
                            r11.close();
                        }
                    }
                    throw th12;
                }
            } catch (Throwable th14) {
                if (r9 != 0) {
                    if (r10 != 0) {
                        try {
                            r9.close();
                        } catch (Throwable th15) {
                            r10.addSuppressed(th15);
                        }
                    } else {
                        r9.close();
                    }
                }
                throw th14;
            }
        } finally {
            if (jdbcConnection2 != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection2.close();
                    } catch (Throwable th16) {
                        th2.addSuppressed(th16);
                    }
                } else {
                    jdbcConnection2.close();
                }
            }
        }
    }

    /**
     * Reads the {@code products} table with {@code ID} configured as the incremental-snapshot chunk
     * key column and checks the per-name weight sums after a series of changes.
     *
     * <p>The {@code GRANT ANALYZE ANY} statement is always issued; the pipeline itself only runs
     * when {@code parallelismSnapshot} is set.
     *
     * @throws Exception if database setup, the Flink job or the wait is interrupted or fails
     */
    @Test
    public void testConsumingAllEventsByChunkKeyColumn() throws Exception {
        OracleSourceTestBase.createAndInitialize("product.sql");

        // The DBA connection is only needed for the grant, so release it right away.
        try (Connection dbaConnection = OracleSourceTestBase.getJdbcConnectionAsDBA();
                Statement grantStatement = dbaConnection.createStatement()) {
            grantStatement.execute("GRANT ANALYZE ANY TO " + OracleSourceTestBase.ORACLE_CONTAINER.getUsername());
        }

        if (!this.parallelismSnapshot) {
            return;
        }

        this.tEnv.executeSql(String.format("CREATE TABLE debezium_source ( ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10,3)) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'scan.incremental.snapshot.chunk.key-column' = 'ID', 'debezium.log.mining.strategy' = 'online_catalog', 'debezium.database.history.store.only.captured.tables.ddl' = 'true', 'scan.incremental.snapshot.chunk.size' = '2', 'database-name' = 'ORCLCDB', 'schema-name' = '%s', 'table-name' = '%s')", OracleSourceTestBase.ORACLE_CONTAINER.getHost(), OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(), OracleSourceTestBase.ORACLE_CONTAINER.getUsername(), OracleSourceTestBase.ORACLE_CONTAINER.getPassword(), this.parallelismSnapshot, OracleSourceTestBase.TEST_USER, "products"));
        this.tEnv.executeSql("CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')");
        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");

        // Wait for the initial rows before changing the table.
        waitForSinkSize("sink", 9);

        try (Connection connection = OracleSourceTestBase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
            statement.execute("INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)");
            statement.execute("INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
            statement.execute("UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
            statement.execute("DELETE FROM debezium.products WHERE ID=112");
        }

        waitForSinkSize("sink", 20);
        List<String> actual = TestValuesTableFactory.getResults("sink");
        LOG.info("actual:{}", actual);
        List<String> expected = Arrays.asList(
                "+I[scooter, 3.140]",
                "+I[car battery, 8.100]",
                "+I[12-pack drill bits, 0.800]",
                "+I[hammer, 2.625]",
                "+I[rocks, 5.100]",
                "+I[jacket, 0.600]",
                "+I[spare tire, 22.200]");
        OracleSourceTestBase.assertEqualsInAnyOrder(expected, actual);
        insertJob.getJobClient().get().cancel().get();
    }

    /**
     * Checks that the {@code database_name}, {@code schema_name} and {@code table_name} metadata
     * columns are emitted for both the initial rows and the subsequent changes of {@code products}.
     *
     * <p>The raw sink results (inserts, updates and the delete) are compared after sorting both
     * sides, so arrival order does not matter.
     *
     * @throws Throwable if database setup, the Flink job or an assertion fails
     */
    @Test
    public void testMetadataColumns() throws Throwable {
        OracleSourceTestBase.createAndInitialize("product.sql");
        this.tEnv.executeSql(String.format("CREATE TABLE debezium_source ( DB_NAME STRING METADATA FROM 'database_name' VIRTUAL, SCHEMA_NAME STRING METADATA FROM 'schema_name' VIRTUAL, TABLE_NAME STRING METADATA  FROM 'table_name' VIRTUAL, ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10,3), PRIMARY KEY (ID) NOT ENFORCED) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'debezium.log.mining.strategy' = 'online_catalog', 'scan.incremental.snapshot.chunk.size' = '2', 'database-name' = 'ORCLCDB', 'schema-name' = '%s', 'table-name' = '%s')", OracleSourceTestBase.ORACLE_CONTAINER.getHost(), OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(), OracleSourceTestBase.CONNECTOR_USER, "dbz", this.parallelismSnapshot, OracleSourceTestBase.TEST_USER, "products"));
        this.tEnv.executeSql("CREATE TABLE sink ( database_name STRING, schema_name STRING, table_name STRING, id INT, name STRING, description STRING, weight DECIMAL(10,3), PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')");
        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // Wait for the initial rows before changing the table.
        waitForSinkSize("sink", 9);

        try (Connection connection = OracleSourceTestBase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("UPDATE debezium.products SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.1 WHERE ID=107");
            statement.execute("INSERT INTO debezium.products VALUES (111,'jacket','water resistent white wind breaker',0.2)");
            statement.execute("INSERT INTO debezium.products VALUES (112,'scooter','Big 2-wheel scooter ',5.18)");
            statement.execute("UPDATE debezium.products SET DESCRIPTION='new water resistent white wind breaker', WEIGHT=0.5 WHERE ID=111");
            statement.execute("UPDATE debezium.products SET WEIGHT=5.17 WHERE ID=112");
            statement.execute("DELETE FROM debezium.products WHERE ID=112");
        }

        waitForSinkSize("sink", 16);

        // Arrays.asList is fixed-size but supports element replacement, so it can be sorted in place.
        List<String> expected = Arrays.asList(
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 101, scooter, Small 2-wheel scooter, 3.140]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 102, car battery, 12V car battery, 8.100]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 104, hammer, 12oz carpenters hammer, 0.750]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 105, hammer, 14oz carpenters hammer, 0.875]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 106, hammer, 16oz carpenters hammer, 1.000]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.300]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 108, jacket, water resistent black wind breaker, 0.100]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 109, spare tire, 24 inch spare tire, 22.200]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 111, jacket, water resistent white wind breaker, 0.200]",
                "+I[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.180]",
                "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 106, hammer, 18oz carpenter hammer, 1.000]",
                "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 107, rocks, box of assorted rocks, 5.100]",
                "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 111, jacket, new water resistent white wind breaker, 0.500]",
                "+U[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]",
                "-D[ORCLCDB, DEBEZIUM, PRODUCTS, 112, scooter, Big 2-wheel scooter , 5.170]");
        List<String> actual = TestValuesTableFactory.getRawResults("sink");
        Collections.sort(expected);
        Collections.sort(actual);
        Assert.assertEquals(expected, actual);
        insertJob.getJobClient().get().cancel().get();
    }

    /**
     * Starts the source with {@code 'scan.startup.mode' = 'latest-offset'} and checks that the sink
     * ends up with only the row that survives the changes made after the job was submitted.
     *
     * @throws Exception if database setup, the Flink job or the wait is interrupted or fails
     */
    @Test
    public void testStartupFromLatestOffset() throws Exception {
        OracleSourceTestBase.createAndInitialize("product.sql");
        this.tEnv.executeSql(String.format("CREATE TABLE debezium_source ( ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10,3)) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'debezium.log.mining.strategy' = 'online_catalog', 'debezium.database.history.store.only.captured.tables.ddl' = 'true', 'database-name' = 'ORCLCDB', 'schema-name' = '%s', 'table-name' = '%s' , 'scan.startup.mode' = 'latest-offset')", OracleSourceTestBase.ORACLE_CONTAINER.getHost(), OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(), OracleSourceTestBase.CONNECTOR_USER, "dbz", this.parallelismSnapshot, OracleSourceTestBase.TEST_USER, "products"));
        this.tEnv.executeSql("CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)");
        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // NOTE(review): fixed pause before issuing changes; presumably it gives the source time to
        // start reading from the latest offset, since no readiness signal is used here.
        Thread.sleep(10000L);

        try (Connection connection = OracleSourceTestBase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO debezium.products VALUES (110,'jacket','water resistent white wind breaker',0.2)");
            statement.execute("INSERT INTO debezium.products VALUES (111,'scooter','Big 2-wheel scooter ',5.18)");
            statement.execute("UPDATE debezium.products SET description='new water resistent white wind breaker', weight=0.5 WHERE id=110");
            statement.execute("UPDATE debezium.products SET weight=5.17 WHERE id=111");
            statement.execute("DELETE FROM debezium.products WHERE id=111");
        }

        waitForSinkSize("sink", 7);
        String[] expected = new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
        List<String> actual = TestValuesTableFactory.getResults("sink");
        Assert.assertThat(actual, Matchers.containsInAnyOrder(expected));
        insertJob.getJobClient().get().cancel().get();
    }

    /**
     * Creates an Oracle table covering several {@code NUMBER}/{@code FLOAT} definitions, inserts
     * two rows and checks how they arrive in a Flink table with matching numeric column types.
     *
     * @throws Exception if database setup, the Flink job or the wait is interrupted or fails
     */
    @Test
    public void testConsumingNumericColumns() throws Exception {
        // The connection is only needed to prepare the table, so it is closed before the job runs.
        try (Connection connection = OracleSourceTestBase.getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("CREATE TABLE debezium.test_numeric_table ( ID NUMBER(18,0), TEST_BOOLEAN NUMBER(1,0), TEST_TINYINT NUMBER(2,0), TEST_SMALLINT NUMBER(4,0), TEST_INT NUMBER(9,0), TEST_BIG_NUMERIC NUMBER(32,0), TEST_DECIMAL NUMBER(20,8), TEST_NUMBER NUMBER, TEST_NUMERIC NUMBER, TEST_FLOAT FLOAT(63), PRIMARY KEY (ID))");
            statement.execute("INSERT INTO debezium.test_numeric_table VALUES (11000000000, 0, 98, 9998, 987654320, 20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955)");
            statement.execute("INSERT INTO debezium.test_numeric_table VALUES (11000000001, 1, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965)");
        }

        this.tEnv.executeSql(String.format("CREATE TABLE test_numeric_table ( ID BIGINT, TEST_BOOLEAN BOOLEAN, TEST_TINYINT TINYINT, TEST_SMALLINT SMALLINT, TEST_INT INT, TEST_BIG_NUMERIC DECIMAL(32, 0), TEST_DECIMAL DECIMAL(20, 8), TEST_NUMBER BIGINT, TEST_NUMERIC DECIMAL(10, 3), TEST_FLOAT FLOAT, PRIMARY KEY (ID) NOT ENFORCED) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'debezium.log.mining.strategy' = 'online_catalog', 'debezium.database.history.store.only.captured.tables.ddl' = 'true', 'database-name' = 'ORCLCDB', 'schema-name' = '%s', 'table-name' = '%s')", OracleSourceTestBase.ORACLE_CONTAINER.getHost(), OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(), OracleSourceTestBase.CONNECTOR_USER, "dbz", this.parallelismSnapshot, OracleSourceTestBase.TEST_USER, "test_numeric_table"));
        this.tEnv.executeSql("CREATE TABLE test_numeric_sink ( id BIGINT, test_boolean BOOLEAN, test_tinyint TINYINT, test_smallint SMALLINT, test_int INT, test_big_numeric DECIMAL(32, 0), test_decimal DECIMAL(20, 8), test_number BIGINT, test_numeric DECIMAL(10, 3), test_float FLOAT, PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')");
        TableResult insertJob = this.tEnv.executeSql("INSERT INTO test_numeric_sink SELECT * FROM test_numeric_table");

        waitForSnapshotStarted("test_numeric_sink");
        waitForSinkSize("test_numeric_sink", 2);

        // The expected rows are already listed in sorted order; only the actual rows need sorting.
        List<String> expected = Arrays.asList(
                "+I[11000000000, false, 98, 9998, 987654320, 20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955]",
                "+I[11000000001, true, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]");
        List<String> actual = TestValuesTableFactory.getRawResults("test_numeric_sink");
        Collections.sort(actual);
        Assert.assertEquals(expected, actual);
        insertJob.getJobClient().get().cancel().get();
    }

    /**
     * Reads the single row of the {@code full_types} table (created from
     * {@code column_type_test.sql}) and compares its string rendering against the expected value
     * for every declared column type, using {@code Asia/Shanghai} as the session time zone.
     *
     * @throws Throwable if database setup, the Flink job or the assertion fails
     */
    @Test
    public void testAllDataTypes() throws Throwable {
        OracleSourceTestBase.createAndInitialize("column_type_test.sql");
        this.tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        String sourceDdl = String.format("CREATE TABLE full_types ( ID INT, VAL_VARCHAR STRING, VAL_VARCHAR2 STRING, VAL_NVARCHAR2 STRING, VAL_CHAR STRING, VAL_NCHAR STRING, VAL_BF FLOAT, VAL_BD DOUBLE, VAL_F FLOAT, VAL_F_10 FLOAT, VAL_NUM DECIMAL(10, 6), VAL_DP DOUBLE, VAL_R DECIMAL(38,2), VAL_DECIMAL DECIMAL(10, 6), VAL_NUMERIC DECIMAL(10, 6), VAL_NUM_VS DECIMAL(10, 3), VAL_INT DECIMAL(38,0), VAL_INTEGER DECIMAL(38,0), VAL_SMALLINT DECIMAL(38,0), VAL_NUMBER_38_NO_SCALE DECIMAL(38,0), VAL_NUMBER_38_SCALE_0 DECIMAL(38,0), VAL_NUMBER_1 BOOLEAN, VAL_NUMBER_2 TINYINT, VAL_NUMBER_4 SMALLINT, VAL_NUMBER_9 INT, VAL_NUMBER_18 BIGINT, VAL_NUMBER_2_NEGATIVE_SCALE TINYINT, VAL_NUMBER_4_NEGATIVE_SCALE SMALLINT, VAL_NUMBER_9_NEGATIVE_SCALE INT, VAL_NUMBER_18_NEGATIVE_SCALE BIGINT, VAL_NUMBER_36_NEGATIVE_SCALE DECIMAL(38,0), VAL_DATE TIMESTAMP, VAL_TS TIMESTAMP, VAL_TS_PRECISION2 TIMESTAMP(2 ), VAL_TS_PRECISION4 TIMESTAMP(4), VAL_TS_PRECISION9 TIMESTAMP(6), VAL_TSTZ STRING, VAL_TSLTZ TIMESTAMP_LTZ, VAL_INT_YTM BIGINT, VAL_INT_DTS BIGINT, T15VARCHAR STRING, PRIMARY KEY (ID) NOT ENFORCED) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'debezium.log.mining.strategy' = 'online_catalog', 'debezium.database.history.store.only.captured.tables.ddl' = 'true', 'scan.incremental.snapshot.chunk.size' = '2', 'database-name' = 'ORCLCDB', 'schema-name' = '%s', 'table-name' = '%s')", OracleSourceTestBase.ORACLE_CONTAINER.getHost(), OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(), OracleSourceTestBase.CONNECTOR_USER, "dbz", this.parallelismSnapshot, OracleSourceTestBase.TEST_USER, "full_types");
        this.tEnv.executeSql(sourceDdl);

        String sinkDdl = "CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '2') LIKE full_types (EXCLUDING OPTIONS)";
        this.tEnv.executeSql(sinkDdl);

        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types");
        waitForSinkSize("sink", 1);

        List<String> actualRows = TestValuesTableFactory.getResults("sink");
        Collections.sort(actualRows);
        List<String> expectedRows = Arrays.asList("+I[1, vc2, vc2, nvc2, c  , nc , 1.1, 2.22, 3.33, 8.888, 4.444400, 5.555, 6.66, 1234.567891, 1234.567891, 77.323, 1, 22, 333, 4444, 5555, true, 99, 9999, 999999999, 999999999999999999, 90, 9900, 999999990, 999999999999999900, 99999999999999999999999999999999999900, 2022-10-30T00:00, 2022-10-30T12:34:56.007890, 2022-10-30T12:34:56.130, 2022-10-30T12:34:56.125500, 2022-10-30T12:34:56.125457, 2022-10-30T01:34:56.00789-11:00, 2022-10-29T17:34:56.007890Z, -110451600000000, -93784560000, <name>\n  <a id=\"1\" value=\"some values\">test xmlType</a>\n</name>\n]");
        Assert.assertEquals(expectedRows, actualRows);

        insertJob.getJobClient().get().cancel().get();
    }

    /**
     * Inserts {@code RECORDS_COUNT} rows into {@code category} from concurrent writers while the
     * non-incremental source is running, then checks that the sink holds every id from 0 to
     * {@code RECORDS_COUNT - 1} exactly once.
     *
     * <p>Skipped (via JUnit assumption) when {@code parallelismSnapshot} is set.
     *
     * @throws Exception if the writers do not finish within 10 minutes or the job fails
     */
    @Test
    public void testSnapshotToStreamingSwitchPendingTransactions() throws Exception {
        Assume.assumeFalse(this.parallelismSnapshot);

        // Start the writers first so inserts are in flight while the source starts.
        CompletableFuture<Void> inserters = createRecordInserters();

        this.tEnv.executeSql(String.format("CREATE TABLE messages ( ID INT NOT NULL, CATEGORY_NAME STRING) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = 'category', 'scan.incremental.snapshot.enabled' = 'false', 'debezium.log.mining.strategy' = 'online_catalog', 'debezium.database.history.store.only.captured.tables.ddl' = 'true')", OracleSourceTestBase.ORACLE_CONTAINER.getHost(), OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(), OracleSourceTestBase.CONNECTOR_USER, "dbz", OracleSourceTestBase.ORACLE_DATABASE, OracleSourceTestBase.TEST_USER));
        this.tEnv.executeSql("CREATE TABLE sink ( ID INT, message STRING) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM messages");

        inserters.get(10L, TimeUnit.MINUTES);
        LOG.info("all async runners were finished");
        waitForSinkSize("sink", RECORDS_COUNT);

        List<Integer> expectedIds = IntStream.range(0, RECORDS_COUNT).boxed().collect(Collectors.toList());
        // Each result row starts with "+I[<id>, ..."; keep only the numeric id.
        List<Integer> actualIds = TestValuesTableFactory.getResults("sink").stream()
                .map(row -> row.replaceFirst("\\+I\\[(\\d+).+", "$1"))
                .map(Integer::parseInt)
                .sorted()
                .collect(Collectors.toList());
        Assert.assertEquals(expectedIds, actualIds);

        insertJob.getJobClient().get().cancel().get();
    }

    /**
     * Starts {@code WORKERS_COUNT} concurrent writers, each inserting its own contiguous range of
     * ids, and returns a future that completes once all of them have finished.
     *
     * <p>The id space {@code [0, RECORDS_COUNT)} is split into equal chunks; this covers every id
     * only when {@code RECORDS_COUNT} is a multiple of {@code WORKERS_COUNT}.
     *
     * @return a future that completes when every writer is done, or completes exceptionally if
     *     any writer failed
     */
    private CompletableFuture<Void> createRecordInserters() {
        final int requestsPerSecondPerThread = 100;
        final int recordsChunkSize = RECORDS_COUNT / WORKERS_COUNT;
        final int recordsToCommit = recordsChunkSize / WORKERS_COUNT;

        List<Runnable> inserters = IntStream.range(0, WORKERS_COUNT)
                .mapToObj(worker -> createRecordInserter(requestsPerSecondPerThread, recordsChunkSize * worker, recordsChunkSize, recordsToCommit))
                .collect(Collectors.toList());

        ExecutorService executor = Executors.newFixedThreadPool(WORKERS_COUNT);
        try {
            CompletableFuture<?>[] futures = inserters.stream()
                    .map(inserter -> CompletableFuture.runAsync(inserter, executor))
                    .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures);
        } finally {
            // All tasks are submitted by now; shutdown() lets them finish but releases the
            // worker threads afterwards instead of leaking them for the rest of the JVM run.
            executor.shutdown();
        }
    }

    /**
     * Builds a task that inserts a contiguous range of ids into the {@code category} table over
     * its own JDBC connection, with auto-commit disabled and periodic commits.
     *
     * @param i maximum number of inserts per second for this task (rate limit)
     * @param i2 first id to insert
     * @param i3 number of consecutive ids to insert, starting at {@code i2}
     * @param i4 commit interval: a commit is issued after every id that is a multiple of this
     *     value, and once more after the last insert
     * @return the insert task; it throws a {@link RuntimeException} wrapping the
     *     {@link SQLException} if any database operation fails
     */
    private Runnable createRecordInserter(int i, int i2, int i3, int i4) {
        return () -> {
            Supplier<String> randomMessage = createRandomSupplier(Lists.newArrayList("msg1", "msg2", "msg3", "msg4", "msg5", "msg6"));
            RateLimiter rateLimiter = RateLimiter.create(i);
            // try-with-resources closes both the statement and the connection on every path.
            try (Connection connection = OracleSourceTestBase.getJdbcConnection();
                    Statement statement = connection.createStatement()) {
                connection.setAutoCommit(false);
                for (long id = i2; id < i2 + i3; id++) {
                    rateLimiter.acquire();
                    statement.execute(String.format("INSERT INTO %s.%s VALUES (%d,'%s')", OracleSourceTestBase.TEST_USER, "category", id, randomMessage.get()));
                    if (id % i4 == 0) {
                        LOG.info("Committing at id {}", id);
                        connection.commit();
                    }
                }
                // Flush whatever was inserted since the last periodic commit.
                connection.commit();
            } catch (SQLException e) {
                LOG.error("Failed to insert records into the category table", e);
                throw new RuntimeException(e);
            }
        };
    }

    /**
     * Returns a supplier that yields a uniformly chosen element of {@code list} on every call.
     *
     * <p>The list size is read once, when the supplier is created.
     *
     * @param list the elements to pick from
     * @param <T> the element type
     * @return a supplier of random elements of {@code list}
     */
    private <T> Supplier<T> createRandomSupplier(List<T> list) {
        final int bound = list.size();
        return () -> list.get(ThreadLocalRandom.current().nextInt(bound));
    }

    /**
     * Blocks until the named sink has received at least one raw result.
     *
     * @param str the sink table name
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void waitForSnapshotStarted(String str) throws InterruptedException {
        // A sink size is never negative, so "not empty" is the same as "at least one".
        waitForSinkSize(str, 1);
    }

    /**
     * Blocks until the named sink holds at least {@code i} raw results, polling every 100 ms.
     * There is no timeout in this method.
     *
     * @param str the sink table name
     * @param i the minimum number of raw results to wait for
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void waitForSinkSize(String str, int i) throws InterruptedException {
        for (int current = sinkSize(str); current < i; current = sinkSize(str)) {
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of raw results currently recorded for the named sink, or 0 if the lookup
     * is rejected with an {@link IllegalArgumentException}.
     *
     * @param str the sink table name
     * @return the raw result count, or 0 when the lookup fails
     */
    private static int sinkSize(String str) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(str).size();
            } catch (IllegalArgumentException ignored) {
                // NOTE(review): presumably thrown while the sink has no results registered yet;
                // treated as an empty sink so callers keep polling.
                return 0;
            }
        }
    }

    /**
     * Runs the chunk-key-column scenario with {@code PRODUCT_KIND} as the chunk key; does nothing
     * unless {@code parallelismSnapshot} is set.
     *
     * @throws Exception if the underlying scenario fails
     */
    @Test
    public void testCompositePkTableSplitsUnevenlyWithChunkKeyColumn() throws Exception {
        if (!this.parallelismSnapshot) {
            return;
        }
        testUseChunkColumn("PRODUCT_KIND");
    }

    /**
     * Runs the chunk-key-column scenario with {@code PRODUCT_NO} as the chunk key; does nothing
     * unless {@code parallelismSnapshot} is set.
     *
     * @throws Exception if the underlying scenario fails
     */
    @Test
    public void testCompositePkTableSplitsEvenlyWithChunkKeyColumn() throws Exception {
        if (!this.parallelismSnapshot) {
            return;
        }
        testUseChunkColumn("PRODUCT_NO");
    }

    /**
     * Reads {@code EVENLY_SHOPPING_CART} with the given column as the incremental-snapshot chunk
     * key and waits until 12 rows have reached the sink, then cancels the job.
     *
     * @param str name of the column to use for {@code scan.incremental.snapshot.chunk.key-column}
     * @throws Exception if database setup, the Flink job or the wait is interrupted or fails
     */
    private void testUseChunkColumn(String str) throws Exception {
        OracleSourceTestBase.createAndInitialize("customer.sql");

        // The DBA connection is only needed for the grant, so release it right away.
        try (Connection dbaConnection = OracleSourceTestBase.getJdbcConnectionAsDBA();
                Statement grantStatement = dbaConnection.createStatement()) {
            grantStatement.execute("GRANT ANALYZE ANY TO " + OracleSourceTestBase.ORACLE_CONTAINER.getUsername());
        }

        // The chunk size placeholder is filled with WORKERS_COUNT (4), as in the original code.
        this.tEnv.executeSql(String.format("CREATE TABLE evenly_shopping_cart (\n    PRODUCT_NO INT NOT NULL,\n    PRODUCT_KIND VARCHAR(255),\n    USER_ID VARCHAR(255) NOT NULL,\n    DESCRIPTION VARCHAR(255) NOT NULL\n) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'scan.incremental.snapshot.chunk.key-column' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s', 'database-name' = '%s', 'schema-name' = '%s', 'table-name' = '%s')", OracleSourceTestBase.ORACLE_CONTAINER.getHost(), OracleSourceTestBase.ORACLE_CONTAINER.getOraclePort(), OracleSourceTestBase.ORACLE_CONTAINER.getUsername(), OracleSourceTestBase.ORACLE_CONTAINER.getPassword(), this.parallelismSnapshot, str, WORKERS_COUNT, OracleSourceTestBase.ORACLE_DATABASE, OracleSourceTestBase.ORACLE_SCHEMA, "EVENLY_SHOPPING_CART"));
        this.tEnv.executeSql("CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE evenly_shopping_cart (EXCLUDING OPTIONS)");
        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM evenly_shopping_cart");

        waitForSinkSize("sink", 12);
        insertJob.getJobClient().get().cancel().get();
    }
}
