package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/flink/source/StreamingMonitorFunction.class */
public class StreamingMonitorFunction extends RichSourceFunction<FlinkInputSplit> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingMonitorFunction.class);
    private static final long INIT_LAST_SNAPSHOT_ID = -1;
    private final TableLoader tableLoader;
    private final ScanContext scanContext;
    private volatile boolean isRunning = true;
    private volatile long lastSnapshotId = -1;
    private transient SourceFunction.SourceContext<FlinkInputSplit> sourceContext;
    private transient Table table;
    private transient ListState<Long> lastSnapshotIdState;
    private transient ExecutorService workerPool;

    public StreamingMonitorFunction(TableLoader tableLoader, ScanContext scanContext) {
        Preconditions.checkArgument(scanContext.snapshotId() == null, "Cannot set snapshot-id option for streaming reader");
        Preconditions.checkArgument(scanContext.asOfTimestamp() == null, "Cannot set as-of-timestamp option for streaming reader");
        Preconditions.checkArgument(scanContext.endSnapshotId() == null, "Cannot set end-snapshot-id option for streaming reader");
        this.tableLoader = tableLoader;
        this.scanContext = scanContext;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        StreamingRuntimeContext runtimeContext = getRuntimeContext();
        ValidationException.check(runtimeContext instanceof StreamingRuntimeContext, "context should be instance of StreamingRuntimeContext", new Object[0]);
        this.workerPool = ThreadPools.newWorkerPool("iceberg-worker-pool-" + runtimeContext.getOperatorUniqueID(), this.scanContext.planParallelism().intValue());
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        this.lastSnapshotIdState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("snapshot-id-state", LongSerializer.INSTANCE));
        if (functionInitializationContext.isRestored()) {
            LOG.info("Restoring state for the {}.", getClass().getSimpleName());
            this.lastSnapshotId = ((Long) ((Iterable) this.lastSnapshotIdState.get()).iterator().next()).longValue();
        } else if (this.scanContext.startSnapshotId() != null) {
            Preconditions.checkNotNull(this.table.currentSnapshot(), "Don't have any available snapshot in table.");
            Preconditions.checkState(SnapshotUtil.isAncestorOf(this.table, this.table.currentSnapshot().snapshotId(), this.scanContext.startSnapshotId().longValue()), "The option start-snapshot-id %s is not an ancestor of the current snapshot.", this.scanContext.startSnapshotId());
            this.lastSnapshotId = this.scanContext.startSnapshotId().longValue();
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.lastSnapshotIdState.clear();
        this.lastSnapshotIdState.add(Long.valueOf(this.lastSnapshotId));
    }

    public void run(SourceFunction.SourceContext<FlinkInputSplit> sourceContext) throws Exception {
        this.sourceContext = sourceContext;
        while (this.isRunning) {
            monitorAndForwardSplits();
            Thread.sleep(this.scanContext.monitorInterval().toMillis());
        }
    }

    private void monitorAndForwardSplits() {
        this.table.refresh();
        Snapshot currentSnapshot = this.table.currentSnapshot();
        if (currentSnapshot == null || currentSnapshot.snapshotId() == this.lastSnapshotId) {
            return;
        }
        long snapshotId = currentSnapshot.snapshotId();
        FlinkInputSplit[] planInputSplits = FlinkSplitPlanner.planInputSplits(this.table, this.lastSnapshotId == -1 ? this.scanContext.copyWithSnapshotId(snapshotId) : this.scanContext.copyWithAppendsBetween(Long.valueOf(this.lastSnapshotId), snapshotId), this.workerPool);
        synchronized (this.sourceContext.getCheckpointLock()) {
            for (FlinkInputSplit flinkInputSplit : planInputSplits) {
                this.sourceContext.collect(flinkInputSplit);
            }
            this.lastSnapshotId = snapshotId;
        }
    }

    public void cancel() {
        if (this.sourceContext != null) {
            synchronized (this.sourceContext.getCheckpointLock()) {
                this.isRunning = false;
            }
        } else {
            this.isRunning = false;
        }
        if (this.tableLoader != null) {
            try {
                this.tableLoader.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public void close() {
        cancel();
        if (this.workerPool != null) {
            this.workerPool.shutdown();
        }
    }
}
