package org.apache.flink.cdc.connectors.db2.source.fetch;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.db2.Db2Connection;
import io.debezium.connector.db2.Db2Connector;
import io.debezium.connector.db2.Db2ConnectorConfig;
import io.debezium.connector.db2.Db2DatabaseSchema;
import io.debezium.connector.db2.Db2OffsetContext;
import io.debezium.connector.db2.Db2Partition;
import io.debezium.connector.db2.Db2TaskContext;
import io.debezium.connector.db2.Db2TopicSelector;
import io.debezium.connector.db2.SourceInfo;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Instant;
import java.util.Map;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.db2.source.config.Db2SourceConfig;
import org.apache.flink.cdc.connectors.db2.source.dialect.Db2Dialect;
import org.apache.flink.cdc.connectors.db2.source.handler.Db2SchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.db2.source.offset.LsnOffset;
import org.apache.flink.cdc.connectors.db2.source.utils.Db2Utils;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.class */
public class Db2SourceFetchTaskContext extends JdbcSourceFetchTaskContext {
    private final Db2Connection connection;
    private final Db2Connection metaDataConnection;
    private final Db2EventMetadataProvider metadataProvider;
    private Db2OffsetContext offsetContext;
    private Db2Partition partition;
    private Db2DatabaseSchema databaseSchema;
    private JdbcSourceEventDispatcher<Db2Partition> dispatcher;
    private ErrorHandler errorHandler;
    private ChangeEventQueue<DataChangeEvent> queue;
    private Db2TaskContext taskContext;
    private TopicSelector<TableId> topicSelector;
    private EventDispatcher.SnapshotReceiver<Db2Partition> snapshotReceiver;
    private SnapshotChangeEventSourceMetrics<Db2Partition> snapshotChangeEventSourceMetrics;
    private StreamingChangeEventSourceMetrics<Db2Partition> streamingChangeEventSourceMetrics;

    /* loaded from: input_file:org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext$Db2EventMetadataProvider.class */
    public static class Db2EventMetadataProvider implements EventMetadataProvider {
        @Override // io.debezium.pipeline.source.spi.EventMetadataProvider
        public Instant getEventTimestamp(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            Long int64;
            if (struct == null) {
                return null;
            }
            Struct struct2 = struct.getStruct("source");
            if (dataCollectionId == null || (int64 = struct2.getInt64("ts_ms")) == null) {
                return null;
            }
            return Instant.ofEpochMilli(int64.longValue());
        }

        @Override // io.debezium.pipeline.source.spi.EventMetadataProvider
        public Map<String, String> getEventSourcePosition(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            if (struct == null) {
                return null;
            }
            Struct struct2 = struct.getStruct("source");
            if (dataCollectionId == null) {
                return null;
            }
            return Collect.hashMapOf(SourceInfo.COMMIT_LSN_KEY, struct2.getString(SourceInfo.COMMIT_LSN_KEY), SourceInfo.CHANGE_LSN_KEY, struct2.getString(SourceInfo.CHANGE_LSN_KEY));
        }

        @Override // io.debezium.pipeline.source.spi.EventMetadataProvider
        public String getTransactionId(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            if (struct == null) {
                return null;
            }
            Struct struct2 = struct.getStruct("source");
            if (dataCollectionId == null) {
                return null;
            }
            return struct2.getString(SourceInfo.COMMIT_LSN_KEY);
        }
    }

