package org.apache.flink.streaming.api.functions.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.class */
public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
    private static final long serialVersionUID = 1;
    private final long start;
    private final long end;
    private long collected;
    private volatile boolean isRunning = true;

    public StatefulSequenceSource(long j, long j2) {
        this.start = j;
        this.end = j2;
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
        Object checkpointLock = sourceContext.getCheckpointLock();
        long numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        long indexOfThisSubtask = this.start + r0.getIndexOfThisSubtask();
        long j = ((this.end - this.start) + serialVersionUID) % numberOfParallelSubtasks > indexOfThisSubtask - this.start ? (((this.end - this.start) + serialVersionUID) / numberOfParallelSubtasks) + serialVersionUID : ((this.end - this.start) + serialVersionUID) / numberOfParallelSubtasks;
        while (this.isRunning && this.collected < j) {
            synchronized (checkpointLock) {
                sourceContext.collect(Long.valueOf((this.collected * numberOfParallelSubtasks) + indexOfThisSubtask));
                this.collected += serialVersionUID;
            }
        }
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
        this.isRunning = false;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.streaming.api.checkpoint.Checkpointed
    /* renamed from: snapshotState */
    public Long mo229snapshotState(long j, long j2) {
        return Long.valueOf(this.collected);
    }

    @Override // org.apache.flink.streaming.api.checkpoint.Checkpointed
    public void restoreState(Long l) {
        this.collected = l.longValue();
    }
}
