package org.apache.paimon.flink.sink.cdc;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.class */
public class TestCdcSourceFunction extends RichParallelSourceFunction<TestCdcEvent> implements CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private final LinkedList<TestCdcEvent> events;
    private volatile boolean isRunning = true;
    private transient int numRecordsPerCheckpoint;
    private transient AtomicInteger recordsThisCheckpoint;
    private transient ListState<Integer> remainingEventsCount;

    public TestCdcSourceFunction(Collection<TestCdcEvent> collection) {
        this.events = new LinkedList<>(collection);
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.numRecordsPerCheckpoint = this.events.size() / ThreadLocalRandom.current().nextInt(10, 20);
        this.recordsThisCheckpoint = new AtomicInteger(0);
        this.remainingEventsCount = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("count", Integer.class));
        if (functionInitializationContext.isRestored()) {
            int i = 0;
            Iterator it = ((Iterable) this.remainingEventsCount.get()).iterator();
            while (it.hasNext()) {
                i += ((Integer) it.next()).intValue();
            }
            while (this.events.size() > i) {
                this.events.poll();
            }
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.recordsThisCheckpoint.set(0);
        this.remainingEventsCount.clear();
        this.remainingEventsCount.add(Integer.valueOf(this.events.size()));
    }

    public void run(SourceFunction.SourceContext<TestCdcEvent> sourceContext) throws Exception {
        while (this.isRunning && !this.events.isEmpty()) {
            if (this.recordsThisCheckpoint.get() >= this.numRecordsPerCheckpoint) {
                Thread.sleep(10L);
            } else {
                synchronized (sourceContext.getCheckpointLock()) {
                    TestCdcEvent poll = this.events.poll();
                    if (poll.records() != null) {
                        if (Math.abs(poll.hashCode()) % getRuntimeContext().getNumberOfParallelSubtasks() != getRuntimeContext().getIndexOfThisSubtask()) {
                        }
                    }
                    sourceContext.collect(poll);
                    this.recordsThisCheckpoint.incrementAndGet();
                }
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
    }
}
