package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/FiniteTestSource.class */
public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointedFunction, CheckpointListener {
    private static final long serialVersionUID = 1;
    private final List<T> elements;
    private final boolean emitOnce;
    private volatile boolean running = true;
    private transient int numCheckpointsComplete;
    private transient ListState<Integer> checkpointedState;
    private volatile int numTimesEmitted;

    public FiniteTestSource(List<T> list, boolean z) {
        this.elements = list;
        this.emitOnce = z;
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.checkpointedState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("emit-times", IntSerializer.INSTANCE));
        if (!functionInitializationContext.isRestored()) {
            this.numTimesEmitted = 0;
            return;
        }
        ArrayList arrayList = new ArrayList();
        Iterator<T> it = ((Iterable) this.checkpointedState.get()).iterator();
        while (it.hasNext()) {
            arrayList.add((Integer) it.next());
        }
        Preconditions.checkArgument(arrayList.size() == 1, getClass().getSimpleName() + " retrieved invalid state.");
        this.numTimesEmitted = ((Integer) arrayList.get(0)).intValue();
        Preconditions.checkArgument(this.numTimesEmitted <= 2, getClass().getSimpleName() + " retrieved invalid numTimesEmitted: " + this.numTimesEmitted);
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        switch (this.numTimesEmitted) {
            case 0:
                emitElementsAndWaitForCheckpoints(sourceContext, false);
                emitElementsAndWaitForCheckpoints(sourceContext, true);
                return;
            case 1:
                emitElementsAndWaitForCheckpoints(sourceContext, true);
                return;
            case ReadWriteTableTestUtil.DEFAULT_PARALLELISM /* 2 */:
                Object checkpointLock = sourceContext.getCheckpointLock();
                synchronized (checkpointLock) {
                    int i = this.numCheckpointsComplete + 2;
                    while (this.running && this.numCheckpointsComplete < i) {
                        checkpointLock.wait(serialVersionUID);
                    }
                }
                return;
            default:
                return;
        }
    }

    private void emitElementsAndWaitForCheckpoints(SourceFunction.SourceContext<T> sourceContext, boolean z) throws InterruptedException {
        int i;
        Object checkpointLock = sourceContext.getCheckpointLock();
        synchronized (checkpointLock) {
            i = this.numCheckpointsComplete + 2;
            if (!z || !this.emitOnce) {
                Iterator<T> it = this.elements.iterator();
                while (it.hasNext()) {
                    sourceContext.collect(it.next());
                }
            }
            this.numTimesEmitted++;
        }
        synchronized (checkpointLock) {
            while (this.running && this.numCheckpointsComplete < i) {
                checkpointLock.wait(serialVersionUID);
            }
        }
    }

    public void cancel() {
        this.running = false;
    }

    public void notifyCheckpointComplete(long j) {
        this.numCheckpointsComplete++;
    }

    public void notifyCheckpointAborted(long j) {
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState(this.checkpointedState != null, "The " + getClass().getSimpleName() + " has not been properly initialized.");
        this.checkpointedState.clear();
        this.checkpointedState.add(Integer.valueOf(this.numTimesEmitted));
    }
}
