package org.apache.beam.sdk.io.iceberg;

import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/iceberg/IncrementalScanSource.class */
public class IncrementalScanSource extends PTransform<PBegin, PCollection<Row>> {
    private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(60);
    private final IcebergScanConfig scanConfig;

    /* JADX INFO: Access modifiers changed from: package-private */
    public IncrementalScanSource(IcebergScanConfig icebergScanConfig) {
        this.scanConfig = icebergScanConfig;
    }

    public PCollection<Row> expand(PBegin pBegin) {
        Table loadTable = this.scanConfig.getCatalogConfig().catalog().loadTable(TableIdentifier.parse(this.scanConfig.getTableIdentifier()));
        return (((Boolean) MoreObjects.firstNonNull(this.scanConfig.getStreaming(), false)).booleanValue() ? unboundedSnapshots(pBegin) : boundedSnapshots(pBegin, loadTable)).setCoder(KvCoder.of(StringUtf8Coder.of(), ListCoder.of(SnapshotInfo.getCoder()))).apply(Redistribute.byKey()).apply("Create Read Tasks", ParDo.of(new CreateReadTasksDoFn(this.scanConfig))).setCoder(KvCoder.of(ReadTaskDescriptor.getCoder(), ReadTask.getCoder())).apply(Redistribute.arbitrarily()).apply("Read Rows From Tasks", ParDo.of(new ReadFromTasks(this.scanConfig))).setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(loadTable.schema()));
    }

    private PCollection<KV<String, List<SnapshotInfo>>> unboundedSnapshots(PBegin pBegin) {
        return pBegin.apply("Watch for Snapshots", new WatchForSnapshots(this.scanConfig, (Duration) MoreObjects.firstNonNull(this.scanConfig.getPollInterval(), DEFAULT_POLL_INTERVAL)));
    }

    private PCollection<KV<String, List<SnapshotInfo>>> boundedSnapshots(PBegin pBegin, Table table) {
        Preconditions.checkStateNotNull(Long.valueOf(table.currentSnapshot().snapshotId()), "Table %s does not have any snapshots to read from.", this.scanConfig.getTableIdentifier());
        return pBegin.apply("Create Snapshot Range", Create.of(KV.of(this.scanConfig.getTableIdentifier(), ReadUtils.snapshotsBetween(table, this.scanConfig.getTableIdentifier(), ReadUtils.getFromSnapshotExclusive(table, this.scanConfig), ((Long) MoreObjects.firstNonNull(ReadUtils.getToSnapshot(table, this.scanConfig), Long.valueOf(table.currentSnapshot().snapshotId()))).longValue())), new KV[0]));
    }
}
