package org.apache.paimon.flink.source.align;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;

/* loaded from: input_file:org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.class */
public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource {
    public AlignedContinuousFileStoreSource(ReadBuilder readBuilder, Map<String, String> map, @Nullable Long l, BucketMode bucketMode) {
        super(readBuilder, map, l, bucketMode);
    }

    @Override // org.apache.paimon.flink.source.FlinkSource
    public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext sourceReaderContext) {
        return new AlignedSourceReader(sourceReaderContext, this.readBuilder.newRead(), IOManager.create(IOManagerImpl.splitPaths((String) sourceReaderContext.getConfiguration().get(CoreOptions.TMP_DIRS))), this.limit, new FutureCompletingBlockingQueue(sourceReaderContext.getConfiguration().getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)), new FileStoreSourceReaderMetrics(sourceReaderContext.metricGroup()));
    }

    @Override // org.apache.paimon.flink.source.ContinuousFileStoreSource
    protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> splitEnumeratorContext, Collection<FileStoreSourceSplit> collection, @Nullable Long l, StreamTableScan streamTableScan) {
        Options fromMap = Options.fromMap(this.options);
        return new AlignedContinuousFileSplitEnumerator(splitEnumeratorContext, collection, l, ((Duration) fromMap.get(org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)).toMillis(), streamTableScan, this.bucketMode, ((Duration) fromMap.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT)).toMillis(), ((Integer) fromMap.get(org.apache.paimon.CoreOptions.SCAN_MAX_SPLITS_PER_TASK)).intValue());
    }
}
