package org.apache.flink.ml.util;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/ml/util/InMemorySourceFunction.class */
public class InMemorySourceFunction<T> extends RichSourceFunction<T> {
    private static final Map<UUID, BlockingQueue> queueMap = new ConcurrentHashMap();
    private volatile boolean isRunning = true;
    private final UUID id = UUID.randomUUID();
    private BlockingQueue<Optional<T>> queue = new LinkedBlockingQueue();

    public InMemorySourceFunction() {
        queueMap.put(this.id, this.queue);
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.queue = queueMap.get(this.id);
    }

    public void close() throws Exception {
        super.close();
        queueMap.remove(this.id);
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws InterruptedException {
        while (this.isRunning) {
            Optional<T> take = this.queue.take();
            if (!take.isPresent()) {
                return;
            } else {
                sourceContext.collect(take.get());
            }
        }
    }

    public void cancel() {
        this.isRunning = false;
        this.queue.add(Optional.empty());
    }

    @SafeVarargs
    public final void addAll(T... tArr) {
        Preconditions.checkState(this.isRunning);
        for (T t : tArr) {
            this.queue.add(Optional.of(t));
        }
    }
}
