package org.apache.seatunnel.connectors.cdc.base.source.reader;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceRecordEmitter.class */
/**
 * Emits the records contained in a {@link SourceRecords} batch to a downstream {@link Collector},
 * updating the given {@link SourceSplitStateBase} as records are processed.
 *
 * <p>For every {@link SourceRecord} in a batch:
 *
 * <ul>
 *   <li>watermark events are never emitted; a high-watermark event seen while reading a snapshot
 *       split records its offset as the split's high watermark;
 *   <li>schema change events seen while reading an incremental split are skipped;
 *   <li>data change records are deserialized and emitted, and while reading an incremental split
 *       the split's startup offset is advanced to the record's offset first;
 *   <li>anything else is logged and skipped.
 * </ul>
 *
 * <p>NOTE(review): a single {@link OutputCollector} instance is reused and re-pointed on every
 * emitted record, so an instance is presumably meant to be driven by one thread at a time -
 * confirm against the reader that owns it.
 *
 * @param <T> type of the records produced by the deserialization schema
 */
public class IncrementalSourceRecordEmitter<T> implements RecordEmitter<SourceRecords, T, SourceSplitStateBase> {
    private static final Logger log = LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class);

    /** Converts raw change records into output elements of type {@code T}. */
    protected final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;

    /** Reusable adapter handed to the deserialization schema; re-targeted before each record. */
    protected final OutputCollector<T> outputCollector = new OutputCollector<>();

    /** Builds {@link Offset} instances from the string form of a record's source offset. */
    protected final OffsetFactory offsetFactory;

    /**
     * Forwards collected elements to whichever collector is currently assigned to it.
     *
     * <p>The target is assigned by {@code emitElement} before each deserialization call; calling
     * {@link #collect(Object)} before a target has been assigned fails with a
     * {@link NullPointerException}.
     *
     * @param <T> type of the collected elements
     */
    public static class OutputCollector<T> implements Collector<T> {
        /** Current downstream target; {@code null} until the first record is emitted. */
        private Collector<T> output;

        private OutputCollector() {
        }

        /**
         * Passes the element straight through to the current downstream collector.
         *
         * @param t element to forward
         */
        @Override
        public void collect(T t) {
            this.output.collect(t);
        }

        /**
         * This adapter exposes no checkpoint lock of its own.
         *
         * @return always {@code null}
         */
        @Override
        public Object getCheckpointLock() {
            return null;
        }
    }

    /**
     * Creates an emitter.
     *
     * @param debeziumDeserializationSchema schema used to deserialize data change records
     * @param offsetFactory factory used to build offsets from record source offsets
     */
    public IncrementalSourceRecordEmitter(DebeziumDeserializationSchema<T> debeziumDeserializationSchema, OffsetFactory offsetFactory) {
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
        this.offsetFactory = offsetFactory;
    }

    /**
     * Processes every record of the batch in iteration order.
     *
     * @param sourceRecords batch of records to process
     * @param collector downstream collector receiving the deserialized elements
     * @param sourceSplitStateBase state of the split the batch belongs to; may be updated
     * @throws Exception if processing any record fails
     */
    @Override
    public void emitRecord(SourceRecords sourceRecords, Collector<T> collector, SourceSplitStateBase sourceSplitStateBase) throws Exception {
        Iterator<SourceRecord> records = sourceRecords.iterator();
        while (records.hasNext()) {
            processElement(records.next(), collector, sourceSplitStateBase);
        }
    }

    /**
     * Handles a single record: updates split state and, for data change records, emits it.
     *
     * @param sourceRecord record to handle
     * @param collector downstream collector receiving the deserialized element, if any
     * @param sourceSplitStateBase state of the split the record belongs to; may be updated
     * @throws Exception if deserializing or emitting the record fails
     */
    protected void processElement(SourceRecord sourceRecord, Collector<T> collector, SourceSplitStateBase sourceSplitStateBase) throws Exception {
        if (WatermarkEvent.isWatermarkEvent(sourceRecord)) {
            // Only the HIGH watermark may be stored as the split's high watermark. Storing the
            // low watermark here would record the wrong offset and the real high watermark
            // would then be discarded by the plain return below.
            if (WatermarkEvent.isHighWatermarkEvent(sourceRecord) && sourceSplitStateBase.isSnapshotSplitState()) {
                sourceSplitStateBase.asSnapshotSplitState().setHighWatermark(getWatermark(sourceRecord));
            }
            // Watermark events are control signals and are never emitted downstream.
            return;
        }
        if (SourceRecordUtils.isSchemaChangeEvent(sourceRecord) && sourceSplitStateBase.isIncrementalSplitState()) {
            // Schema change events are dropped while reading an incremental split.
            return;
        }
        if (!SourceRecordUtils.isDataChangeRecord(sourceRecord)) {
            log.info("Meet unknown element {}, just skip.", sourceRecord);
            return;
        }
        if (sourceSplitStateBase.isIncrementalSplitState()) {
            // Advance the startup offset before emitting the record.
            sourceSplitStateBase.asIncrementalSplitState().setStartupOffset(getOffsetPosition(sourceRecord));
        }
        emitElement(sourceRecord, collector);
    }

    /**
     * Extracts the offset carried by a watermark event.
     *
     * @param sourceRecord watermark event
     * @return offset built from the event's source offset
     */
    private Offset getWatermark(SourceRecord sourceRecord) {
        return getOffsetPosition(sourceRecord.sourceOffset());
    }

    /**
     * Extracts the offset of a record.
     *
     * @param sourceRecord record whose source offset is read
     * @return offset built from the record's source offset
     */
    public Offset getOffsetPosition(SourceRecord sourceRecord) {
        return getOffsetPosition(sourceRecord.sourceOffset());
    }

    /**
     * Builds an {@link Offset} from a raw source-offset map.
     *
     * <p>Each value is converted with {@code toString()}; {@code null} values are kept as
     * {@code null}.
     *
     * @param map raw source offset, keyed by offset field name
     * @return offset created by the offset factory from the stringified map
     */
    public Offset getOffsetPosition(Map<String, ?> map) {
        Map<String, String> offsetStrings = new HashMap<>();
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            Object value = entry.getValue();
            offsetStrings.put(entry.getKey(), value == null ? null : value.toString());
        }
        return this.offsetFactory.specific(offsetStrings);
    }

    /**
     * Deserializes a record and forwards the resulting element(s) to the given collector.
     *
     * @param sourceRecord record to deserialize
     * @param collector downstream collector receiving the deserialized element(s)
     * @throws Exception if deserialization fails
     */
    protected void emitElement(SourceRecord sourceRecord, Collector<T> collector) throws Exception {
        // Re-point the reusable adapter at the caller's collector before deserializing.
        this.outputCollector.output = collector;
        this.debeziumDeserializationSchema.deserialize(sourceRecord, this.outputCollector);
    }
}
