/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.oracle.source.reader.fetch;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import java.sql.SQLException;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import org.apache.flink.cdc.connectors.oracle.source.reader.fetch.StoppableChangeEventSourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class EventProcessorFactory {
    private static final Logger LOG = LoggerFactory.getLogger(EventProcessorFactory.class);

    private EventProcessorFactory() {
    }

    public static LogMinerEventProcessor createProcessor(ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, JdbcSourceEventDispatcher<OraclePartition> dispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, OracleStreamingChangeEventSourceMetrics metrics, ErrorHandler errorHandler, StreamSplit redoLogSplit) {
        OracleConnectorConfig.LogMiningBufferType bufferType = connectorConfig.getLogMiningBufferType();
        if (bufferType.equals((Object)OracleConnectorConfig.LogMiningBufferType.MEMORY)) {
            return new CDCMemoryLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics, errorHandler, redoLogSplit);
        }
        if (bufferType.equals((Object)OracleConnectorConfig.LogMiningBufferType.INFINISPAN_EMBEDDED)) {
            return new CDCEmbeddedInfinispanLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics, errorHandler, redoLogSplit);
        }
        if (bufferType.equals((Object)OracleConnectorConfig.LogMiningBufferType.INFINISPAN_REMOTE)) {
            return new CDCRemoteInfinispanLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics, errorHandler, redoLogSplit);
        }
        throw new IllegalArgumentException("not support this type of bufferType: " + bufferType);
    }

    public static boolean reachEndingOffset(OraclePartition partition, LogMinerEventRow row, StreamSplit redoLogSplit, ErrorHandler errorHandler, JdbcSourceEventDispatcher dispatcher, ChangeEventSource.ChangeEventSourceContext context) {
        RedoLogOffset currentRedoLogOffset;
        if (EventProcessorFactory.isBoundedRead(redoLogSplit) && (currentRedoLogOffset = new RedoLogOffset(row.getScn().longValue())).isAtOrAfter(redoLogSplit.getEndingOffset())) {
            try {
                dispatcher.dispatchWatermarkEvent(partition.getSourcePartition(), (SourceSplitBase)redoLogSplit, (Offset)currentRedoLogOffset, WatermarkKind.END);
            }
            catch (InterruptedException e) {
                LOG.error("Send signal event error.", (Throwable)e);
                errorHandler.setProducerThrowable((Throwable)new DebeziumException("Error processing redo log signal event", (Throwable)e));
            }
            ((StoppableChangeEventSourceContext)context).stopChangeEventSource();
            return true;
        }
        return false;
    }

    private static boolean isBoundedRead(StreamSplit redoLogSplit) {
        return !RedoLogOffset.NO_STOPPING_OFFSET.equals(redoLogSplit.getEndingOffset());
    }

    public static class CDCRemoteInfinispanLogMinerEventProcessor
    extends RemoteInfinispanLogMinerEventProcessor {
        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCRemoteInfinispanLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, JdbcSourceEventDispatcher<OraclePartition> dispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, OracleStreamingChangeEventSourceMetrics metrics, ErrorHandler errorHandler, StreamSplit redoLogSplit) {
            super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
            this.redoLogSplit = redoLogSplit;
            this.errorHandler = errorHandler;
            this.context = context;
            this.dispatcher = dispatcher;
        }

        protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
            if (EventProcessorFactory.reachEndingOffset(partition, row, this.redoLogSplit, this.errorHandler, this.dispatcher, this.context)) {
                return;
            }
            super.processRow(partition, row);
        }
    }

    public static class CDCEmbeddedInfinispanLogMinerEventProcessor
    extends EmbeddedInfinispanLogMinerEventProcessor {
        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCEmbeddedInfinispanLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, JdbcSourceEventDispatcher<OraclePartition> dispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, OracleStreamingChangeEventSourceMetrics metrics, ErrorHandler errorHandler, StreamSplit redoLogSplit) {
            super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
            this.redoLogSplit = redoLogSplit;
            this.errorHandler = errorHandler;
            this.context = context;
            this.dispatcher = dispatcher;
        }

        protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
            if (EventProcessorFactory.reachEndingOffset(partition, row, this.redoLogSplit, this.errorHandler, this.dispatcher, this.context)) {
                return;
            }
            super.processRow(partition, row);
        }
    }

    public static class CDCMemoryLogMinerEventProcessor
    extends MemoryLogMinerEventProcessor {
        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCMemoryLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, JdbcSourceEventDispatcher<OraclePartition> dispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, OracleStreamingChangeEventSourceMetrics metrics, ErrorHandler errorHandler, StreamSplit redoLogSplit) {
            super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
            this.redoLogSplit = redoLogSplit;
            this.errorHandler = errorHandler;
            this.context = context;
            this.dispatcher = dispatcher;
        }

        protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
            if (EventProcessorFactory.reachEndingOffset(partition, row, this.redoLogSplit, this.errorHandler, this.dispatcher, this.context)) {
                return;
            }
            super.processRow(partition, row);
        }
    }
}

