/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.testutils;

import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.SerializableObject;

/**
 * Parallel test source that emits a strided sequence of integers and then waits for
 * checkpoint confirmations before finishing.
 *
 * <p>Each subtask starts at its own subtask index (or at the restored position) and
 * advances by the number of parallel subtasks, so the subtasks together cover
 * {@code [0, numEventsTotal)}. The next value to emit is the checkpointed state.
 *
 * <p>Once all values are emitted, {@link #run} blocks until a checkpoint whose id is
 * greater than {@code lastCheckpointTriggered + 1} has been confirmed through
 * {@link #notifyCheckpointComplete(long)}, or until {@link #cancel()} is called.
 */
public class IntegerSource
extends RichParallelSourceFunction<Integer>
implements ListCheckpointed<Integer>,
CheckpointListener {
    /**
     * Monitor on which {@link #run} waits for checkpoint confirmations. It is created
     * eagerly as a field of this function, hence the serializable type.
     */
    private final Object blocker = new SerializableObject();

    /** Exclusive upper bound of the values to emit. */
    private final int numEventsTotal;

    /** Next value to emit; {@code -1} until the first element was emitted or state was restored. */
    private int currentPosition = -1;

    /** Id passed to the most recent {@link #snapshotState} call; read in run() under the checkpoint lock. */
    private long lastCheckpointTriggered;

    /** Id of the most recently confirmed checkpoint; guarded by {@link #blocker}. */
    private long lastCheckpointConfirmed;

    /** Set by {@link #restoreState}; disables the periodic sleep in the emit loop. */
    private boolean restored;

    /** Cleared by {@link #cancel()} to stop both the emit loop and the final wait. */
    private volatile boolean running = true;

    /**
     * @param numEventsTotal exclusive upper bound of the emitted values, shared by all subtasks
     */
    public IntegerSource(int numEventsTotal) {
        this.numEventsTotal = numEventsTotal;
    }

    /**
     * Emits this subtask's share of the sequence, then waits for checkpoint confirmations.
     *
     * @param ctx context used to emit elements and to obtain the checkpoint lock
     * @throws Exception in particular {@link InterruptedException} if the thread is
     *     interrupted while sleeping or waiting
     */
    public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
        // Stride by the parallelism so every subtask emits a disjoint set of values.
        final int stepSize = this.getRuntimeContext().getNumberOfParallelSubtasks();
        int current = this.currentPosition >= 0
                ? this.currentPosition
                : this.getRuntimeContext().getIndexOfThisSubtask();

        while (this.running && current < this.numEventsTotal) {
            // Emit and advance under the checkpoint lock so the stored position always
            // matches what has been emitted.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(current);
                current += stepSize;
                this.currentPosition = current;
            }

            // Throttle slightly on a fresh (non-restored) run; presumably this leaves
            // room for checkpoints to be triggered before the sequence is exhausted.
            if (!this.restored && current % 10 == 0) {
                Thread.sleep(1L);
            }
        }

        // Read the last triggered checkpoint id under the same lock that guards emission.
        final long lastCheckpoint;
        synchronized (ctx.getCheckpointLock()) {
            lastCheckpoint = this.lastCheckpointTriggered;
        }

        // Wait until a checkpoint with an id greater than lastCheckpoint + 1 is confirmed.
        // The running check lets cancel() end this wait without relying on an interrupt.
        synchronized (this.blocker) {
            while (this.running && this.lastCheckpointConfirmed <= lastCheckpoint + 1L) {
                this.blocker.wait();
            }
        }
    }

    /**
     * Requests termination of {@link #run}: clears the running flag and wakes the final
     * wait so the cleared flag is observed.
     */
    public void cancel() {
        this.running = false;
        synchronized (this.blocker) {
            this.blocker.notifyAll();
        }
    }

    /**
     * Records the checkpoint id and returns the next value to emit as the only state element.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param checkpointTimestamp timestamp of the checkpoint (unused)
     * @return singleton list holding the current position
     */
    public List<Integer> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        this.lastCheckpointTriggered = checkpointId;
        return Collections.singletonList(this.currentPosition);
    }

    /**
     * Restores the position from the first state element and marks this source as restored.
     *
     * @param state restored state; must contain at least one element, otherwise
     *     {@code state.get(0)} throws
     */
    public void restoreState(List<Integer> state) throws Exception {
        this.currentPosition = state.get(0);

        // Both counters restart at 1 after a restore; presumably because at least one
        // checkpoint must have completed for state to exist.
        this.lastCheckpointTriggered = 1L;
        this.lastCheckpointConfirmed = 1L;
        this.restored = true;
    }

    /**
     * Records the confirmed checkpoint id and wakes {@link #run} if it is waiting.
     *
     * @param checkpointId id of the confirmed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        synchronized (this.blocker) {
            this.lastCheckpointConfirmed = checkpointId;
            this.blocker.notifyAll();
        }
    }

    /**
     * Intentionally a no-op: aborted checkpoints do not affect this source.
     *
     * @param checkpointId id of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long checkpointId) {
    }
}

