package org.apache.flink.connector.jdbc.xa;

import com.mysql.cj.jdbc.MysqlXADataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.sql.XADataSource;
import oracle.jdbc.xa.client.OracleXADataSource;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.jdbc.DbMetadata;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcITCase;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcTestBase;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.dialect.oracle.OracleContainer;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.LogLevelRule;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableSupplier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.postgresql.xa.PGXADataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.PostgreSQLContainer;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.class */
public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
    /**
     * Timeout constant in milliseconds.
     * NOTE(review): presumably the checkpoint timeout applied by code outside this excerpt - confirm.
     */
    private static final long CHECKPOINT_TIMEOUT_MS = 20000;

    /**
     * Timeout constant in milliseconds.
     * NOTE(review): presumably the task cancellation timeout applied by code outside this excerpt - confirm.
     */
    private static final long TASK_CANCELLATION_TIMEOUT_MS = 20000;

    /** Database environment under test; injected by the JUnit {@code Parameterized} runner. */
    @Parameterized.Parameter
    public JdbcExactlyOnceSinkTestEnv dbEnv;

    /** Mini cluster resource used by the test. */
    private MiniClusterWithClientResource cluster;

    /** Shared random generator, seeded with the wall-clock time at class initialisation. */
    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkE2eTest.class);

    /** Raises the log level to TRACE for this test, the XA facade and the InnoDB status logger. */
    @ClassRule
    public static final LogLevelRule TEST_LOG_LEVEL_RULE =
            new LogLevelRule()
                    .set(JdbcExactlyOnceSinkE2eTest.class, Level.TRACE)
                    .set(XaFacadeImpl.class, Level.TRACE)
                    .set(MySqlJdbcExactlyOnceSinkTestEnv.InnoDbStatusLogger.class, Level.TRACE);

    /**
     * Latches keyed by execution attempt number. Each source subtask counts the latch of its
     * attempt down once it has finished emitting (or failed).
     */
    private static final Map<Integer, CountDownLatch> activeSources = new ConcurrentHashMap<>();

    /**
     * Latches keyed by execution attempt number. A latch is created with the mapper parallelism
     * by the first mapper subtask that opens, and every mapper subtask counts it down on open.
     */
    private static final Map<Integer, CountDownLatch> inactiveMappers = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$FailingMapper.class */
    private static class FailingMapper extends RichMapFunction<JdbcTestFixture.TestEntry, JdbcTestFixture.TestEntry> {
        private final int minElementsPerFailure;
        private final int maxElementsPerFailure;
        private transient int remaining;

        public FailingMapper(int i, int i2) {
            this.minElementsPerFailure = i;
            this.maxElementsPerFailure = i2;
        }

        public void open(Configuration configuration) throws Exception {
            this.remaining = this.minElementsPerFailure + JdbcExactlyOnceSinkE2eTest.RANDOM.nextInt(this.maxElementsPerFailure);
            ((CountDownLatch) JdbcExactlyOnceSinkE2eTest.inactiveMappers.computeIfAbsent(Integer.valueOf(getRuntimeContext().getAttemptNumber()), num -> {
                return new CountDownLatch(getRuntimeContext().getNumberOfParallelSubtasks());
            })).countDown();
            JdbcExactlyOnceSinkE2eTest.LOG.debug("Mapper will fail after {} records", Integer.valueOf(this.remaining));
        }

        public JdbcTestFixture.TestEntry map(JdbcTestFixture.TestEntry testEntry) throws Exception {
            int i = this.remaining - 1;
            this.remaining = i;
            if (i > 0) {
                return testEntry;
            }
            JdbcExactlyOnceSinkE2eTest.LOG.debug("Mapper failing intentionally");
            throw new TestException();
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$JdbcExactlyOnceSinkTestEnv.class */
    /**
     * Describes one database environment the exactly-once sink test runs against: a database
     * container, a way to obtain XA data sources for it, and the parallelism to use.
     */
    private interface JdbcExactlyOnceSinkTestEnv {
        /** Starts the environment. */
        void start();

        /** Stops the environment. */
        void stop();

        /** Returns the database container backing this environment. */
        JdbcDatabaseContainer<?> getContainer();

        /** Returns a serializable supplier that creates {@link XADataSource} instances for the container. */
        SerializableSupplier<XADataSource> getDataSourceSupplier();

        /** Returns the parallelism configured for this environment. */
        int getParallelism();
    }

    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$MySqlJdbcExactlyOnceSinkTestEnv.class */
    private static class MySqlJdbcExactlyOnceSinkTestEnv implements JdbcExactlyOnceSinkTestEnv {
        private final int parallelism;
        private final JdbcDatabaseContainer<?> db = new MySqlXaDb();

        /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$MySqlJdbcExactlyOnceSinkTestEnv$InnoDbStatusLogger.class */
        /**
         * Background helper that periodically queries MySQL/InnoDB diagnostics over its own JDBC
         * connection and writes them to the log at DEBUG level.
         *
         * <p>All JDBC resources are managed with try-with-resources, so they are released even
         * when iterating a result set fails.
         */
        private static class InnoDbStatusLogger {
            private static final Logger LOG = LoggerFactory.getLogger(InnoDbStatusLogger.class);
            private final Thread thread;
            /** Cleared by {@link #stop()} to make the worker loop exit. */
            private volatile boolean running;

            /**
             * Prepares (but does not start) the worker thread.
             *
             * @param url JDBC URL to connect to
             * @param user database user
             * @param password database password
             * @param intervalMs pause between two status dumps, in milliseconds
             */
            private InnoDbStatusLogger(String url, String user, String password, long intervalMs) {
                this.running = true;
                this.thread = new Thread(() -> {
                    LOG.info("Logging InnoDB status every {}ms", intervalMs);
                    try (Connection connection = DriverManager.getConnection(url, user, password)) {
                        while (running) {
                            Thread.sleep(intervalMs);
                            queryAndLog(connection);
                        }
                    } catch (Exception e) {
                        // Diagnostics only: log the problem and let the thread end.
                        LOG.warn("failed", e);
                    } finally {
                        LOG.info("Logging InnoDB status stopped");
                    }
                });
            }

            /** Starts the worker thread. */
            public void start() {
                thread.start();
            }

            /**
             * Asks the worker to finish and waits for it. The worker re-checks the flag only
             * after its sleep and query, so this call can block for about one interval.
             *
             * @throws InterruptedException if the calling thread is interrupted while waiting
             */
            public void stop() throws InterruptedException {
                running = false;
                thread.join();
            }

            /** Runs all diagnostic queries once, sharing a single statement. */
            private void queryAndLog(Connection connection) throws SQLException {
                try (Statement statement = connection.createStatement()) {
                    showBlockedTrx(statement);
                    showAllTrx(statement);
                    showEngineStatus(statement);
                    showRecoveredTrx(statement);
                }
            }

            /** Logs the first four columns of every row returned by {@code xa recover convert xid}. */
            private void showRecoveredTrx(Statement statement) throws SQLException {
                try (ResultSet rows = statement.executeQuery("xa recover convert xid ")) {
                    while (rows.next()) {
                        LOG.debug(
                                "recovered trx: {} {} {} {}",
                                rows.getString(1),
                                rows.getString(2),
                                rows.getString(3),
                                rows.getString(4));
                    }
                }
            }

            /** Logs the third column of every row returned by {@code show engine innodb status}. */
            private void showEngineStatus(Statement statement) throws SQLException {
                LOG.debug("Engine status");
                try (ResultSet rows = statement.executeQuery("show engine innodb status")) {
                    while (rows.next()) {
                        LOG.debug(rows.getString(3));
                    }
                }
            }

            /** Logs selected columns of every row in {@code information_schema.innodb_trx}. */
            private void showAllTrx(Statement statement) throws SQLException {
                LOG.debug("All TRX");
                try (ResultSet rows = statement.executeQuery("select * from information_schema.innodb_trx")) {
                    while (rows.next()) {
                        LOG.debug(
                                "trx_id: {}, trx_state: {}, trx_started: {}, trx_requested_lock_id: {}, trx_wait_started: {}, trx_mysql_thread_id: {},",
                                rows.getString("trx_id"),
                                rows.getString("trx_state"),
                                rows.getString("trx_started"),
                                rows.getString("trx_requested_lock_id"),
                                rows.getString("trx_wait_started"),
                                rows.getString("trx_mysql_thread_id"));
                    }
                }
            }

            /** Logs every row of {@code sys.innodb_lock_waits} (waiting and blocking transaction details). */
            private void showBlockedTrx(Statement statement) throws SQLException {
                LOG.debug("Blocked TRX");
                try (ResultSet rows = statement.executeQuery(" SELECT waiting_trx_id, waiting_pid, waiting_query, blocking_trx_id, blocking_pid, blocking_query FROM sys.innodb_lock_waits; ")) {
                    while (rows.next()) {
                        LOG.debug(
                                "waiting_trx_id: {}, waiting_pid: {}, waiting_query: {}, blocking_trx_id: {}, blocking_pid: {}, blocking_query: {}",
                                rows.getString(1),
                                rows.getString(2),
                                rows.getString(3),
                                rows.getString(4),
                                rows.getString(5),
                                rows.getString(6));
                    }
                }
            }
        }

        /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$MySqlJdbcExactlyOnceSinkTestEnv$MySqlXaDataSourceFactory.class */
        private static class MySqlXaDataSourceFactory implements SerializableSupplier<XADataSource> {
            private final String jdbcUrl;
            private final String username;
            private final String password;

            public MySqlXaDataSourceFactory(String str, String str2, String str3) {
                this.jdbcUrl = str;
                this.username = str2;
                this.password = str3;
            }

            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public XADataSource m23get() {
                MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
                mysqlXADataSource.setUrl(this.jdbcUrl);
                mysqlXADataSource.setUser(this.username);
                mysqlXADataSource.setPassword(this.password);
                return mysqlXADataSource;
            }
        }

        /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$MySqlJdbcExactlyOnceSinkTestEnv$MySqlXaDb.class */
        private static final class MySqlXaDb extends MySQLContainer<MySqlXaDb> {
            private static final String IMAGE_NAME = "mysql:8.0.23";
            private volatile InnoDbStatusLogger innoDbStatusLogger;

            public String toString() {
                return IMAGE_NAME;
            }

            public MySqlXaDb() {
                super(IMAGE_NAME);
            }

            public void start() {
                Connection connection;
                Throwable th;
                super.start();
                try {
                    connection = DriverManager.getConnection(getJdbcUrl(), "root", getPassword());
                    th = null;
                } catch (SQLException e) {
                    ExceptionUtils.rethrow(e);
                }
                try {
                    try {
                        prepareDb(connection, 80000L);
                        if (connection != null) {
                            if (0 != 0) {
                                try {
                                    connection.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                connection.close();
                            }
                        }
                        this.innoDbStatusLogger = new InnoDbStatusLogger(getJdbcUrl(), "root", getPassword(), 80000 / 2);
                        this.innoDbStatusLogger.start();
                    } finally {
                    }
                } finally {
                }
            }

            public void stop() {
                try {
                    this.innoDbStatusLogger.stop();
                } catch (Exception e) {
                    ExceptionUtils.rethrow(e);
                } finally {
                    super.stop();
                }
            }

            private void prepareDb(Connection connection, long j) throws SQLException {
                Statement createStatement = connection.createStatement();
                Throwable th = null;
                try {
                    try {
                        createStatement.execute("GRANT XA_RECOVER_ADMIN ON *.* TO '" + getUsername() + "'@'%'");
                        createStatement.execute("FLUSH PRIVILEGES");
                        createStatement.execute("SET GLOBAL innodb_lock_wait_timeout = " + j);
                        if (createStatement != null) {
                            if (0 == 0) {
                                createStatement.close();
                                return;
                            }
                            try {
                                createStatement.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (createStatement != null) {
                        if (th != null) {
                            try {
                                createStatement.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    throw th4;
                }
            }
        }

        /**
         * @param parallelism parallelism reported by {@code getParallelism()}
         */
        public MySqlJdbcExactlyOnceSinkTestEnv(int parallelism) {
            this.parallelism = parallelism;
        }

        /** Starts the MySQL container. */
        @Override
        public void start() {
            db.start();
        }

        /** Closes the MySQL container. */
        @Override
        public void stop() {
            db.close();
        }

        /** Returns the MySQL container backing this environment. */
        @Override
        public JdbcDatabaseContainer<?> getContainer() {
            return db;
        }

        /** Builds a data source supplier from the container's current JDBC URL and credentials. */
        @Override
        public SerializableSupplier<XADataSource> getDataSourceSupplier() {
            final String url = db.getJdbcUrl();
            final String user = db.getUsername();
            final String secret = db.getPassword();
            return new MySqlXaDataSourceFactory(url, user, secret);
        }

        /** Returns the parallelism passed to the constructor. */
        @Override
        public int getParallelism() {
            return parallelism;
        }

        /** Describes the environment as the container's string form followed by the parallelism. */
        @Override
        public String toString() {
            return db + ", parallelism=" + parallelism;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$OracleJdbcExactlyOnceSinkTestEnv.class */
    /** Test environment backed by an {@link OracleContainer}. */
    private static class OracleJdbcExactlyOnceSinkTestEnv implements JdbcExactlyOnceSinkTestEnv {
        private final int parallelism;
        private final OracleContainer db;

        /** Serializable supplier that builds an Oracle {@link XADataSource} from fixed connection settings. */
        private static class OracleXaDataSourceFactory implements SerializableSupplier<XADataSource> {
            private final String jdbcUrl;
            private final String username;
            private final String password;

            /**
             * @param jdbcUrl JDBC URL of the database
             * @param username user to connect as
             * @param password password of that user
             */
            public OracleXaDataSourceFactory(String jdbcUrl, String username, String password) {
                this.jdbcUrl = jdbcUrl;
                this.username = username;
                this.password = password;
            }

            /**
             * Creates a new, independently configured data source on every call.
             *
             * @return an {@code OracleXADataSource} pointing at the configured URL and credentials
             * @throws RuntimeException wrapping any {@link SQLException} raised while configuring it
             */
            @Override
            public XADataSource get() {
                try {
                    OracleXADataSource dataSource = new OracleXADataSource();
                    dataSource.setURL(jdbcUrl);
                    dataSource.setUser(username);
                    dataSource.setPassword(password);
                    return dataSource;
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * @param parallelism parallelism reported by {@link #getParallelism()}
         */
        private OracleJdbcExactlyOnceSinkTestEnv(int parallelism) {
            this.parallelism = parallelism;
            this.db = new OracleContainer();
        }

        /** Starts the Oracle container. */
        @Override
        public void start() {
            db.start();
        }

        /** Closes the Oracle container. */
        @Override
        public void stop() {
            db.close();
        }

        @Override
        public JdbcDatabaseContainer<?> getContainer() {
            return db;
        }

        /** Builds a data source supplier from the container's current JDBC URL and credentials. */
        @Override
        public SerializableSupplier<XADataSource> getDataSourceSupplier() {
            return new OracleXaDataSourceFactory(db.getJdbcUrl(), db.getUsername(), db.getPassword());
        }

        @Override
        public int getParallelism() {
            return parallelism;
        }

        @Override
        public String toString() {
            return db + ", parallelism=" + parallelism;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$PgSqlJdbcExactlyOnceSinkTestEnv.class */
    /** Test environment backed by a PostgreSQL container configured for prepared (XA) transactions. */
    private static class PgSqlJdbcExactlyOnceSinkTestEnv implements JdbcExactlyOnceSinkTestEnv {
        private final int parallelism;
        private final PgXaDb db;

        /** Serializable supplier that builds a PostgreSQL {@link XADataSource} from fixed connection settings. */
        private static class PgXaDataSourceFactory implements SerializableSupplier<XADataSource> {
            private final String jdbcUrl;
            private final String username;
            private final String password;

            /**
             * @param jdbcUrl JDBC URL of the database
             * @param username user to connect as
             * @param password password of that user
             */
            public PgXaDataSourceFactory(String jdbcUrl, String username, String password) {
                this.jdbcUrl = jdbcUrl;
                this.username = username;
                this.password = password;
            }

            /**
             * Creates a new, independently configured data source on every call.
             *
             * @return a {@code PGXADataSource} pointing at the configured URL and credentials
             */
            @Override
            public XADataSource get() {
                PGXADataSource dataSource = new PGXADataSource();
                dataSource.setUrl(jdbcUrl);
                dataSource.setUser(username);
                dataSource.setPassword(password);
                return dataSource;
            }
        }

        /** PostgreSQL container started with explicit connection and prepared-transaction limits. */
        private static final class PgXaDb extends PostgreSQLContainer<PgXaDb> {
            private static final String IMAGE_NAME = "postgres:9.6.12";
            private static final int SUPERUSER_RESERVED_CONNECTIONS = 1;

            /**
             * @param maxConnections value for {@code max_connections}; must exceed the reserved
             *     superuser connections
             * @param maxPreparedTransactions value for {@code max_prepared_transactions}
             * @throws IllegalArgumentException if {@code maxConnections} is not greater than the
             *     number of reserved superuser connections
             */
            public PgXaDb(int maxConnections, int maxPreparedTransactions) {
                super(IMAGE_NAME);
                Preconditions.checkArgument(
                        maxConnections > SUPERUSER_RESERVED_CONNECTIONS,
                        "maxConnections should be greater than superuser_reserved_connections");
                setCommand(
                        "postgres",
                        "-c",
                        // Derived from the constant so it cannot drift from the check above.
                        "superuser_reserved_connections=" + SUPERUSER_RESERVED_CONNECTIONS,
                        "-c",
                        "max_connections=" + maxConnections,
                        "-c",
                        "max_prepared_transactions=" + maxPreparedTransactions,
                        "-c",
                        "fsync=off");
            }

            @Override
            public String toString() {
                return IMAGE_NAME;
            }
        }

        /**
         * Creates the environment with a database allowing twice as many connections as the
         * parallelism and up to 50 prepared transactions.
         *
         * @param parallelism parallelism reported by {@link #getParallelism()}
         */
        private PgSqlJdbcExactlyOnceSinkTestEnv(int parallelism) {
            this.parallelism = parallelism;
            this.db = new PgXaDb(parallelism * 2, 50);
        }

        /** Starts the PostgreSQL container. */
        @Override
        public void start() {
            db.start();
        }

        /** Closes the PostgreSQL container. */
        @Override
        public void stop() {
            db.close();
        }

        @Override
        public JdbcDatabaseContainer<?> getContainer() {
            return db;
        }

        /** Builds a data source supplier from the container's current JDBC URL and credentials. */
        @Override
        public SerializableSupplier<XADataSource> getDataSourceSupplier() {
            return new PgXaDataSourceFactory(db.getJdbcUrl(), db.getUsername(), db.getPassword());
        }

        @Override
        public int getParallelism() {
            return parallelism;
        }

        @Override
        public String toString() {
            return db + ", parallelism=" + parallelism;
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$TestEntrySource.class */
    private static class TestEntrySource extends RichParallelSourceFunction<JdbcTestFixture.TestEntry> implements CheckpointListener, CheckpointedFunction {
        // First constructor argument.
        private final int numElements;
        // Second constructor argument.
        // NOTE(review): presumably the number of elements emitted between checkpoints - confirm against emitRange.
        private final int numElementsPerCheckpoint;
        // State holding the ranges this subtask iterates over in run().
        private volatile transient ListState<SourceRange> ranges;
        // Initialised to -1 by the constructor.
        private volatile long lastCheckpointId;
        // Initialised to false by the constructor.
        private volatile boolean lastSnapshotConfirmed;
        // Initialised to true by the constructor.
        private volatile boolean running;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$TestEntrySource$SourceRange.class */
        public static final class SourceRange {
            private int from;
            private final int to;

            private SourceRange(int i, int i2) {
                this.from = i;
                this.to = i2;
            }

            public static SourceRange forSubtask(int i, int i2) {
                return new SourceRange(i * i2, (i + 1) * i2);
            }

            public void advance() {
                Preconditions.checkState(this.from < this.to);
                this.from++;
            }

            public String toString() {
                return String.format("%d..%d", Integer.valueOf(this.from), Integer.valueOf(this.to));
            }
        }

        private TestEntrySource(int i, int i2) {
            this.lastCheckpointId = -1L;
            this.lastSnapshotConfirmed = false;
            this.running = true;
            this.numElements = i;
            this.numElementsPerCheckpoint = i2;
        }

        public void run(SourceFunction.SourceContext<JdbcTestFixture.TestEntry> sourceContext) throws Exception {
            try {
                waitForConsumers();
                Iterator it = ((Iterable) this.ranges.get()).iterator();
                while (it.hasNext()) {
                    emitRange((SourceRange) it.next(), sourceContext);
                }
                ((CountDownLatch) JdbcExactlyOnceSinkE2eTest.activeSources.get(Integer.valueOf(getRuntimeContext().getAttemptNumber()))).countDown();
                waitOtherSources();
            } catch (Throwable th) {
                ((CountDownLatch) JdbcExactlyOnceSinkE2eTest.activeSources.get(Integer.valueOf(getRuntimeContext().getAttemptNumber()))).countDown();
                throw th;
            }
        }

        private void waitForConsumers() throws InterruptedException {
            sleep(() -> {
                return Boolean.valueOf(!JdbcExactlyOnceSinkE2eTest.inactiveMappers.containsKey(Integer.valueOf(getRuntimeContext().getAttemptNumber())));
            });
            ((CountDownLatch) JdbcExactlyOnceSinkE2eTest.inactiveMappers.get(Integer.valueOf(getRuntimeContext().getAttemptNumber()))).await();
        }

        private void emitRange(SourceRange sourceRange, SourceFunction.SourceContext<JdbcTestFixture.TestEntry> sourceContext) {
            int i = sourceRange.from;
            while (true) {
                int i2 = i;
                if (i2 >= sourceRange.to || !this.running) {
                    return;
                }
                int min = Math.min(sourceRange.to - i2, this.numElementsPerCheckpoint);
                emit(i2, min, sourceRange, sourceContext);
                i = i2 + min;
            }
        }

        private void emit(int i, int i2, SourceRange sourceRange, SourceFunction.SourceContext<JdbcTestFixture.TestEntry> sourceContext) {
            synchronized (sourceContext.getCheckpointLock()) {
                this.lastCheckpointId = -1L;
                this.lastSnapshotConfirmed = false;
                for (int i3 = i; i3 < i + i2 && this.running; i3++) {
                    try {
                        sourceContext.collect(new JdbcTestFixture.TestEntry(Integer.valueOf(i3), Integer.toString(i3), Integer.toString(i3), Double.valueOf(i3), Integer.valueOf(i3)));
                        sourceRange.advance();
                    } catch (Exception e) {
                        if (!ExceptionUtils.findThrowable(e, TestException.class).isPresent()) {
                            JdbcExactlyOnceSinkE2eTest.LOG.warn("Exception during record emission", e);
                        }
                        throw e;
                    }
                }
            }
            sleep(() -> {
                return Boolean.valueOf(!this.lastSnapshotConfirmed);
            });
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long j) {
            if (this.lastCheckpointId <= -1 || j < this.lastCheckpointId) {
                return;
            }
            this.lastSnapshotConfirmed = true;
        }

        public void open(Configuration configuration) throws Exception {
            JdbcExactlyOnceSinkE2eTest.activeSources.putIfAbsent(Integer.valueOf(getRuntimeContext().getAttemptNumber()), new CountDownLatch(getRuntimeContext().getNumberOfParallelSubtasks()));
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.ranges = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("SourceState", SourceRange.class));
            if (!functionInitializationContext.isRestored()) {
                this.ranges.update(Collections.singletonList(SourceRange.forSubtask(getRuntimeContext().getIndexOfThisSubtask(), this.numElements)));
            }
            JdbcExactlyOnceSinkE2eTest.LOG.debug("Source initialized with ranges: {}", this.ranges.get());
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
            this.lastCheckpointId = functionSnapshotContext.getCheckpointId();
        }

        private void sleep(Supplier<Boolean> supplier) {
            long currentTimeMillis = System.currentTimeMillis();
            while (supplier.get().booleanValue() && this.running && !Thread.currentThread().isInterrupted() && haveActiveSources()) {
                if (System.currentTimeMillis() - currentTimeMillis > 10000) {
                    JdbcExactlyOnceSinkE2eTest.LOG.debug("Slept more than 10s", new Exception());
                    currentTimeMillis = Long.MAX_VALUE;
                }
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    ExceptionUtils.rethrow(e);
                }
            }
        }

        private void waitOtherSources() throws InterruptedException {
            long currentTimeMillis = System.currentTimeMillis();
            while (this.running && haveActiveSources()) {
                if (System.currentTimeMillis() - currentTimeMillis > 10000) {
                    JdbcExactlyOnceSinkE2eTest.LOG.debug("Slept more than 10s", new Exception());
                    currentTimeMillis = Long.MAX_VALUE;
                }
                ((CountDownLatch) JdbcExactlyOnceSinkE2eTest.activeSources.get(Integer.valueOf(getRuntimeContext().getAttemptNumber()))).await(100L, TimeUnit.MILLISECONDS);
            }
        }

        private boolean haveActiveSources() {
            return ((CountDownLatch) JdbcExactlyOnceSinkE2eTest.activeSources.get(Integer.valueOf(getRuntimeContext().getAttemptNumber()))).getCount() > 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest$TestException.class */
    /**
     * Checked exception used as an artificial failure in this test.
     *
     * <p>It carries a fixed message, has no cause, allows suppressed exceptions and does not record
     * a stack trace.
     */
    public static final class TestException extends Exception {
        private static final String MESSAGE = "java.lang.Exception: Artificial failure";
        private static final boolean ENABLE_SUPPRESSION = true;
        private static final boolean WRITABLE_STACK_TRACE = false;

        public TestException() {
            super(MESSAGE, null, ENABLE_SUPPRESSION, WRITABLE_STACK_TRACE);
        }
    }

    /**
     * Supplies the environments the parameterized test runs against.
     *
     * @return one PostgreSQL, one MySQL and one Oracle environment, in that order, each created
     *     with the constructor argument 4
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<JdbcExactlyOnceSinkTestEnv> parameters() {
        final int parallelism = 4;
        JdbcExactlyOnceSinkTestEnv postgres = new PgSqlJdbcExactlyOnceSinkTestEnv(parallelism);
        JdbcExactlyOnceSinkTestEnv mysql = new MySqlJdbcExactlyOnceSinkTestEnv(parallelism);
        JdbcExactlyOnceSinkTestEnv oracle = new OracleJdbcExactlyOnceSinkTestEnv(parallelism);
        return Arrays.asList(postgres, mysql, oracle);
    }

    /**
     * Prepares a test run: starts a mini cluster, starts the database environment and finally runs
     * the base class setup.
     *
     * <p>The cluster uses the "full" failover strategy, a task cancellation timeout of 20000 and a
     * checkpointing timeout of 20000 ms, with one task manager per unit of the environment's
     * parallelism.
     *
     * @throws Exception if the cluster, the database environment or the base setup fails to start
     */
    @Override
    @Before
    public void before() throws Exception {
        final long cancellationTimeout = 20000L;
        final Duration checkpointingTimeout = Duration.ofMillis(20000L);

        Configuration clusterConfig = new Configuration();
        clusterConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        clusterConfig.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, cancellationTimeout);
        clusterConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, checkpointingTimeout);

        MiniClusterResourceConfiguration.Builder resourceBuilder = new MiniClusterResourceConfiguration.Builder();
        resourceBuilder.setConfiguration(clusterConfig);
        resourceBuilder.setNumberTaskManagers(this.dbEnv.getParallelism());
        this.cluster = new MiniClusterWithClientResource(resourceBuilder.build());

        // Order matters: cluster first, then the database, then the base class setup.
        this.cluster.before();
        this.dbEnv.start();
        super.before();
    }

    /**
     * Tears down a test run: shuts down the mini cluster (if one was created), stops the database
     * environment and clears the static coordination maps.
     *
     * <p>Every step runs even if an earlier one throws, so a failing cluster shutdown can neither
     * leave the database running nor leave stale latches behind for the next parameterized run.
     * The first exception thrown still propagates to the caller.
     */
    @Override // org.apache.flink.connector.jdbc.JdbcTestBase
    @After
    public void after() {
        try {
            if (this.cluster != null) {
                try {
                    this.cluster.after();
                } finally {
                    this.cluster = null;
                }
            }
        } finally {
            try {
                this.dbEnv.stop();
            } finally {
                activeSources.clear();
                inactiveMappers.clear();
            }
        }
    }

    /**
     * Runs a job that writes generated entries through the exactly-once JDBC sink while a
     * {@code FailingMapper} sits between source and sink, then verifies the table contents.
     *
     * <p>Each source subtask emits 50 entries in batches of 7. After the job finishes, the ids read
     * back from the input table must be exactly {@code 0 .. 50 * parallelism - 1}, each occurring
     * once: the sorted inserted ids are compared with the expected sequence, so a duplicated id
     * cannot mask a missing one.
     *
     * @throws Exception if the job or the verification query fails
     */
    @Test
    public void testInsert() throws Exception {
        final int elementsPerSource = 50;
        final int elementsPerCheckpoint = 7;
        // Bounds handed to FailingMapper, derived from the batch size (2 and 21).
        final int minElementsPerFailure = elementsPerCheckpoint / 3;
        final int maxElementsPerFailure = elementsPerCheckpoint * 3;

        long startMillis = System.currentTimeMillis();
        LOG.info("Test insert for {}", this.dbEnv);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(this.dbEnv.getParallelism());
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.milliseconds(100L)));
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        executionEnvironment.enableCheckpointing(50L, CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(1000L);
        executionEnvironment.addSource(new TestEntrySource(elementsPerSource, elementsPerCheckpoint)).setParallelism(this.dbEnv.getParallelism()).map(new FailingMapper(minElementsPerFailure, maxElementsPerFailure)).addSink(JdbcSink.exactlyOnceSink(String.format(JdbcTestFixture.INSERT_TEMPLATE, JdbcTestFixture.INPUT_TABLE), JdbcITCase.TEST_ENTRY_JDBC_STATEMENT_BUILDER, JdbcExecutionOptions.builder().withMaxRetries(0).build(), JdbcExactlyOnceOptions.builder().withTransactionPerConnection(true).build(), this.dbEnv.getDataSourceSupplier()));
        executionEnvironment.execute();

        List<Integer> insertedIds = JdbcXaFacadeTestHelper.getInsertedIds(this.dbEnv.getContainer().getJdbcUrl(), this.dbEnv.getContainer().getUsername(), this.dbEnv.getContainer().getPassword(), JdbcTestFixture.INPUT_TABLE);
        List<Integer> expectedIds = IntStream.range(0, elementsPerSource * this.dbEnv.getParallelism()).boxed().collect(Collectors.toList());
        // Sort a copy so that the returned list is left untouched and the order of rows is irrelevant.
        List<Integer> sortedInsertedIds = insertedIds.stream().sorted().collect(Collectors.toList());
        Assert.assertTrue(insertedIds.toString(), expectedIds.equals(sortedInsertedIds));
        LOG.info("Test insert for {} finished in {} ms.", this.dbEnv, System.currentTimeMillis() - startMillis);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Describes the database of the current environment.
     *
     * <p>All values are read from {@code dbEnv.getContainer()} at the time the individual accessor
     * is called, not when this method returns. Building an XA data source is not supported.
     *
     * @return metadata backed by the container of the current environment
     */
    @Override
    public DbMetadata getDbMetadata() {
        return new DbMetadata() {
            /** Looks the container up on every call so that no value is captured early. */
            private JdbcDatabaseContainer<?> container() {
                return JdbcExactlyOnceSinkE2eTest.this.dbEnv.getContainer();
            }

            @Override
            public String getInitUrl() {
                return container().getJdbcUrl();
            }

            @Override
            public String getUrl() {
                return container().getJdbcUrl();
            }

            @Override
            public XADataSource buildXaDataSource() {
                throw new UnsupportedOperationException();
            }

            @Override
            public String getDriverClass() {
                return container().getDriverClassName();
            }

            @Override
            public String getUser() {
                return container().getUsername();
            }

            @Override
            public String getPassword() {
                return container().getPassword();
            }
        };
    }
}