    public Db2SourceFetchTaskContext(JdbcSourceConfig jdbcSourceConfig, Db2Dialect db2Dialect, Db2Connection db2Connection, Db2Connection db2Connection2) {
        super(jdbcSourceConfig, db2Dialect);
        this.connection = db2Connection;
        this.metadataProvider = new Db2EventMetadataProvider();
        this.metaDataConnection = db2Connection2;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask.Context
    public void configure(SourceSplitBase sourceSplitBase) {
        Db2ConnectorConfig dbzConnectorConfig = getDbzConnectorConfig();
        this.topicSelector = Db2TopicSelector.defaultSelector(dbzConnectorConfig);
        EmbeddedFlinkDatabaseHistory.registerHistory(this.sourceConfig.getDbzConfiguration().getString("database.history.instance.name"), sourceSplitBase.getTableSchemas().values());
        this.databaseSchema = Db2Utils.createDb2DatabaseSchema(dbzConnectorConfig, this.connection);
        this.offsetContext = loadStartingOffsetState(new Db2OffsetContext.Loader(dbzConnectorConfig), sourceSplitBase);
        this.partition = new Db2Partition(dbzConnectorConfig.getLogicalName());
        validateAndLoadDatabaseHistory(this.offsetContext, this.databaseSchema);
        this.taskContext = new Db2TaskContext(dbzConnectorConfig, this.databaseSchema);
        this.queue = new ChangeEventQueue.Builder().pollInterval(dbzConnectorConfig.getPollInterval()).maxBatchSize(dbzConnectorConfig.getMaxBatchSize()).maxQueueSize(sourceSplitBase.isSnapshotSplit() ? getSourceConfig().getSplitSize() : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize()).maxQueueSizeInBytes(dbzConnectorConfig.getMaxQueueSizeInBytes()).loggingContextSupplier(() -> {
            return this.taskContext.configureLoggingContext("Db2-cdc-connector-task");
        }).build();
        this.dispatcher = new JdbcSourceEventDispatcher<>(dbzConnectorConfig, this.topicSelector, this.databaseSchema, this.queue, dbzConnectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, this.metadataProvider, this.schemaNameAdjuster, new Db2SchemaChangeEventHandler());
        this.snapshotReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
        DefaultChangeEventSourceMetricsFactory defaultChangeEventSourceMetricsFactory = new DefaultChangeEventSourceMetricsFactory();
        this.snapshotChangeEventSourceMetrics = defaultChangeEventSourceMetricsFactory.getSnapshotMetrics(this.taskContext, this.queue, this.metadataProvider);
        this.streamingChangeEventSourceMetrics = defaultChangeEventSourceMetricsFactory.getStreamingMetrics(this.taskContext, this.queue, this.metadataProvider);
        this.errorHandler = new ErrorHandler(Db2Connector.class, dbzConnectorConfig, this.queue);
    }

    private Db2OffsetContext loadStartingOffsetState(Db2OffsetContext.Loader loader, SourceSplitBase sourceSplitBase) {
        return loader.load((Map<String, ?>) (sourceSplitBase.isSnapshotSplit() ? LsnOffset.INITIAL_OFFSET : sourceSplitBase.asStreamSplit().getStartingOffset()).getOffset());
    }

    private void validateAndLoadDatabaseHistory(Db2OffsetContext db2OffsetContext, Db2DatabaseSchema db2DatabaseSchema) {
        db2DatabaseSchema.initializeStorage();
        db2DatabaseSchema.recover(this.partition, db2OffsetContext);
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask.Context
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask.Context
    public Tables.TableFilter getTableFilter() {
        return getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask.Context
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return Db2Utils.getLsn(sourceRecord);
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public Db2DatabaseSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public RowType getSplitType(Table table) {
        return Db2Utils.getSplitType(Db2Utils.getSplitColumn(table, this.sourceConfig.getChunkKeyColumn()));
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public Db2ConnectorConfig getDbzConnectorConfig() {
        return (Db2ConnectorConfig) super.getDbzConnectorConfig();
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext, org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask.Context
    public Db2SourceConfig getSourceConfig() {
        return (Db2SourceConfig) this.sourceConfig;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public JdbcSourceEventDispatcher<Db2Partition> getDispatcher() {
        return this.dispatcher;
    }

    public EventDispatcher.SnapshotReceiver<Db2Partition> getSnapshotReceiver() {
        return this.snapshotReceiver;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public Db2OffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public Db2Partition getPartition() {
        return this.partition;
    }

    public Db2Connection getConnection() {
        return this.connection;
    }

    public Db2Connection getMetaDataConnection() {
        return this.metaDataConnection;
    }

    public SnapshotChangeEventSourceMetrics<Db2Partition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    public StreamingChangeEventSourceMetrics<Db2Partition> getStreamingChangeEventSourceMetrics() {
        return this.streamingChangeEventSourceMetrics;
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask.Context
    public void close() throws Exception {
        this.metaDataConnection.commit();
        this.connection.commit();
        this.metaDataConnection.close();
        this.connection.close();
    }

    @Override // org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext
    public SchemaNameAdjuster getSchemaNameAdjuster() {
        return this.schemaNameAdjuster;
    }
}
