/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.oracle.source.reader.fetch;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.oracle.OracleChangeEventSourceMetricsFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleErrorHandler;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.OracleTopicSelector;
import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import io.debezium.util.SchemaNameAdjuster;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Map;
import oracle.sql.ROWID;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.relational.handler.SchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
import org.apache.flink.cdc.connectors.oracle.source.handler.OracleSchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleUtils;
import org.apache.flink.cdc.connectors.oracle.util.ChunkUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetch-task context for the Oracle CDC source.
 *
 * <p>The constructor opens the {@link OracleConnection}; {@link #configure(SourceSplitBase)}
 * then builds the Debezium objects used while reading one split (schema, offset context,
 * partition, queue, dispatcher, metrics and error handler). The getters simply expose those
 * objects, so they return {@code null} until {@code configure} has been called.
 */
public class OracleSourceFetchTaskContext
extends JdbcSourceFetchTaskContext {
    private static final Logger LOG = LoggerFactory.getLogger(OracleSourceFetchTaskContext.class);
    private final OracleConnection connection;
    private final OracleEventMetadataProvider metadataProvider;
    private OracleDatabaseSchema databaseSchema;
    private OracleTaskContext taskContext;
    private OracleOffsetContext offsetContext;
    private OraclePartition partition;
    private SnapshotChangeEventSourceMetrics<OraclePartition> snapshotChangeEventSourceMetrics;
    private OracleStreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;
    private TopicSelector<TableId> topicSelector;
    private JdbcSourceEventDispatcher<OraclePartition> dispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private OracleErrorHandler errorHandler;

    /**
     * Creates the context and opens an Oracle connection from the Debezium configuration
     * carried by {@code sourceConfig}.
     *
     * @param sourceConfig source configuration, forwarded to the superclass
     * @param dataSourceDialect dialect, forwarded to the superclass
     */
    public OracleSourceFetchTaskContext(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
        super(sourceConfig, dataSourceDialect);
        this.connection = OracleConnectionUtils.createOracleConnection(sourceConfig.getDbzConfiguration());
        this.metadataProvider = new OracleEventMetadataProvider();
    }

    /**
     * Builds the per-split state of this context.
     *
     * <p>The statement order matters: the partition must be assigned before the database
     * history is recovered (recovery reads {@code this.partition}), and the task context and
     * queue must exist before the dispatcher, metrics and error handler are created from them.
     *
     * @param sourceSplitBase the split about to be read; its table schemas are registered with
     *     the embedded database history and its kind (snapshot or stream) selects the starting
     *     offset and the queue capacity
     */
    public void configure(SourceSplitBase sourceSplitBase) {
        OracleConnectorConfig connectorConfig = this.getDbzConnectorConfig();
        this.topicSelector = OracleTopicSelector.defaultSelector((OracleConnectorConfig)connectorConfig);
        EmbeddedFlinkDatabaseHistory.registerHistory((String)this.sourceConfig.getDbzConfiguration().getString("database.history.instance.name"), sourceSplitBase.getTableSchemas().values());
        this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig, this.connection);
        this.offsetContext = this.loadStartingOffsetState((OffsetContext.Loader<OracleOffsetContext>)new LogMinerOracleOffsetContextLoader(connectorConfig), sourceSplitBase);
        this.partition = new OraclePartition(connectorConfig.getLogicalName());
        this.validateAndLoadDatabaseHistory(this.offsetContext, this.databaseSchema);
        this.taskContext = new OracleTaskContext(connectorConfig, this.databaseSchema);
        // Snapshot splits size the queue by the configured split size; stream splits use the
        // connector's max queue size.
        int queueSize = sourceSplitBase.isSnapshotSplit() ? this.getSourceConfig().getSplitSize() : this.getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
        this.queue = new ChangeEventQueue.Builder().pollInterval(connectorConfig.getPollInterval()).maxBatchSize(connectorConfig.getMaxBatchSize()).maxQueueSize(queueSize).maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()).loggingContextSupplier(() -> this.taskContext.configureLoggingContext("oracle-cdc-connector-task")).build();
        this.dispatcher = new JdbcSourceEventDispatcher((CommonConnectorConfig)connectorConfig, this.topicSelector, (DatabaseSchema)this.databaseSchema, this.queue, (DataCollectionFilters.DataCollectionFilter)connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, (EventMetadataProvider)this.metadataProvider, this.schemaNameAdjuster, (SchemaChangeEventHandler)new OracleSchemaChangeEventHandler());
        OracleChangeEventSourceMetricsFactory changeEventSourceMetricsFactory = new OracleChangeEventSourceMetricsFactory(new OracleStreamingChangeEventSourceMetrics((CdcSourceTaskContext)this.taskContext, this.queue, (EventMetadataProvider)this.metadataProvider, connectorConfig));
        this.snapshotChangeEventSourceMetrics = changeEventSourceMetricsFactory.getSnapshotMetrics((CdcSourceTaskContext)this.taskContext, this.queue, (EventMetadataProvider)this.metadataProvider);
        this.streamingChangeEventSourceMetrics = (OracleStreamingChangeEventSourceMetrics)changeEventSourceMetricsFactory.getStreamingMetrics((CdcSourceTaskContext)this.taskContext, this.queue, (EventMetadataProvider)this.metadataProvider);
        this.errorHandler = new OracleErrorHandler(connectorConfig, this.queue);
    }

    /** Returns the source configuration cast to {@link OracleSourceConfig}. */
    public OracleSourceConfig getSourceConfig() {
        return (OracleSourceConfig)this.sourceConfig;
    }

    /** Returns the connection opened by the constructor. */
    public OracleConnection getConnection() {
        return this.connection;
    }

    /** Returns the superclass' Debezium connector configuration cast to the Oracle type. */
    public OracleConnectorConfig getDbzConnectorConfig() {
        return (OracleConnectorConfig)super.getDbzConnectorConfig();
    }

    /** Returns the offset context loaded by {@link #configure(SourceSplitBase)}. */
    public OracleOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    /** Returns the snapshot metrics created by {@link #configure(SourceSplitBase)}. */
    public SnapshotChangeEventSourceMetrics<OraclePartition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    /** Returns the streaming metrics created by {@link #configure(SourceSplitBase)}. */
    public OracleStreamingChangeEventSourceMetrics getStreamingChangeEventSourceMetrics() {
        return this.streamingChangeEventSourceMetrics;
    }

    /** Returns the error handler created by {@link #configure(SourceSplitBase)}. */
    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    /** Returns the database schema created by {@link #configure(SourceSplitBase)}. */
    public OracleDatabaseSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    /**
     * Returns the row type of the chunk key column of {@code table}, honouring the chunk key
     * column configured on the source (if any).
     *
     * @param table table whose split key type is requested
     * @return the split key row type as computed by {@code ChunkUtils}
     */
    public RowType getSplitType(Table table) {
        OracleSourceConfig oracleSourceConfig = this.getSourceConfig();
        return ChunkUtils.getSplitType(ChunkUtils.getChunkKeyColumn(table, oracleSourceConfig.getChunkKeyColumn()));
    }

    /**
     * Tells whether {@code record} falls inside the split key range described by
     * {@code splitStart} and {@code splitEnd}.
     *
     * <p>When the split key type has a field named {@code ROWID}, the key is built from the
     * value of the record's first header; otherwise it is extracted from the record itself via
     * {@code SourceRecordUtils.getSplitKey}.
     *
     * @param record change record to test
     * @param splitStart lower bound of the split, passed through to the range check
     * @param splitEnd upper bound of the split, passed through to the range check
     * @return the result of {@code SourceRecordUtils.splitKeyRangeContains}
     */
    public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
        RowType splitKeyType = this.getSplitType(this.getDatabaseSchema().tableFor(SourceRecordUtils.getTableId((SourceRecord)record)));
        if (splitKeyType.getFieldNames().contains(ROWID.class.getSimpleName())) {
            // NOTE(review): presumably the first header always carries the ROWID; a record
            // without headers would fail on iterator().next() - confirm against the producer.
            ConnectHeaders headers = (ConnectHeaders)record.headers();
            ROWID rowId = null;
            try {
                rowId = new ROWID(((Header)headers.iterator().next()).value().toString());
            }
            catch (SQLException e) {
                // Pass the exception as the trailing argument so the cause is logged as well.
                LOG.error("{} can not convert to RowId", (Object)record, (Object)e);
            }
            // rowId stays null when the conversion failed; it is still handed to the range
            // check, exactly as before.
            Object[] rowIds = new ROWID[]{rowId};
            return SourceRecordUtils.splitKeyRangeContains((Object[])rowIds, (Object[])splitStart, (Object[])splitEnd);
        }
        Object[] key = SourceRecordUtils.getSplitKey((RowType)splitKeyType, (SourceRecord)record, (SchemaNameAdjuster)this.getSchemaNameAdjuster());
        return SourceRecordUtils.splitKeyRangeContains((Object[])key, (Object[])splitStart, (Object[])splitEnd);
    }

    /** Returns the event dispatcher created by {@link #configure(SourceSplitBase)}. */
    public JdbcSourceEventDispatcher<OraclePartition> getDispatcher() {
        return this.dispatcher;
    }

    /** Returns the change event queue created by {@link #configure(SourceSplitBase)}. */
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    /** Returns the partition created by {@link #configure(SourceSplitBase)}. */
    public OraclePartition getPartition() {
        return this.partition;
    }

    /** Returns the data collection filter of the connector's table filters. */
    public Tables.TableFilter getTableFilter() {
        return this.getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    /** Returns the redo log position of {@code sourceRecord} as computed by {@code OracleUtils}. */
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return OracleUtils.getRedoLogPosition(sourceRecord);
    }

    /**
     * Closes the Oracle connection opened by the constructor.
     *
     * @throws Exception whatever {@code OracleConnection.close()} throws
     */
    public void close() throws Exception {
        this.connection.close();
    }

    /**
     * Loads the offset context a split starts from: {@code RedoLogOffset.INITIAL_OFFSET} for a
     * snapshot split, the split's own starting offset for a stream split.
     */
    private OracleOffsetContext loadStartingOffsetState(OffsetContext.Loader<OracleOffsetContext> loader, SourceSplitBase oracleSplit) {
        // Declared with the base Offset type (already imported by this file) because only
        // getOffset() is needed and the stream split branch is not necessarily typed as
        // RedoLogOffset.
        Offset offset = oracleSplit.isSnapshotSplit() ? RedoLogOffset.INITIAL_OFFSET : oracleSplit.asStreamSplit().getStartingOffset();
        return (OracleOffsetContext)loader.load(offset.getOffset());
    }

    /** Initializes the schema storage and recovers the schema for this partition and offset. */
    private void validateAndLoadDatabaseHistory(OracleOffsetContext offset, OracleDatabaseSchema schema) {
        schema.initializeStorage();
        schema.recover(Offsets.of((Partition)this.partition, (OffsetContext)offset));
    }

    /**
     * Reads event metadata (timestamp, SCN, transaction id) from the {@code source} struct of
     * a change event value.
     *
     * <p>Every method returns {@code null} when the value, the data collection id or the
     * {@code source} struct is {@code null}.
     */
    public static class OracleEventMetadataProvider
    implements EventMetadataProvider {
        /**
         * Returns the event time taken from the {@code ts_ms} field of the source struct.
         *
         * @return the timestamp as an {@link Instant}, or {@code null} when it is unavailable
         */
        public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            // Guard the struct that is dereferenced below; the check on the id is kept so that
            // existing results do not change.
            if (source == null || sourceInfo == null) {
                return null;
            }
            Long timestamp = sourceInfo.getInt64("ts_ms");
            return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
        }

        /**
         * Returns a single-entry map holding the {@code scn} field of the source struct.
         *
         * @return a map from {@code "scn"} to the SCN, with the literal string {@code "null"}
         *     standing in for a missing SCN, or {@code null} when no source info is available
         */
        public Map<String, String> getEventSourcePosition(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            if (source == null || sourceInfo == null) {
                return null;
            }
            String scn = sourceInfo.getString("scn");
            return Collect.hashMapOf("scn", scn == null ? "null" : scn);
        }

        /**
         * Returns the {@code txId} field of the source struct.
         *
         * @return the transaction id, or {@code null} when it is unavailable
         */
        public String getTransactionId(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            if (source == null || sourceInfo == null) {
                return null;
            }
            return sourceInfo.getString("txId");
        }
    }
}

