package org.apache.paimon.flink.source;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.source.TableRead;

/* loaded from: input_file:org/apache/paimon/flink/source/FileStoreSourceReader.class */
/**
 * Source reader that reads {@link FileStoreSourceSplit}s through a single
 * {@link FileStoreSourceSplitReader} and emits {@link RowData} records.
 *
 * <p>The reader asks for a new split whenever it has no split assigned, and reports the highest
 * snapshot id of its finished splits to the coordinator via {@link ReaderConsumeProgressEvent}.
 */
public class FileStoreSourceReader extends SingleThreadMultiplexSourceReaderBase<BulkFormat.RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {
    /** IO manager owned by this reader; it is closed together with the reader. */
    private final IOManager ioManager;

    /** Highest snapshot id already reported to the coordinator; {@code Long.MIN_VALUE} until one is reported. */
    private long lastConsumeSnapshotId = Long.MIN_VALUE;

    /**
     * Creates a reader that lets the base class create its own element queue.
     *
     * @param sourceReaderContext reader context; also supplies the configuration for the base class
     * @param tableRead table read handed to every created {@link FileStoreSourceSplitReader}
     * @param fileStoreSourceReaderMetrics metrics shared by the split reader and the record emitter
     * @param iOManager IO manager that is closed when this reader is closed
     * @param l value passed to {@code RecordLimiter.create}; may be {@code null}
     */
    public FileStoreSourceReader(SourceReaderContext sourceReaderContext, TableRead tableRead, FileStoreSourceReaderMetrics fileStoreSourceReaderMetrics, IOManager iOManager, @Nullable Long l) {
        super(
                () -> new FileStoreSourceSplitReader(tableRead, RecordLimiter.create(l), fileStoreSourceReaderMetrics),
                (records, output, splitState) ->
                        FlinkRecordsWithSplitIds.emitRecord(records, output, splitState, fileStoreSourceReaderMetrics),
                sourceReaderContext.getConfiguration(),
                sourceReaderContext);
        this.ioManager = iOManager;
    }

    /**
     * Creates a reader that uses the supplied element queue.
     *
     * @param sourceReaderContext reader context; also supplies the configuration for the base class
     * @param tableRead table read handed to every created {@link FileStoreSourceSplitReader}
     * @param fileStoreSourceReaderMetrics metrics shared by the split reader and the record emitter
     * @param iOManager IO manager that is closed when this reader is closed
     * @param l value passed to {@code RecordLimiter.create}; may be {@code null}
     * @param futureCompletingBlockingQueue element queue forwarded to the base class
     */
    public FileStoreSourceReader(SourceReaderContext sourceReaderContext, TableRead tableRead, FileStoreSourceReaderMetrics fileStoreSourceReaderMetrics, IOManager iOManager, @Nullable Long l, FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>> futureCompletingBlockingQueue) {
        super(
                futureCompletingBlockingQueue,
                () -> new FileStoreSourceSplitReader(tableRead, RecordLimiter.create(l), fileStoreSourceReaderMetrics),
                (records, output, splitState) ->
                        FlinkRecordsWithSplitIds.emitRecord(records, output, splitState, fileStoreSourceReaderMetrics),
                sourceReaderContext.getConfiguration(),
                sourceReaderContext);
        this.ioManager = iOManager;
    }

    /** Sends a split request if no split is currently assigned to this reader. */
    @Override
    public void start() {
        requestSplitIfIdle();
    }

    /**
     * Handles finished splits: requests more work if the reader is idle, then reports consumption
     * progress when the finished splits carry a snapshot id higher than the last reported one.
     *
     * @param map finished split states keyed by split id
     */
    @Override
    protected void onSplitFinished(Map<String, FileStoreSourceSplitState> map) {
        requestSplitIfIdle();

        // Splits without a snapshot id are skipped; Long.MIN_VALUE means "nothing to report".
        long maxFinishedSnapshotId =
                map.values().stream()
                        .map(splitState -> TableScanUtils.getSnapshotId(splitState.toSourceSplit()))
                        .filter(snapshotId -> snapshotId.isPresent())
                        .mapToLong(snapshotId -> snapshotId.get())
                        .max()
                        .orElse(Long.MIN_VALUE);

        if (maxFinishedSnapshotId > this.lastConsumeSnapshotId) {
            this.lastConsumeSnapshotId = maxFinishedSnapshotId;
            this.context.sendSourceEventToCoordinator(new ReaderConsumeProgressEvent(this.lastConsumeSnapshotId));
        }
    }

    /**
     * Wraps a newly assigned split into its mutable state object.
     *
     * @param fileStoreSourceSplit the assigned split
     * @return a fresh state for the split
     */
    @Override
    public FileStoreSourceSplitState initializedState(FileStoreSourceSplit fileStoreSourceSplit) {
        return new FileStoreSourceSplitState(fileStoreSourceSplit);
    }

    /**
     * Converts a split state back into a split.
     *
     * @param str split id (unused; the state produces the split itself)
     * @param fileStoreSourceSplitState state to convert
     * @return the split produced by {@code fileStoreSourceSplitState.toSourceSplit()}
     */
    @Override
    public FileStoreSourceSplit toSplitType(String str, FileStoreSourceSplitState fileStoreSourceSplitState) {
        return fileStoreSourceSplitState.toSourceSplit();
    }

    /**
     * Closes the base reader and then the IO manager.
     *
     * <p>The IO manager is closed even when closing the base reader fails, so it cannot leak. If
     * both steps fail, the base reader's exception is thrown with the IO manager's exception
     * attached as suppressed.
     *
     * @throws Exception if closing the base reader or the IO manager fails
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.ioManager.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Sends a split request through the reader context when no split is currently assigned. */
    private void requestSplitIfIdle() {
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            this.context.sendSplitRequest();
        }
    }
}
