package org.apache.flink.streaming.api.functions.source.datagen;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * A parallel source function that emits the records produced by a {@link DataGenerator},
 * throttled to a configurable number of rows per second.
 *
 * <p>The configured rate is divided by the number of parallel subtasks, and each subtask emits
 * its share in one batch per one-second interval. The source stops when the generator reports
 * that it has no more elements or when {@link #cancel()} is called.
 *
 * @param <T> type of the generated records
 */
@Experimental
public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implements CheckpointedFunction {
    private static final long serialVersionUID = 1;

    /** Length of one emission interval in milliseconds. */
    private static final long INTERVAL_MILLIS = 1000L;

    private final DataGenerator<T> generator;
    private final long rowsPerSecond;

    /** Set to {@code true} in {@link #initializeState} and to {@code false} in {@link #cancel()}. */
    volatile transient boolean isRunning;

    /**
     * Creates a source with a rate of {@link Long#MAX_VALUE} rows per second.
     *
     * @param dataGenerator generator that produces the records to emit
     */
    public DataGeneratorSource(DataGenerator<T> dataGenerator) {
        this(dataGenerator, Long.MAX_VALUE);
    }

    /**
     * Creates a source that emits at the given rate.
     *
     * @param dataGenerator generator that produces the records to emit
     * @param rowsPerSecond number of rows to emit per second, divided among the parallel subtasks
     */
    public DataGeneratorSource(DataGenerator<T> dataGenerator, long rowsPerSecond) {
        this.generator = dataGenerator;
        this.rowsPerSecond = rowsPerSecond;
    }

    /**
     * Opens the generator with the initialization context and the runtime context, then marks
     * the source as running.
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.generator.open("DataGenerator", functionInitializationContext, getRuntimeContext());
        this.isRunning = true;
    }

    /** Delegates the snapshot to the generator. */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.generator.snapshotState(functionSnapshotContext);
    }

    /**
     * Emits generated records in one-second batches until the generator has no more elements or
     * the source is cancelled.
     *
     * @param sourceContext context used to emit records; each record is emitted while holding
     *     the context's checkpoint lock
     * @throws Exception if the generator or the context fails, or if the thread is interrupted
     *     while waiting for the next interval
     */
    @Override
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        // Divide as double: long/int division would truncate a rate below the parallelism to 0
        // and the subtask would never emit anything.
        final double taskRowsPerSecond =
                (double) this.rowsPerSecond / getRuntimeContext().getNumberOfParallelSubtasks();
        long nextReadTime = System.currentTimeMillis();

        while (this.isRunning) {
            // A fractional rate still yields at least one row per interval because 0 < rate.
            for (int i = 0; i < taskRowsPerSecond; i++) {
                if (!this.isRunning || !this.generator.hasNext()) {
                    return;
                }
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(this.generator.next());
                }
            }

            // Advance the deadline by a fixed step (rather than "now + interval") so that time
            // spent emitting is deducted from the wait.
            nextReadTime += INTERVAL_MILLIS;
            long remainingMillis = nextReadTime - System.currentTimeMillis();
            // Re-check after every sleep; leave the loop as soon as the deadline has passed so
            // the outer loop can emit the next batch or observe cancellation.
            while (remainingMillis > 0) {
                Thread.sleep(remainingMillis);
                remainingMillis = nextReadTime - System.currentTimeMillis();
            }
        }
    }

    /** Signals the emission loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        this.isRunning = false;
    }
}
