package org.apache.iceberg.flink.source.reader;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.class */
/**
 * Record emitter that, whenever it starts seeing records from a different split, extracts a
 * watermark for that split and emits it before forwarding the record.
 *
 * <p>A watermark is emitted only when the value extracted from the new split is not lower than
 * the last value kept by this emitter; otherwise the new split is logged at INFO level and no
 * watermark is emitted for it.
 *
 * <p>NOTE: the tracked watermark starts at {@code 0} (the default for a {@code long} field), so a
 * first split whose extracted watermark is negative does not produce a watermark.
 *
 * @param <T> type of the records forwarded to the {@link SourceOutput}
 */
public class WatermarkExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);

    /** Computes the watermark value for a split. */
    private final SplitWatermarkExtractor timeExtractor;

    /** Id of the split the previous record belonged to; {@code null} until the first record. */
    private String lastSplitId;

    /** Highest watermark value emitted so far by this emitter (initially {@code 0}). */
    private long watermark;

    /**
     * Creates an emitter that derives watermarks with the given extractor.
     *
     * @param splitWatermarkExtractor extractor invoked once each time the current split changes
     */
    public WatermarkExtractorRecordEmitter(SplitWatermarkExtractor splitWatermarkExtractor) {
        this.timeExtractor = splitWatermarkExtractor;
    }

    /**
     * Forwards one record to the output and records the read position on the split. If the record
     * belongs to a different split than the previous record, a watermark is extracted from the
     * split first and emitted when it does not go backwards.
     *
     * @param recordAndPosition the record together with its file and record offsets
     * @param sourceOutput output receiving the watermark (if any) and the record
     * @param icebergSourceSplit split the record was read from; its position is updated
     */
    public void emitRecord(RecordAndPosition<T> recordAndPosition, SourceOutput<T> sourceOutput, IcebergSourceSplit icebergSourceSplit) {
        String currentSplitId = icebergSourceSplit.splitId();
        if (!currentSplitId.equals(lastSplitId)) {
            long extractedWatermark = timeExtractor.extractWatermark(icebergSourceSplit);
            if (extractedWatermark < watermark) {
                // Never move the watermark backwards; only report the out-of-order split.
                LOG.info(
                        "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}",
                        watermark,
                        extractedWatermark,
                        lastSplitId,
                        currentSplitId);
            } else {
                watermark = extractedWatermark;
                sourceOutput.emitWatermark(new Watermark(watermark));
                // Log the split the watermark was derived from. lastSplitId still refers to the
                // previous split here (or is null for the very first split).
                LOG.debug("Watermark = {} emitted based on split = {}", watermark, currentSplitId);
            }
            lastSplitId = currentSplitId;
        }
        sourceOutput.collect(recordAndPosition.record());
        icebergSourceSplit.updatePosition(recordAndPosition.fileOffset(), recordAndPosition.recordOffset());
    }
}
