package org.apache.iceberg.flink.source;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/* loaded from: input_file:org/apache/iceberg/flink/source/BoundedTestSource.class */
public final class BoundedTestSource<T> implements SourceFunction<T>, CheckpointListener {
    private final List<List<T>> elementsPerCheckpoint;
    private final boolean checkpointEnabled;
    private volatile boolean running;
    private final AtomicInteger numCheckpointsComplete;

    public BoundedTestSource(List<List<T>> list, boolean z) {
        this.running = true;
        this.numCheckpointsComplete = new AtomicInteger(0);
        this.elementsPerCheckpoint = list;
        this.checkpointEnabled = z;
    }

    public BoundedTestSource(List<List<T>> list) {
        this(list, true);
    }

    public BoundedTestSource(T... tArr) {
        this(Collections.singletonList(Arrays.asList(tArr)));
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        int i;
        if (!this.checkpointEnabled) {
            Preconditions.checkArgument(this.elementsPerCheckpoint.size() <= 1, "There should be at most one list in the elementsPerCheckpoint when checkpoint is disabled.");
            Stream<R> flatMap = this.elementsPerCheckpoint.stream().flatMap((v0) -> {
                return v0.stream();
            });
            Objects.requireNonNull(sourceContext);
            flatMap.forEach(sourceContext::collect);
            return;
        }
        for (List<T> list : this.elementsPerCheckpoint) {
            synchronized (sourceContext.getCheckpointLock()) {
                i = this.numCheckpointsComplete.get() + 2;
                Iterator<T> it = list.iterator();
                while (it.hasNext()) {
                    sourceContext.collect(it.next());
                }
            }
            synchronized (sourceContext.getCheckpointLock()) {
                while (this.running && this.numCheckpointsComplete.get() < i) {
                    sourceContext.getCheckpointLock().wait(1L);
                }
            }
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.numCheckpointsComplete.incrementAndGet();
    }

    public void cancel() {
        this.running = false;
    }
}
