package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.class */
/**
 * Turns batches of snapshot descriptions into individual read tasks.
 *
 * <p>Each input element is keyed by the string passed to {@code TableCache.getRefreshed} and
 * carries the snapshots to scan. For every append snapshot an incremental append scan is planned
 * (bounded below by the snapshot's parent, when it has one) and each resulting
 * {@link CombinedScanTask} is emitted as a {@link ReadTask}, timestamped with the snapshot's
 * timestamp.
 */
class CreateReadTasksDoFn extends DoFn<KV<String, List<SnapshotInfo>>, KV<ReadTaskDescriptor, ReadTask>> {
    private static final Logger LOG = LoggerFactory.getLogger(CreateReadTasksDoFn.class);
    private static final Counter totalScanTasks = Metrics.counter(CreateReadTasksDoFn.class, "totalScanTasks");
    private final IcebergScanConfig scanConfig;

    /**
     * Creates the DoFn.
     *
     * @param icebergScanConfig scan configuration handed to {@code TableCache.setup} during setup
     */
    public CreateReadTasksDoFn(IcebergScanConfig icebergScanConfig) {
        this.scanConfig = icebergScanConfig;
    }

    /** Initializes the table cache with this DoFn's scan configuration. */
    @DoFn.Setup
    public void setup() {
        TableCache.setup(this.scanConfig);
    }

    /**
     * Plans and emits read tasks for every append snapshot in the incoming element.
     *
     * @param kv table key paired with the snapshots to scan
     * @param outputReceiver receiver for the generated (descriptor, read task) pairs
     * @throws IOException if closing the planned task iterable fails
     * @throws ExecutionException declared for the table-cache lookup
     */
    @DoFn.ProcessElement
    public void process(@DoFn.Element KV<String, List<SnapshotInfo>> kv, DoFn.OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> outputReceiver) throws IOException, ExecutionException {
        Table table = TableCache.getRefreshed(kv.getKey());
        List<SnapshotInfo> snapshots = kv.getValue();

        for (SnapshotInfo snapshot : snapshots) {
            Long fromSnapshotId = snapshot.getParentId();
            long toSnapshotId = snapshot.getSnapshotId();

            if (!"append".equals(snapshot.getOperation())) {
                LOG.info(
                        "Skipping non-append snapshot of operation '{}'. Sequence number: {}, id: {}",
                        snapshot.getOperation(),
                        snapshot.getSequenceNumber(),
                        snapshot.getSnapshotId());
                // Actually skip, as the log message states; previously execution fell through
                // and the snapshot was planned regardless.
                continue;
            }

            LOG.info("Planning to scan snapshot {}", toSnapshotId);
            IncrementalAppendScan scan = table.newIncrementalAppendScan().toSnapshot(toSnapshotId);
            if (fromSnapshotId != null) {
                // Exclusive lower bound: only changes made after the parent are scanned.
                scan = scan.fromSnapshotExclusive(fromSnapshotId);
            }
            createAndOutputReadTasks(scan, snapshot, outputReceiver);
        }
    }

    /**
     * Plans the given scan and outputs one element per {@link CombinedScanTask}.
     *
     * <p>The {@code totalScanTasks} counter is incremented by the number of individual tasks
     * contained in the emitted combined tasks, but only after planning finished and the task
     * iterable was closed without error.
     *
     * @param incrementalAppendScan the configured scan to plan
     * @param snapshotInfo snapshot the scan belongs to; supplies table identifier and timestamp
     * @param outputReceiver receiver for the generated (descriptor, read task) pairs
     * @throws IOException if closing the planned task iterable fails
     */
    private void createAndOutputReadTasks(IncrementalAppendScan incrementalAppendScan, SnapshotInfo snapshotInfo, DoFn.OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> outputReceiver) throws IOException {
        int numTasks = 0;
        // try-with-resources closes the planned tasks on both the normal and the failure path.
        try (CloseableIterable<CombinedScanTask> combinedScanTasks = incrementalAppendScan.planTasks()) {
            for (CombinedScanTask combinedScanTask : combinedScanTasks) {
                ReadTaskDescriptor descriptor =
                        ReadTaskDescriptor.builder()
                                .setTableIdentifierString(
                                        Preconditions.checkStateNotNull(snapshotInfo.getTableIdentifierString()))
                                .build();
                ReadTask task = ReadTask.builder().setCombinedScanTask(combinedScanTask).build();

                outputReceiver.outputWithTimestamp(
                        KV.of(descriptor, task), Instant.ofEpochMilli(snapshotInfo.getTimestampMillis()));
                numTasks += combinedScanTask.tasks().size();
            }
        }
        totalScanTasks.inc(numTasks);
        LOG.info("Snapshot {} produced {} read tasks.", snapshotInfo.getSnapshotId(), numTasks);
    }
}
