package org.apache.flink.streaming.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.rules.ExternalResource;

/* loaded from: input_file:org/apache/flink/streaming/util/StreamCollector.class */
public class StreamCollector extends ExternalResource {
    private static final AtomicLong counter = new AtomicLong();
    private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap();
    private static final Map<Long, Queue> resultQueues = new ConcurrentHashMap();
    private List<Long> ids;

    /* loaded from: input_file:org/apache/flink/streaming/util/StreamCollector$CollectingSink.class */
    private static class CollectingSink<IN> extends RichSinkFunction<IN> {
        private final long id;
        private transient CountDownLatch latch;
        private transient Queue<IN> results;

        private CollectingSink(long j) {
            this.id = j;
        }

        public void open(Configuration configuration) throws Exception {
            this.latch = (CountDownLatch) StreamCollector.latches.get(Long.valueOf(this.id));
            this.results = (Queue) StreamCollector.resultQueues.get(Long.valueOf(this.id));
        }

        public void invoke(IN in, SinkFunction.Context context) throws Exception {
            this.results.add(in);
        }

        public void close() throws Exception {
            this.latch.countDown();
        }
    }

    protected void before() {
        this.ids = new ArrayList();
    }

    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> dataStream) {
        long andIncrement = counter.getAndIncrement();
        this.ids.add(Long.valueOf(andIncrement));
        int parallelism = dataStream.getParallelism();
        if (parallelism == -1) {
            parallelism = dataStream.getExecutionEnvironment().getParallelism();
        }
        CountDownLatch countDownLatch = new CountDownLatch(parallelism);
        latches.put(Long.valueOf(andIncrement), countDownLatch);
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        resultQueues.put(Long.valueOf(andIncrement), concurrentLinkedDeque);
        dataStream.addSink(new CollectingSink(andIncrement));
        return (CompletableFuture<Collection<IN>>) CompletableFuture.runAsync(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException("Failed to collect results");
            }
        }).thenApply(r3 -> {
            return concurrentLinkedDeque;
        });
    }

    protected void after() {
        for (Long l : this.ids) {
            latches.remove(l);
            resultQueues.remove(l);
        }
    }
}
