package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlErrorHandler;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.MySqlTopicSelector;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.class */
public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
    // NOTE(review): `log` and `LOG` are two handles on the same logger; `log` is used by close()
    // and `LOG` by isBinlogAvailable(). Consider consolidating them in one change.
    private static final Logger log = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
    // Created once in the constructor and released in close().
    private final MySqlConnection connection;
    // Created once in the constructor; handed to MySqlTaskContextImpl in configure().
    private final BinaryLogClient binaryLogClient;
    // Created in the constructor; passed to the dispatcher and the metrics objects in configure().
    private final MySqlEventMetadataProvider metadataProvider;
    // The fields below are (re)assigned by configure(SourceSplitBase) and are null before it runs.
    private MySqlDatabaseSchema databaseSchema;
    private MySqlTaskContextImpl taskContext;
    private MySqlOffsetContext offsetContext;
    private SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics;
    private MySqlStreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;
    private TopicSelector<TableId> topicSelector;
    private JdbcSourceEventDispatcher<MySqlPartition> dispatcher;
    private MySqlPartition mySqlPartition;
    private ChangeEventQueue<DataChangeEvent> queue;
    private MySqlErrorHandler errorHandler;
    // Assigned in the constructor from JdbcSourceConfig#getDbzConnectorConfig(); read by getSplitType().
    private RelationalDatabaseConnectorConfig dbzConnectorConfig;

    /* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext$MySqlEventMetadataProvider.class */
    /**
     * {@link EventMetadataProvider} that reads event metadata from the {@code "source"} struct of a
     * change-event value and from the {@link MySqlOffsetContext}.
     */
    public static class MySqlEventMetadataProvider implements EventMetadataProvider {
        public static final String SERVER_ID_KEY = "server_id";
        public static final String GTID_KEY = "gtid";
        public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
        public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
        public static final String BINLOG_ROW_IN_EVENT_OFFSET_KEY = "row";
        public static final String THREAD_KEY = "thread";
        public static final String QUERY_KEY = "query";

        /**
         * Returns the event timestamp stored under {@code "ts_ms"} in the {@code "source"} struct.
         *
         * @param dataCollectionId the collection the event belongs to; {@code null} yields {@code null}
         * @param offsetContext not used by this method
         * @param obj not used by this method
         * @param struct the event value; {@code null} yields {@code null}
         * @return the timestamp, or {@code null} when the value, the collection id, the source struct
         *     or the {@code "ts_ms"} field is {@code null}
         */
        @Override
        public Instant getEventTimestamp(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            final Struct sourceInfo = sourceInfoOf(dataCollectionId, struct);
            if (sourceInfo == null) {
                return null;
            }
            final Long timestampMs = sourceInfo.getInt64("ts_ms");
            return timestampMs == null ? null : Instant.ofEpochMilli(timestampMs.longValue());
        }

        /**
         * Returns the binlog position of the event as a map with the keys
         * {@link #BINLOG_FILENAME_OFFSET_KEY}, {@link #BINLOG_POSITION_OFFSET_KEY} and
         * {@link #BINLOG_ROW_IN_EVENT_OFFSET_KEY}.
         *
         * @param dataCollectionId the collection the event belongs to; {@code null} yields {@code null}
         * @param offsetContext not used by this method
         * @param obj not used by this method
         * @param struct the event value; {@code null} yields {@code null}
         * @return the position map, or {@code null} when the value, the collection id or the source
         *     struct is {@code null}
         */
        @Override
        public Map<String, String> getEventSourcePosition(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            final Struct sourceInfo = sourceInfoOf(dataCollectionId, struct);
            if (sourceInfo == null) {
                return null;
            }
            // The position and row values are unboxed here, exactly as before: a null value in
            // either field still fails with a NullPointerException.
            return Collect.hashMapOf(
                    BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY),
                    BINLOG_POSITION_OFFSET_KEY, Long.toString(sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY).longValue()),
                    BINLOG_ROW_IN_EVENT_OFFSET_KEY, Integer.toString(sourceInfo.getInt32(BINLOG_ROW_IN_EVENT_OFFSET_KEY).intValue()));
        }

        /**
         * Returns the transaction id tracked by the offset context.
         *
         * @param offsetContext must be a {@link MySqlOffsetContext}; it is cast without a check
         * @return whatever {@link MySqlOffsetContext#getTransactionId()} returns
         */
        @Override
        public String getTransactionId(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) {
            return ((MySqlOffsetContext) offsetContext).getTransactionId();
        }

        /**
         * Resolves the {@code "source"} struct of an event value.
         *
         * <p>The struct is looked up before the collection id is checked, which keeps the original
         * evaluation order.
         *
         * @return the source struct, or {@code null} when the value, the collection id or the
         *     source struct itself is {@code null}
         */
        private static Struct sourceInfoOf(DataCollectionId dataCollectionId, Struct struct) {
            if (struct == null) {
                return null;
            }
            final Struct sourceInfo = struct.getStruct("source");
            if (dataCollectionId == null) {
                return null;
            }
            return sourceInfo;
        }
    }

    /* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext$MySqlTaskContextImpl.class */
    /**
     * {@link MySqlTaskContext} that serves a caller-supplied {@link BinaryLogClient} from
     * {@link #getBinaryLogClient()} after clearing the listeners registered on it.
     */
    public class MySqlTaskContextImpl extends MySqlTaskContext {
        private final BinaryLogClient reusedBinaryLogClient;

        /**
         * @param mySqlConnectorConfig connector configuration forwarded to the superclass
         * @param mySqlDatabaseSchema database schema forwarded to the superclass
         * @param binaryLogClient client to reuse; its listener lists are cleared before it is stored
         */
        public MySqlTaskContextImpl(MySqlConnectorConfig mySqlConnectorConfig, MySqlDatabaseSchema mySqlDatabaseSchema, BinaryLogClient binaryLogClient) {
            super(mySqlConnectorConfig, mySqlDatabaseSchema);
            this.reusedBinaryLogClient = resetBinaryLogClient(binaryLogClient);
        }

        /** Returns the client that was passed to the constructor. */
        @Override
        public BinaryLogClient getBinaryLogClient() {
            return reusedBinaryLogClient;
        }

        /**
         * Empties the {@code eventListeners} and {@code lifecycleListeners} lists of the client, read
         * reflectively, and returns the same client instance. A field that cannot be read is skipped.
         */
        private BinaryLogClient resetBinaryLogClient(BinaryLogClient binaryLogClient) {
            final String[] listenerFields = {"eventListeners", "lifecycleListeners"};
            for (String listenerField : listenerFields) {
                ReflectionUtils.getField(binaryLogClient, BinaryLogClient.class, listenerField)
                        .ifPresent(listeners -> ((List<?>) listeners).clear());
            }
            return binaryLogClient;
        }
    }

    /**
     * Builds the context for the given source configuration and dialect.
     *
     * <p>Stores the Debezium connector configuration and creates the {@link MySqlConnection}, the
     * {@link BinaryLogClient} and the event metadata provider. Everything else is set up later by
     * {@link #configure(SourceSplitBase)}.
     *
     * @param jdbcSourceConfig source configuration; supplies the Debezium configuration objects
     * @param jdbcDataSourceDialect dialect forwarded to the superclass
     */
    public MySqlSourceFetchTaskContext(JdbcSourceConfig jdbcSourceConfig, JdbcDataSourceDialect jdbcDataSourceDialect) {
        super(jdbcSourceConfig, jdbcDataSourceDialect);
        this.metadataProvider = new MySqlEventMetadataProvider();
        this.dbzConnectorConfig = jdbcSourceConfig.getDbzConnectorConfig();
        // Connection first, binlog client second: same creation order as before.
        this.connection = MySqlConnectionUtils.createMySqlConnection(jdbcSourceConfig.getDbzConfiguration());
        this.binaryLogClient = MySqlConnectionUtils.createBinaryClient(jdbcSourceConfig.getDbzConfiguration());
    }

    /**
     * Prepares this context for reading the given split.
     *
     * <p>In order: registers the table schemas of the split as database history, builds the topic
     * selector and database schema, loads the starting offset, recovers the schema from history, and
     * then creates the task context, event queue, dispatcher, metrics and error handler.
     *
     * @param sourceSplitBase the split about to be read
     * @throws IllegalStateException if the starting binlog file of the split is no longer available
     */
    @Override
    public void configure(SourceSplitBase sourceSplitBase) {
        registerDatabaseHistory(sourceSplitBase);

        final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
        final boolean tableIdCaseSensitive = this.connection.isTableIdCaseSensitive();

        this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
        this.databaseSchema = MySqlConnectionUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseSensitive);
        this.offsetContext = loadStartingOffsetState(new MySqlOffsetContext.Loader(connectorConfig), sourceSplitBase);
        this.mySqlPartition = new MySqlPartition(connectorConfig.getLogicalName());
        validateAndLoadDatabaseHistory(this.offsetContext, this.databaseSchema);
        this.taskContext = new MySqlTaskContextImpl(connectorConfig, this.databaseSchema, this.binaryLogClient);

        // Snapshot splits read with exactly-once get an effectively unbounded queue; every other
        // case uses the configured maximum.
        final int maxQueueSize;
        if (sourceSplitBase.isSnapshotSplit() && isExactlyOnce()) {
            maxQueueSize = Integer.MAX_VALUE;
        } else {
            maxQueueSize = getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
        }
        this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(connectorConfig.getPollInterval())
                .maxBatchSize(connectorConfig.getMaxBatchSize())
                .maxQueueSize(maxQueueSize)
                .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                .loggingContextSupplier(() -> this.taskContext.configureLoggingContext("mysql-cdc-connector-task"))
                .build();

        this.dispatcher = new JdbcSourceEventDispatcher<>(
                connectorConfig,
                this.topicSelector,
                this.databaseSchema,
                this.queue,
                connectorConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                this.metadataProvider,
                this.schemaNameAdjuster);

        final MySqlChangeEventSourceMetricsFactory metricsFactory = new MySqlChangeEventSourceMetricsFactory(
                new MySqlStreamingChangeEventSourceMetrics(this.taskContext, this.queue, this.metadataProvider));
        this.snapshotChangeEventSourceMetrics = metricsFactory.getSnapshotMetrics(this.taskContext, this.queue, this.metadataProvider);
        this.streamingChangeEventSourceMetrics =
                (MySqlStreamingChangeEventSourceMetrics) metricsFactory.getStreamingMetrics(this.taskContext, this.queue, this.metadataProvider);

        this.errorHandler = new MySqlErrorHandler(connectorConfig, this.queue);
    }

    /**
     * Releases the JDBC connection and the binlog client.
     *
     * <p>Each resource is closed in its own {@code try} block so that a failure to close the JDBC
     * connection no longer prevents the binlog client from being disconnected. Failures are logged
     * at WARN level and not rethrown.
     */
    @Override
    public void close() {
        try {
            this.connection.close();
        } catch (SQLException e) {
            log.warn("Failed to close connection", e);
        }
        try {
            this.binaryLogClient.disconnect();
        } catch (IOException e) {
            log.warn("Failed to close binaryLogClient", e);
        }
    }

    /**
     * Returns the inherited source configuration narrowed to {@link MySqlSourceConfig}.
     *
     * @throws ClassCastException if the stored configuration is not a {@link MySqlSourceConfig}
     */
    @Override
    public MySqlSourceConfig getSourceConfig() {
        final MySqlSourceConfig mySqlSourceConfig = (MySqlSourceConfig) sourceConfig;
        return mySqlSourceConfig;
    }

    /** Returns the MySQL connection created in the constructor. */
    public MySqlConnection getConnection() {
        return connection;
    }

    /** Returns the binlog client created in the constructor. */
    public BinaryLogClient getBinaryLogClient() {
        return binaryLogClient;
    }

    /**
     * Returns the task context built by {@link #configure(SourceSplitBase)}; {@code null} until
     * that method has run.
     */
    public MySqlTaskContextImpl getTaskContext() {
        return taskContext;
    }

    /**
     * Returns the connector configuration provided by the superclass, narrowed to
     * {@link MySqlConnectorConfig}.
     *
     * @throws ClassCastException if the superclass returns a different configuration type
     */
    @Override
    public MySqlConnectorConfig getDbzConnectorConfig() {
        final MySqlConnectorConfig connectorConfig = (MySqlConnectorConfig) super.getDbzConnectorConfig();
        return connectorConfig;
    }

    /**
     * Returns the offset context loaded by {@link #configure(SourceSplitBase)}; {@code null} until
     * that method has run.
     */
    @Override
    public MySqlOffsetContext getOffsetContext() {
        return offsetContext;
    }

    /**
     * Returns the partition created by {@link #configure(SourceSplitBase)}; {@code null} until that
     * method has run.
     */
    @Override
    public MySqlPartition getPartition() {
        return mySqlPartition;
    }

    /**
     * Returns the snapshot metrics created by {@link #configure(SourceSplitBase)}; {@code null}
     * until that method has run.
     */
    public SnapshotChangeEventSourceMetrics<MySqlPartition> getSnapshotChangeEventSourceMetrics() {
        return snapshotChangeEventSourceMetrics;
    }

    /**
     * Returns the streaming metrics created by {@link #configure(SourceSplitBase)}; {@code null}
     * until that method has run.
     */
    public MySqlStreamingChangeEventSourceMetrics getStreamingChangeEventSourceMetrics() {
        return streamingChangeEventSourceMetrics;
    }

    /**
     * Returns the error handler created by {@link #configure(SourceSplitBase)}; {@code null} until
     * that method has run.
     */
    @Override
    public ErrorHandler getErrorHandler() {
        return errorHandler;
    }

    /**
     * Returns the database schema created by {@link #configure(SourceSplitBase)}; {@code null}
     * until that method has run.
     */
    @Override
    public MySqlDatabaseSchema getDatabaseSchema() {
        return databaseSchema;
    }

    /**
     * Delegates to {@link MySqlUtils#getSplitType} with the connector configuration captured in
     * the constructor.
     *
     * @param table the table whose split type is requested
     * @return the row type computed by {@link MySqlUtils}
     */
    @Override
    public SeaTunnelRowType getSplitType(Table table) {
        final SeaTunnelRowType splitType = MySqlUtils.getSplitType(table, this.dbzConnectorConfig);
        return splitType;
    }

    /**
     * Returns the event dispatcher created by {@link #configure(SourceSplitBase)}; {@code null}
     * until that method has run.
     */
    @Override
    public JdbcSourceEventDispatcher<MySqlPartition> getDispatcher() {
        return dispatcher;
    }

    /**
     * Returns the change-event queue created by {@link #configure(SourceSplitBase)}; {@code null}
     * until that method has run.
     */
    @Override
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return queue;
    }

    /** Returns the data-collection filter of the connector configuration's table filters. */
    @Override
    public Tables.TableFilter getTableFilter() {
        final MySqlConnectorConfig connectorConfig = getDbzConnectorConfig();
        return connectorConfig.getTableFilters().dataCollectionFilter();
    }

    /**
     * Returns the binlog position that {@link MySqlUtils#getBinlogPosition} derives from the record.
     *
     * @param sourceRecord the record to inspect
     */
    @Override
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        final Offset binlogPosition = MySqlUtils.getBinlogPosition(sourceRecord);
        return binlogPosition;
    }

    /**
     * Loads the offset context the split should start from and verifies that its binlog file is
     * still available.
     *
     * <p>Snapshot splits start from {@link BinlogOffset#INITIAL_OFFSET}; any other split starts from
     * the startup offset of its incremental view.
     *
     * @param loader loader used to turn the offset map into a {@link MySqlOffsetContext}
     * @param sourceSplitBase the split being configured
     * @return the loaded offset context
     * @throws IllegalStateException if {@link #isBinlogAvailable} reports the binlog file as missing
     */
    private MySqlOffsetContext loadStartingOffsetState(MySqlOffsetContext.Loader loader, SourceSplitBase sourceSplitBase) {
        final Offset startingOffset;
        if (sourceSplitBase.isSnapshotSplit()) {
            startingOffset = BinlogOffset.INITIAL_OFFSET;
        } else {
            startingOffset = sourceSplitBase.asIncrementalSplit().getStartupOffset();
        }

        final MySqlOffsetContext loadedContext = loader.load((Map<String, ?>) startingOffset.getOffset());
        if (!isBinlogAvailable(loadedContext)) {
            throw new IllegalStateException("The connector is trying to read binlog starting at " + loadedContext.getSourceInfo() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        return loadedContext;
    }

    /**
     * Checks whether the binlog file named in the given offset can still be found on the server.
     *
     * <p>An offset whose binlog file name is {@code null} or empty is always reported as available.
     * Otherwise the name must be contained in the list returned by
     * {@code connection.availableBinlogFiles()}. The outcome is logged at INFO level.
     *
     * @param mySqlOffsetContext offset whose source info holds the binlog file name
     * @return {@code true} if no file name is recorded or the file is listed by the server
     */
    private boolean isBinlogAvailable(MySqlOffsetContext mySqlOffsetContext) {
        final String binlogFilename =
                mySqlOffsetContext.getSourceInfo().getString(MySqlEventMetadataProvider.BINLOG_FILENAME_OFFSET_KEY);
        if (binlogFilename == null || binlogFilename.isEmpty()) {
            return true;
        }

        final List<String> availableBinlogFiles = this.connection.availableBinlogFiles();
        // The previous lambda referenced an undefined identifier (decompiler residue); a plain
        // membership test expresses the intended comparison.
        final boolean found = availableBinlogFiles.contains(binlogFilename);
        if (found) {
            LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
        } else {
            LOG.info("Connector requires binlog file '{}', but MySQL only has {}", binlogFilename, String.join(", ", availableBinlogFiles));
        }
        return found;
    }

    /**
     * Initializes the schema's storage and then recovers the schema for the current partition at
     * the given offset.
     *
     * @param mySqlOffsetContext offset to recover the schema at
     * @param mySqlDatabaseSchema schema to initialize and recover
     */
    private void validateAndLoadDatabaseHistory(MySqlOffsetContext mySqlOffsetContext, MySqlDatabaseSchema mySqlDatabaseSchema) {
        mySqlDatabaseSchema.initializeStorage();
        final Offsets<MySqlPartition, MySqlOffsetContext> recoveryOffsets = Offsets.of(this.mySqlPartition, mySqlOffsetContext);
        mySqlDatabaseSchema.recover(recoveryOffsets);
    }

    /**
     * Queries the schema of every table covered by the split and registers the results with
     * {@link EmbeddedDatabaseHistory} under the configured history instance name.
     *
     * <p>A {@link SnapshotSplit} contributes its single table; any other split is treated as an
     * {@link IncrementalSplit} and contributes all of its tables.
     *
     * @param sourceSplitBase the split being configured
     */
    private void registerDatabaseHistory(SourceSplitBase sourceSplitBase) {
        // Raw list kept on purpose: the element type returned by queryTableSchema is not imported
        // in this file.
        final List tableSchemas = new ArrayList();
        if (!(sourceSplitBase instanceof SnapshotSplit)) {
            for (TableId tableId : ((IncrementalSplit) sourceSplitBase).getTableIds()) {
                tableSchemas.add(this.dataSourceDialect.queryTableSchema(this.connection, tableId));
            }
        } else {
            final TableId snapshotTableId = ((SnapshotSplit) sourceSplitBase).getTableId();
            tableSchemas.add(this.dataSourceDialect.queryTableSchema(this.connection, snapshotTableId));
        }

        final String historyInstanceName =
                this.sourceConfig.getDbzConfiguration().getString(EmbeddedDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME);
        EmbeddedDatabaseHistory.registerHistory(historyInstanceName, tableSchemas);
    }
}
