package org.apache.paimon.flink.source.operator;

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.utils.CloseableIterator;

/* loaded from: input_file:org/apache/paimon/flink/source/operator/ReadOperator.class */
public class ReadOperator extends AbstractStreamOperator<RowData> implements OneInputStreamOperator<Split, RowData> {
    private static final long serialVersionUID = 1;
    private final ReadBuilder readBuilder;
    private transient TableRead read;
    private transient StreamRecord<RowData> reuseRecord;
    private transient FlinkRowData reuseRow;
    private transient IOManager ioManager;
    private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
    private transient Counter numRecordsIn;

    public ReadOperator(ReadBuilder readBuilder) {
        this.readBuilder = readBuilder;
    }

    public void open() throws Exception {
        super.open();
        this.sourceReaderMetrics = new FileStoreSourceReaderMetrics(getMetricGroup());
        getMetricGroup().gauge("currentEmitEventTimeLag", () -> {
            long latestFileCreationTime = this.sourceReaderMetrics.getLatestFileCreationTime();
            if (latestFileCreationTime == -1) {
                return -1L;
            }
            return Long.valueOf(System.currentTimeMillis() - latestFileCreationTime);
        });
        this.numRecordsIn = InternalSourceReaderMetricGroup.wrap(getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        this.ioManager = IOManager.create(getContainingTask().getEnvironment().getIOManager().getSpillingDirectoriesPaths());
        this.read = this.readBuilder.newRead().withIOManager(this.ioManager);
        this.reuseRow = new FlinkRowData(null);
        this.reuseRecord = new StreamRecord<>(this.reuseRow);
    }

    public void processElement(StreamRecord<Split> streamRecord) throws Exception {
        Split split = (Split) streamRecord.getValue();
        this.sourceReaderMetrics.recordSnapshotUpdate(((DataSplit) split).latestFileCreationEpochMillis().orElse(-1L));
        boolean z = true;
        CloseableIterator<InternalRow> closeableIterator = this.read.createReader(split).toCloseableIterator();
        Throwable th = null;
        while (closeableIterator.hasNext()) {
            try {
                try {
                    if (z) {
                        z = false;
                    } else {
                        this.numRecordsIn.inc();
                    }
                    this.reuseRow.replace(closeableIterator.next());
                    this.output.collect(this.reuseRecord);
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (closeableIterator != null) {
                    if (th != null) {
                        try {
                            closeableIterator.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        closeableIterator.close();
                    }
                }
                throw th3;
            }
        }
        if (closeableIterator != null) {
            if (0 == 0) {
                closeableIterator.close();
                return;
            }
            try {
                closeableIterator.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    public void close() throws Exception {
        super.close();
        if (this.ioManager != null) {
            this.ioManager.close();
        }
    }
}
