/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class StatefulSequenceSource
extends RichParallelSourceFunction<Long>
implements Checkpointed<Long> {
    private static final long serialVersionUID = 1L;
    private final long start;
    private final long end;
    private long collected;
    private volatile boolean isRunning = true;

    public StatefulSequenceSource(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
        long congruence;
        long toCollect;
        Object checkpointLock = ctx.getCheckpointLock();
        RuntimeContext context = this.getRuntimeContext();
        long stepSize = context.getNumberOfParallelSubtasks();
        long l = toCollect = (this.end - this.start + 1L) % stepSize > (congruence = this.start + (long)context.getIndexOfThisSubtask()) - this.start ? (this.end - this.start + 1L) / stepSize + 1L : (this.end - this.start + 1L) / stepSize;
        while (this.isRunning && this.collected < toCollect) {
            Object object = checkpointLock;
            synchronized (object) {
                ctx.collect(this.collected * stepSize + congruence);
                ++this.collected;
            }
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return this.collected;
    }

    @Override
    public void restoreState(Long state) {
        this.collected = state;
    }
}

