package org.apache.flink.cdc.connectors.db2.source.fetch;

import io.debezium.DebeziumException;
import io.debezium.connector.db2.Db2Connection;
import io.debezium.connector.db2.Db2ConnectorConfig;
import io.debezium.connector.db2.Db2DatabaseSchema;
import io.debezium.connector.db2.Db2OffsetContext;
import io.debezium.connector.db2.Db2Partition;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.db2.source.fetch.Db2StreamFetchTask;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scan fetch task for a single Db2 {@link SnapshotSplit}.
 *
 * <p>The snapshot phase ({@link #executeDataSnapshot}) reads the rows of the split through
 * {@link Db2SnapshotSplitReadTask}; the backfill phase ({@link #executeBackfillTask}) runs a
 * {@link Db2StreamFetchTask.StreamSplitReadTask} restricted to the split's table.
 */
public class Db2ScanFetchTask extends AbstractScanFetchTask {

    /**
     * {@link ChangeEventSource.ChangeEventSourceContext} whose running state is the enclosing
     * task's {@code taskRunning} flag.
     */
    public class Db2SnapshotSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext {

        /** Marks the enclosing fetch task as no longer running. */
        public void finished() {
            taskRunning = false;
        }

        /** Returns the enclosing fetch task's {@code taskRunning} flag. */
        @Override
        public boolean isRunning() {
            return taskRunning;
        }
    }

    /**
     * Snapshot source that exports the rows of exactly one {@link SnapshotSplit} and dispatches
     * them as snapshot events.
     */
    public static class Db2SnapshotSplitReadTask extends AbstractSnapshotChangeEventSource<Db2Partition, Db2OffsetContext> {
        private static final Logger LOG = LoggerFactory.getLogger(Db2SnapshotSplitReadTask.class);

        /** Interval between two progress log lines while scanning a split. */
        private static final Duration LOG_INTERVAL = Duration.ofMillis(10000);

        private final Db2ConnectorConfig connectorConfig;
        private final Db2DatabaseSchema databaseSchema;
        private final Db2Connection jdbcConnection;
        private final JdbcSourceEventDispatcher<Db2Partition> dispatcher;
        private final Clock clock;
        private final SnapshotSplit snapshotSplit;
        private final Db2OffsetContext offsetContext;
        private final SnapshotProgressListener<Db2Partition> snapshotProgressListener;
        private final EventDispatcher.SnapshotReceiver<Db2Partition> snapshotReceiver;

        /** Snapshot context for a Db2 partition; created with an empty catalog name. */
        public static class Db2SnapshotContext extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<Db2Partition, Db2OffsetContext> {
            public Db2SnapshotContext(Db2Partition db2Partition) throws SQLException {
                super(db2Partition, "");
            }
        }

        /**
         * Creates a read task for one snapshot split.
         *
         * @param db2ConnectorConfig connector configuration (also supplies the query fetch size)
         * @param db2OffsetContext offset context installed into the snapshot context by
         *     {@link #doExecute}
         * @param snapshotProgressListener listener notified of scan progress
         * @param db2DatabaseSchema schema used to resolve the split's table
         * @param db2Connection JDBC connection used to run the split query
         * @param jdbcSourceEventDispatcher dispatcher receiving the snapshot events
         * @param snapshotReceiver receiver handed to the dispatcher and completed after the scan
         * @param snapshotSplit the split to read
         */
        public Db2SnapshotSplitReadTask(Db2ConnectorConfig db2ConnectorConfig, Db2OffsetContext db2OffsetContext, SnapshotProgressListener<Db2Partition> snapshotProgressListener, Db2DatabaseSchema db2DatabaseSchema, Db2Connection db2Connection, JdbcSourceEventDispatcher<Db2Partition> jdbcSourceEventDispatcher, EventDispatcher.SnapshotReceiver<Db2Partition> snapshotReceiver, SnapshotSplit snapshotSplit) {
            super(db2ConnectorConfig, snapshotProgressListener);
            this.offsetContext = db2OffsetContext;
            this.connectorConfig = db2ConnectorConfig;
            this.databaseSchema = db2DatabaseSchema;
            this.jdbcConnection = db2Connection;
            this.dispatcher = jdbcSourceEventDispatcher;
            this.clock = Clock.SYSTEM;
            this.snapshotSplit = snapshotSplit;
            this.snapshotProgressListener = snapshotProgressListener;
            this.snapshotReceiver = snapshotReceiver;
        }

        /**
         * Prepares the snapshot context and then runs the split scan.
         *
         * @return the result produced by {@link #doExecute}
         * @throws InterruptedException if the scan is interrupted; propagated unchanged
         * @throws RuntimeException if the snapshot context cannot be prepared
         * @throws DebeziumException if the scan itself fails for any other reason
         */
        @Override
        public SnapshotResult<Db2OffsetContext> execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, Db2Partition db2Partition, Db2OffsetContext db2OffsetContext) throws InterruptedException {
            // Phase 1: context preparation. Kept in its own try so that the
            // "Failed to initialize" message is only logged for preparation failures.
            final Db2SnapshotContext snapshotContext;
            try {
                snapshotContext = prepare(db2Partition);
            } catch (Exception e) {
                LOG.error("Failed to initialize snapshot context.", e);
                throw new RuntimeException(e);
            }

            final AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask = getSnapshottingTask(db2Partition, db2OffsetContext);

            // Phase 2: the scan. An interrupt must reach the caller as InterruptedException
            // (as declared) instead of being wrapped into a RuntimeException.
            try {
                return doExecute(changeEventSourceContext, db2OffsetContext, snapshotContext, snapshottingTask);
            } catch (InterruptedException e) {
                LOG.warn("Snapshot was interrupted before completion");
                throw e;
            } catch (Exception e) {
                throw new DebeziumException(e);
            }
        }

        /**
         * Exports the split's rows and reports the snapshot as completed.
         *
         * <p>The {@code db2OffsetContext} argument is not used; the offset context passed to the
         * constructor is installed into the snapshot context instead.
         */
        @Override
        public SnapshotResult<Db2OffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, Db2OffsetContext db2OffsetContext, AbstractSnapshotChangeEventSource.SnapshotContext<Db2Partition, Db2OffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
            final Db2SnapshotContext db2SnapshotContext = (Db2SnapshotContext) snapshotContext;
            db2SnapshotContext.offset = this.offsetContext;
            createDataEvents(db2SnapshotContext, this.snapshotSplit.getTableId());
            return SnapshotResult.completed(db2SnapshotContext.offset);
        }

        /**
         * Always returns a task built with the flags {@code (false, true)}.
         *
         * <p>NOTE(review): presumably (snapshotSchema=false, snapshotData=true) - confirm
         * against the Debezium {@code SnapshottingTask} constructor.
         */
        @Override
        public AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(Db2Partition db2Partition, Db2OffsetContext db2OffsetContext) {
            return new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
        }

        /** Creates a fresh {@link Db2SnapshotContext} for the given partition. */
        @Override
        public Db2SnapshotContext prepare(Db2Partition db2Partition) throws Exception {
            return new Db2SnapshotContext(db2Partition);
        }

        /** Exports the rows of {@code tableId} for this split, then completes the receiver. */
        private void createDataEvents(Db2SnapshotContext snapshotContext, TableId tableId) throws Exception {
            LOG.debug("Snapshotting table {}", tableId);
            createDataEventsForTable(snapshotContext, this.snapshotReceiver, this.databaseSchema.tableFor(tableId));
            this.snapshotReceiver.completeSnapshot();
        }

        /**
         * Runs the split scan query and dispatches one snapshot event per row.
         *
         * <p>Progress is logged and reported to the progress listener whenever the
         * {@link #LOG_INTERVAL} timer expires.
         *
         * @throws InterruptedException if dispatching is interrupted
         * @throws ConnectException if the query or row extraction fails with a
         *     {@link SQLException}
         */
        private void createDataEventsForTable(Db2SnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver<Db2Partition> receiver, Table table) throws InterruptedException {
            final long exportStart = this.clock.currentTimeInMillis();
            LOG.info("Exporting data from split '{}' of table {}", this.snapshotSplit.splitId(), table.id());

            // A null bound means the split is open-ended on that side.
            final boolean isFirstSplit = this.snapshotSplit.getSplitStart() == null;
            final boolean isLastSplit = this.snapshotSplit.getSplitEnd() == null;

            final String selectSql = Db2Utils.buildSplitScanQuery(this.snapshotSplit.getTableId(), this.snapshotSplit.getSplitKeyType(), isFirstSplit, isLastSplit);
            LOG.info("For split '{}' of table {} using select statement: '{}'", this.snapshotSplit.splitId(), table.id(), selectSql);

            // try-with-resources closes the result set and then the statement on every exit
            // path, including failures raised while iterating or dispatching.
            try (PreparedStatement selectStatement = Db2Utils.readTableSplitDataStatement(this.jdbcConnection, selectSql, isFirstSplit, isLastSplit, this.snapshotSplit.getSplitStart(), this.snapshotSplit.getSplitEnd(), this.snapshotSplit.getSplitKeyType().getFieldCount(), this.connectorConfig.getQueryFetchSize());
                    ResultSet resultSet = selectStatement.executeQuery()) {

                final ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(resultSet, table);
                long rowCount = 0;
                Threads.Timer logTimer = getTableScanLogTimer();

                while (resultSet.next()) {
                    rowCount++;
                    final Object[] row = this.jdbcConnection.rowToArray(table, this.databaseSchema, resultSet, columnArray);
                    if (logTimer.expired()) {
                        LOG.info("Exported {} records for split '{}' after {}", rowCount, this.snapshotSplit.splitId(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
                        this.snapshotProgressListener.rowsScanned(snapshotContext.partition, table.id(), rowCount);
                        logTimer = getTableScanLogTimer();
                    }
                    this.dispatcher.dispatchSnapshotEvent(snapshotContext.partition, table.id(), getChangeRecordEmitter(snapshotContext, table.id(), row), receiver);
                }

                LOG.info("Finished exporting {} records for split '{}', total duration '{}'", rowCount, this.snapshotSplit.splitId(), Strings.duration(this.clock.currentTimeInMillis() - exportStart));
            } catch (SQLException e) {
                throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
            }
        }

        /**
         * Records the event on the snapshot offset and builds the emitter for one row.
         *
         * @param db2SnapshotContext context supplying the partition and the offset
         * @param tableId table the row belongs to
         * @param objArr column values of the row
         */
        protected ChangeRecordEmitter<Db2Partition> getChangeRecordEmitter(Db2SnapshotContext db2SnapshotContext, TableId tableId, Object[] objArr) {
            db2SnapshotContext.offset.event(tableId, this.clock.currentTime());
            return new SnapshotChangeRecordEmitter<>(db2SnapshotContext.partition, db2SnapshotContext.offset, objArr, this.clock);
        }

        /** Returns a new timer that expires after {@link #LOG_INTERVAL}. */
        private Threads.Timer getTableScanLogTimer() {
            return Threads.timer(this.clock, LOG_INTERVAL);
        }
    }

    /**
     * @param snapshotSplit the split this task reads
     */
    public Db2ScanFetchTask(SnapshotSplit snapshotSplit) {
        super(snapshotSplit);
    }

    /**
     * Reads the snapshot data of this task's split.
     *
     * @param context fetch context; must be a {@code Db2SourceFetchTaskContext}
     * @throws IllegalStateException if the snapshot result is neither completed nor skipped;
     *     {@code taskRunning} is cleared before throwing
     */
    @Override
    protected void executeDataSnapshot(FetchTask.Context context) throws Exception {
        final Db2SourceFetchTaskContext fetchContext = (Db2SourceFetchTaskContext) context;

        final Db2SnapshotSplitReadTask readTask = new Db2SnapshotSplitReadTask(
                fetchContext.getDbzConnectorConfig(),
                fetchContext.getOffsetContext(),
                fetchContext.getSnapshotChangeEventSourceMetrics(),
                fetchContext.getDatabaseSchema(),
                fetchContext.getConnection(),
                fetchContext.getDispatcher(),
                fetchContext.getSnapshotReceiver(),
                this.snapshotSplit);
        final ChangeEventSource.ChangeEventSourceContext sourceContext = new Db2SnapshotSplitChangeEventSourceContext();

        final SnapshotResult<Db2OffsetContext> snapshotResult = readTask.execute(sourceContext, fetchContext.getPartition(), fetchContext.getOffsetContext());

        if (!snapshotResult.isCompletedOrSkipped()) {
            this.taskRunning = false;
            throw new IllegalStateException(String.format("Read snapshot for Db2 split %s fail", this.snapshotSplit));
        }
    }

    /**
     * Runs the backfill stream read for the given stream split, starting from the offset
     * loaded from the split's starting offset.
     *
     * @param context fetch context; must be a {@code Db2SourceFetchTaskContext}
     * @param streamSplit the backfill split to read
     */
    @Override
    protected void executeBackfillTask(FetchTask.Context context, StreamSplit streamSplit) throws Exception {
        final Db2SourceFetchTaskContext fetchContext = (Db2SourceFetchTaskContext) context;

        final Db2StreamFetchTask.StreamSplitReadTask backfillTask = createBackFillLsnSplitReadTask(streamSplit, fetchContext);
        final ChangeEventSource.ChangeEventSourceContext sourceContext = new Db2SnapshotSplitChangeEventSourceContext();
        final Db2Partition partition = fetchContext.getPartition();
        final Db2OffsetContext startingOffset = new Db2OffsetContext.Loader(fetchContext.getDbzConnectorConfig()).load((Map<String, ?>) streamSplit.getStartingOffset().getOffset());

        backfillTask.execute(sourceContext, partition, startingOffset);
    }

    /**
     * Builds the stream read task used for backfill. The connector configuration is copied with
     * {@code table.include.list} set to this split's table (schema and table name, no catalog)
     * and the heartbeat interval set to 0.
     */
    private Db2StreamFetchTask.StreamSplitReadTask createBackFillLsnSplitReadTask(StreamSplit streamSplit, Db2SourceFetchTaskContext fetchContext) {
        final TableId splitTable = this.snapshotSplit.getTableId();
        final Db2ConnectorConfig backfillConfig = new Db2ConnectorConfig(
                fetchContext.getDbzConnectorConfig()
                        .getConfig()
                        .edit()
                        .with("table.include.list", new TableId(null, splitTable.schema(), splitTable.table()))
                        .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                        .build());

        return new Db2StreamFetchTask.StreamSplitReadTask(
                backfillConfig,
                fetchContext.getConnection(),
                fetchContext.getMetaDataConnection(),
                fetchContext.getDispatcher(),
                fetchContext.getErrorHandler(),
                fetchContext.getDatabaseSchema(),
                streamSplit);
    }
}
