package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.class */
public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousSplitPlannerImpl.class);
    private final Table table;
    private final ScanContext scanContext;
    private final boolean isSharedPool;
    private final ExecutorService workerPool;
    private final TableLoader tableLoader;

    public ContinuousSplitPlannerImpl(TableLoader tableLoader, ScanContext scanContext, String str) {
        this.tableLoader = tableLoader.m33clone();
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        this.scanContext = scanContext;
        this.isSharedPool = str == null;
        this.workerPool = this.isSharedPool ? ThreadPools.getWorkerPool() : ThreadPools.newWorkerPool("iceberg-plan-worker-pool-" + str, scanContext.planParallelism().intValue());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!this.isSharedPool) {
            this.workerPool.shutdown();
        }
        this.tableLoader.close();
    }

    @Override // org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner
    public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition icebergEnumeratorPosition) {
        this.table.refresh();
        return icebergEnumeratorPosition != null ? discoverIncrementalSplits(icebergEnumeratorPosition) : discoverInitialSplits();
    }

    private Snapshot toSnapshotInclusive(Long l, Snapshot snapshot, int i) {
        ArrayList newArrayList = Lists.newArrayList(SnapshotUtil.ancestorsBetween(this.table, snapshot.snapshotId(), l));
        return newArrayList.size() <= i ? snapshot : (Snapshot) newArrayList.get(newArrayList.size() - i);
    }

    private ContinuousEnumerationResult discoverIncrementalSplits(IcebergEnumeratorPosition icebergEnumeratorPosition) {
        Snapshot snapshot = this.scanContext.branch() != null ? this.table.snapshot(this.scanContext.branch()) : this.table.currentSnapshot();
        if (snapshot == null) {
            Preconditions.checkArgument(icebergEnumeratorPosition.snapshotId() == null, "Invalid last enumerated position for an empty table: not null");
            LOG.info("Skip incremental scan because table is empty");
            return new ContinuousEnumerationResult(Collections.emptyList(), icebergEnumeratorPosition, icebergEnumeratorPosition);
        }
        if (icebergEnumeratorPosition.snapshotId() != null && snapshot.snapshotId() == icebergEnumeratorPosition.snapshotId().longValue()) {
            LOG.info("Current table snapshot is already enumerated: {}", Long.valueOf(snapshot.snapshotId()));
            return new ContinuousEnumerationResult(Collections.emptyList(), icebergEnumeratorPosition, icebergEnumeratorPosition);
        }
        Snapshot snapshotInclusive = toSnapshotInclusive(icebergEnumeratorPosition != null ? icebergEnumeratorPosition.snapshotId() : null, snapshot, this.scanContext.maxPlanningSnapshotCount());
        IcebergEnumeratorPosition of = IcebergEnumeratorPosition.of(snapshotInclusive.snapshotId(), Long.valueOf(snapshotInclusive.timestampMillis()));
        List<IcebergSourceSplit> planIcebergSourceSplits = FlinkSplitPlanner.planIcebergSourceSplits(this.table, this.scanContext.copyWithAppendsBetween(icebergEnumeratorPosition.snapshotId(), snapshotInclusive.snapshotId()), this.workerPool);
        LOG.info("Discovered {} splits from incremental scan: from snapshot (exclusive) is {}, to snapshot (inclusive) is {}", new Object[]{Integer.valueOf(planIcebergSourceSplits.size()), icebergEnumeratorPosition, of});
        return new ContinuousEnumerationResult(planIcebergSourceSplits, icebergEnumeratorPosition, of);
    }

    private ContinuousEnumerationResult discoverInitialSplits() {
        List<IcebergSourceSplit> emptyList;
        IcebergEnumeratorPosition empty;
        Optional<Snapshot> startSnapshot = startSnapshot(this.table, this.scanContext);
        if (!startSnapshot.isPresent()) {
            return new ContinuousEnumerationResult(Collections.emptyList(), null, IcebergEnumeratorPosition.empty());
        }
        Snapshot snapshot = startSnapshot.get();
        LOG.info("Get starting snapshot id {} based on strategy {}", Long.valueOf(snapshot.snapshotId()), this.scanContext.streamingStartingStrategy());
        if (this.scanContext.streamingStartingStrategy() == StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
            emptyList = FlinkSplitPlanner.planIcebergSourceSplits(this.table, this.scanContext.copyWithSnapshotId(snapshot.snapshotId()), this.workerPool);
            LOG.info("Discovered {} splits from initial batch table scan with snapshot Id {}", Integer.valueOf(emptyList.size()), Long.valueOf(snapshot.snapshotId()));
            empty = IcebergEnumeratorPosition.of(snapshot.snapshotId(), Long.valueOf(snapshot.timestampMillis()));
        } else {
            emptyList = Collections.emptyList();
            Long parentId = snapshot.parentId();
            if (parentId != null) {
                Snapshot snapshot2 = this.table.snapshot(parentId.longValue());
                empty = IcebergEnumeratorPosition.of(parentId.longValue(), snapshot2 != null ? Long.valueOf(snapshot2.timestampMillis()) : null);
            } else {
                empty = IcebergEnumeratorPosition.empty();
            }
            LOG.info("Start incremental scan with start snapshot (inclusive): id = {}, timestamp = {}", Long.valueOf(snapshot.snapshotId()), Long.valueOf(snapshot.timestampMillis()));
        }
        return new ContinuousEnumerationResult(emptyList, null, empty);
    }

    @VisibleForTesting
    static Optional<Snapshot> startSnapshot(Table table, ScanContext scanContext) {
        switch (scanContext.streamingStartingStrategy()) {
            case TABLE_SCAN_THEN_INCREMENTAL:
            case INCREMENTAL_FROM_LATEST_SNAPSHOT:
                return Optional.ofNullable(table.currentSnapshot());
            case INCREMENTAL_FROM_EARLIEST_SNAPSHOT:
                return Optional.ofNullable(SnapshotUtil.oldestAncestor(table));
            case INCREMENTAL_FROM_SNAPSHOT_ID:
                Snapshot snapshot = table.snapshot(scanContext.startSnapshotId().longValue());
                Preconditions.checkArgument(snapshot != null, "Start snapshot id not found in history: " + scanContext.startSnapshotId());
                return Optional.of(snapshot);
            case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
                Snapshot oldestAncestorAfter = SnapshotUtil.oldestAncestorAfter(table, scanContext.startSnapshotTimestamp().longValue());
                Preconditions.checkArgument(oldestAncestorAfter != null, "Cannot find a snapshot after: " + scanContext.startSnapshotTimestamp());
                return Optional.of(oldestAncestorAfter);
            default:
                throw new IllegalArgumentException("Unknown starting strategy: " + scanContext.streamingStartingStrategy());
        }
    }
}
