package org.apache.seatunnel.connectors.cdc.base.source.reader;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.event.MessageDelayedEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.utils.MessageDelayedEventLimiter;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.class */
public class IncrementalSourceRecordEmitter<T> implements RecordEmitter<SourceRecords, T, SourceSplitStateBase> {
    private static final Logger log = LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class);
    private static final String CDC_RECORD_FETCH_DELAY = "CDCRecordFetchDelay";
    private static final String CDC_RECORD_EMIT_DELAY = "CDCRecordEmitDelay";
    protected final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    protected final OffsetFactory offsetFactory;
    protected final SourceReader.Context context;
    protected final Counter recordFetchDelay;
    protected final Counter recordEmitDelay;
    protected final EventListener eventListener;
    protected final MessageDelayedEventLimiter delayedEventLimiter = new MessageDelayedEventLimiter(Duration.ofSeconds(1), 0.5d);
    protected final IncrementalSourceRecordEmitter<T>.OutputCollector<T> outputCollector = new OutputCollector<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter$OutputCollector.class */
    public class OutputCollector<T> implements Collector<T> {
        private Collector<T> output;

        private OutputCollector() {
        }

        public void collect(T t) {
            this.output.collect(t);
        }

        public void collect(SchemaChangeEvent schemaChangeEvent) {
            IncrementalSourceRecordEmitter.this.eventListener.onEvent(schemaChangeEvent);
            this.output.collect(schemaChangeEvent);
        }

        public void markSchemaChangeBeforeCheckpoint() {
            this.output.markSchemaChangeBeforeCheckpoint();
        }

        public void markSchemaChangeAfterCheckpoint() {
            this.output.markSchemaChangeAfterCheckpoint();
        }

        public Object getCheckpointLock() {
            return this.output.getCheckpointLock();
        }
    }

    public IncrementalSourceRecordEmitter(DebeziumDeserializationSchema<T> debeziumDeserializationSchema, OffsetFactory offsetFactory, SourceReader.Context context) {
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.offsetFactory = offsetFactory;
        this.context = context;
        this.recordFetchDelay = context.getMetricsContext().counter(CDC_RECORD_FETCH_DELAY);
        this.recordEmitDelay = context.getMetricsContext().counter(CDC_RECORD_EMIT_DELAY);
        this.eventListener = context.getEventListener();
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter
    public void emitRecord(SourceRecords sourceRecords, Collector<T> collector, SourceSplitStateBase sourceSplitStateBase) throws Exception {
        Iterator<SourceRecord> it = sourceRecords.iterator();
        while (it.hasNext()) {
            SourceRecord next = it.next();
            reportMetrics(next);
            processElement(next, collector, sourceSplitStateBase);
            markEnterPureIncrementPhase(next, sourceSplitStateBase);
        }
    }

    protected void reportMetrics(SourceRecord sourceRecord) {
        long currentTimeMillis = System.currentTimeMillis();
        Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(sourceRecord);
        if (messageTimestamp == null || messageTimestamp.longValue() <= 0) {
            return;
        }
        Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(sourceRecord);
        if (fetchTimestamp != null) {
            long longValue = fetchTimestamp.longValue() - messageTimestamp.longValue();
            this.recordFetchDelay.set(longValue > 0 ? longValue : 0L);
        }
        long longValue2 = currentTimeMillis - messageTimestamp.longValue();
        this.recordEmitDelay.set(longValue2 > 0 ? longValue2 : 0L);
        if (this.delayedEventLimiter.acquire(messageTimestamp.longValue())) {
            this.eventListener.onEvent(new MessageDelayedEvent(longValue2, sourceRecord.toString()));
        }
    }

    protected void processElement(SourceRecord sourceRecord, Collector<T> collector, SourceSplitStateBase sourceSplitStateBase) throws Exception {
        if (!WatermarkEvent.isWatermarkEvent(sourceRecord)) {
            if (SourceRecordUtils.isSchemaChangeEvent(sourceRecord) && sourceSplitStateBase.isIncrementalSplitState()) {
                emitElement(sourceRecord, collector);
                return;
            }
            if (!SourceRecordUtils.isDataChangeRecord(sourceRecord)) {
                emitElement(sourceRecord, collector);
                return;
            }
            if (sourceSplitStateBase.isIncrementalSplitState()) {
                sourceSplitStateBase.asIncrementalSplitState().setStartupOffset(getOffsetPosition(sourceRecord));
            }
            emitElement(sourceRecord, collector);
            return;
        }
        Offset watermark = getWatermark(sourceRecord);
        if (WatermarkEvent.isLowWatermarkEvent(sourceRecord) && sourceSplitStateBase.isSnapshotSplitState()) {
            sourceSplitStateBase.asSnapshotSplitState().setLowWatermark(watermark);
            return;
        }
        if (WatermarkEvent.isHighWatermarkEvent(sourceRecord) && sourceSplitStateBase.isSnapshotSplitState()) {
            sourceSplitStateBase.asSnapshotSplitState().setHighWatermark(watermark);
        } else if ((WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(sourceRecord) || WatermarkEvent.isSchemaChangeAfterWatermarkEvent(sourceRecord)) && sourceSplitStateBase.isIncrementalSplitState()) {
            emitElement(sourceRecord, collector);
        }
    }

    private void markEnterPureIncrementPhase(SourceRecord sourceRecord, SourceSplitStateBase sourceSplitStateBase) {
        if (sourceSplitStateBase.isIncrementalSplitState()) {
            IncrementalSplitState asIncrementalSplitState = sourceSplitStateBase.asIncrementalSplitState();
            if (asIncrementalSplitState.isEnterPureIncrementPhase()) {
                return;
            }
            Offset offsetPosition = getOffsetPosition(sourceRecord);
            if (asIncrementalSplitState.markEnterPureIncrementPhaseIfNeed(offsetPosition)) {
                log.info("The current record position {} is after the maxSnapshotSplitsHighWatermark {}, mark enter pure increment phase.", offsetPosition, asIncrementalSplitState.getMaxSnapshotSplitsHighWatermark());
                log.info("Clean the IncrementalSplit#completedSnapshotSplitInfos to empty.");
                this.context.sendSourceEventToEnumerator(new CompletedSnapshotPhaseEvent(asIncrementalSplitState.getTableIds()));
            }
        }
    }

    private Offset getWatermark(SourceRecord sourceRecord) {
        return getOffsetPosition(sourceRecord.sourceOffset());
    }

    public Offset getOffsetPosition(SourceRecord sourceRecord) {
        return getOffsetPosition(sourceRecord.sourceOffset());
    }

    public Offset getOffsetPosition(Map<String, ?> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        return this.offsetFactory.specific(hashMap);
    }

    protected void emitElement(SourceRecord sourceRecord, Collector<T> collector) throws Exception {
        ((OutputCollector) this.outputCollector).output = collector;
        this.debeziumDeserializationSchema.deserialize(sourceRecord, this.outputCollector);
    }
}
