package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.parquet.ParquetReader;

/* JADX INFO: Access modifiers changed from: package-private */
@DoFn.BoundedPerElement
/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/ReadFromTasks.class */
public class ReadFromTasks extends DoFn<KV<ReadTaskDescriptor, ReadTask>, Row> {
    private final IcebergScanConfig scanConfig;
    private final Counter scanTasksCompleted = Metrics.counter(ReadFromTasks.class, "scanTasksCompleted");

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReadFromTasks(IcebergScanConfig icebergScanConfig) {
        this.scanConfig = icebergScanConfig;
    }

    @DoFn.Setup
    public void setup() {
        TableCache.setup(this.scanConfig);
    }

    @DoFn.ProcessElement
    public void process(@DoFn.Element KV<ReadTaskDescriptor, ReadTask> kv, RestrictionTracker<OffsetRange, Long> restrictionTracker, DoFn.OutputReceiver<Row> outputReceiver) throws IOException, ExecutionException, InterruptedException {
        ReadTask readTask = (ReadTask) kv.getValue();
        Table table = TableCache.get(this.scanConfig.getTableIdentifier());
        List<FileScanTask> fileScanTasks = readTask.getFileScanTasks();
        long from = ((OffsetRange) restrictionTracker.currentRestriction()).getFrom();
        while (true) {
            long j = from;
            if (j >= ((OffsetRange) restrictionTracker.currentRestriction()).getTo() || !restrictionTracker.tryClaim(Long.valueOf(j))) {
                return;
            }
            ParquetReader<Record> createReader = ReadUtils.createReader(fileScanTasks.get((int) j), table);
            try {
                CloseableIterator it = createReader.iterator();
                while (it.hasNext()) {
                    outputReceiver.output(IcebergUtils.icebergRecordToBeamRow(this.scanConfig.getSchema(), (Record) it.next()));
                }
                if (createReader != null) {
                    createReader.close();
                }
                this.scanTasksCompleted.inc();
                from = j + 1;
            } catch (Throwable th) {
                if (createReader != null) {
                    try {
                        createReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @DoFn.GetSize
    public double getSize(@DoFn.Element KV<ReadTaskDescriptor, ReadTask> kv, @DoFn.Restriction OffsetRange offsetRange) {
        return ((ReadTask) kv.getValue()).getSize(offsetRange.getFrom(), offsetRange.getTo());
    }

    @DoFn.GetInitialRestriction
    public OffsetRange getInitialRange(@DoFn.Element KV<ReadTaskDescriptor, ReadTask> kv) {
        return new OffsetRange(0L, ((ReadTask) kv.getValue()).getFileScanTaskJsons().size());
    }
}
