package org.apache.flink.ml.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/* loaded from: input_file:org/apache/flink/ml/util/InMemorySinkFunction.class */
/**
 * A sink that hands every record it receives to an in-memory {@link BlockingQueue}, so that the
 * code which created the sink can read the records back through {@link #poll()} or
 * {@link #poll(int)}.
 *
 * <p>The queue is shared through a static, in-process registry keyed by a random {@link UUID}:
 * the constructor registers the queue, {@link #open(Configuration)} looks it up again by id and
 * {@link #close()} removes the entry. Because the registry is a static map, the hand-off can only
 * work when the sink runs in the same JVM (and class loader) as the instance that is polled.
 *
 * @param <T> type of the records written to the sink
 */
public class InMemorySinkFunction<T> extends RichSinkFunction<T> {
    /** In-process registry of the queues of all live sinks, keyed by sink id. */
    private static final Map<UUID, BlockingQueue<?>> queueMap = new ConcurrentHashMap<>();

    /** Identifier under which this sink's queue is registered in {@link #queueMap}. */
    private final UUID id = UUID.randomUUID();

    /**
     * Queue the records are written to and polled from. Replaced in {@link #open(Configuration)}
     * by the registered queue, which is {@code null} if the registration no longer exists.
     */
    private BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    /** Creates the sink and registers its (unbounded) queue under a fresh random id. */
    public InMemorySinkFunction() {
        queueMap.put(this.id, this.queue);
    }

    /**
     * Re-binds this instance to the queue registered under its id.
     *
     * @param configuration configuration forwarded to the superclass
     * @throws Exception if the superclass fails to open
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        // Safe as long as ids are unique: the only queue ever stored under this id is the
        // BlockingQueue<T> that the constructor registered.
        @SuppressWarnings("unchecked")
        BlockingQueue<T> registered = (BlockingQueue<T>) queueMap.get(this.id);
        // May be null when the entry was already removed by close() or never existed in this
        // JVM; invoke() reports that case explicitly.
        this.queue = registered;
    }

    /**
     * Removes this sink's queue from the registry. Instances that already hold a reference to
     * the queue can keep polling it.
     *
     * @throws Exception if the superclass fails to close
     */
    public void close() throws Exception {
        super.close();
        queueMap.remove(this.id);
    }

    /**
     * Appends a record to the queue.
     *
     * @param t record to store
     * @param context sink context, unused
     * @throws IllegalStateException if no queue was found for this sink when it was opened
     * @throws RuntimeException if the queue rejects the record
     */
    public void invoke(T t, SinkFunction.Context context) {
        if (this.queue == null) {
            // Previously surfaced as a bare NullPointerException without any hint at the cause.
            throw new IllegalStateException(
                    "No blocking queue is registered for sink "
                            + this.id
                            + "; it was either removed by close() or never registered in this JVM.");
        }
        if (!this.queue.offer(t)) {
            throw new RuntimeException("Failed to offer " + t + " to blocking queue " + this.id + ".");
        }
    }

    /**
     * Retrieves the next {@code i} records in arrival order, waiting for each one as described in
     * {@link #poll()}.
     *
     * @param i number of records to retrieve; values of zero or less yield an empty list
     * @return a new mutable list holding the retrieved records
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws RuntimeException if any single record does not arrive within one minute
     */
    public List<T> poll(int i) throws InterruptedException {
        List<T> result = new ArrayList<>();
        for (int fetched = 0; fetched < i; fetched++) {
            result.add(poll());
        }
        return result;
    }

    /**
     * Retrieves the next record, waiting up to one minute for it to arrive.
     *
     * @return the next record, never {@code null}
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws RuntimeException if no record arrives within one minute
     */
    public T poll() throws InterruptedException {
        T next = this.queue.poll(1L, TimeUnit.MINUTES);
        if (next == null) {
            throw new RuntimeException("Failed to poll next value from blocking queue.");
        }
        return next;
    }
}
