/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.oracle.source.reader.fetch;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.oracle.source.reader.fetch.EventProcessorFactory;
import org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.oracle.source.reader.fetch.StoppableChangeEventSourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class OracleStreamFetchTask
implements FetchTask<SourceSplitBase> {
    private final StreamSplit split;
    private volatile boolean taskRunning = false;

    public OracleStreamFetchTask(StreamSplit split) {
        this.split = split;
    }

    public void execute(FetchTask.Context context) throws Exception {
        OracleSourceFetchTaskContext sourceFetchContext = (OracleSourceFetchTaskContext)context;
        this.taskRunning = true;
        RedoLogSplitReadTask redoLogSplitReadTask = new RedoLogSplitReadTask(sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getConnection(), sourceFetchContext.getDispatcher(), sourceFetchContext.getErrorHandler(), sourceFetchContext.getDatabaseSchema(), sourceFetchContext.getSourceConfig().getOriginDbzConnectorConfig(), sourceFetchContext.getStreamingChangeEventSourceMetrics(), this.split);
        StoppableChangeEventSourceContext changeEventSourceContext = new StoppableChangeEventSourceContext();
        redoLogSplitReadTask.execute((ChangeEventSource.ChangeEventSourceContext)changeEventSourceContext, sourceFetchContext.getPartition(), sourceFetchContext.getOffsetContext());
    }

    public boolean isRunning() {
        return this.taskRunning;
    }

    public StreamSplit getSplit() {
        return this.split;
    }

    public void close() {
        this.taskRunning = false;
    }

    public static class RedoLogSplitReadTask
    extends LogMinerStreamingChangeEventSource {
        private static final Logger LOG = LoggerFactory.getLogger(RedoLogSplitReadTask.class);
        private final StreamSplit redoLogSplit;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
        private final ErrorHandler errorHandler;
        private ChangeEventSource.ChangeEventSourceContext context;
        private final OracleConnectorConfig connectorConfig;
        private final OracleConnection connection;
        private final OracleDatabaseSchema schema;
        private final OracleStreamingChangeEventSourceMetrics metrics;

        public RedoLogSplitReadTask(OracleConnectorConfig connectorConfig, OracleConnection connection, JdbcSourceEventDispatcher<OraclePartition> dispatcher, ErrorHandler errorHandler, OracleDatabaseSchema schema, Configuration jdbcConfig, OracleStreamingChangeEventSourceMetrics metrics, StreamSplit redoLogSplit) {
            super(connectorConfig, connection, (EventDispatcher<OraclePartition, TableId>)dispatcher, errorHandler, Clock.SYSTEM, schema, jdbcConfig, metrics);
            this.redoLogSplit = redoLogSplit;
            this.dispatcher = dispatcher;
            this.errorHandler = errorHandler;
            this.connectorConfig = connectorConfig;
            this.connection = connection;
            this.metrics = metrics;
            this.schema = schema;
        }

        @Override
        public void execute(ChangeEventSource.ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext) {
            this.context = context;
            super.execute(context, partition, offsetContext);
        }

        @Override
        protected LogMinerEventProcessor createProcessor(ChangeEventSource.ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext) {
            return EventProcessorFactory.createProcessor(context, this.connectorConfig, this.connection, this.dispatcher, partition, offsetContext, this.schema, this.metrics, this.errorHandler, this.redoLogSplit);
        }
    }
}

