package org.apache.flink.streaming.api.operators.async.queue;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.class */
/**
 * Tests for {@link UnorderedStreamElementQueue}.
 *
 * <p>The test fills a queue with stream records separated by watermarks, completes the records
 * out of insertion order, and checks which entry each {@code poll()} call hands out.
 */
public class UnorderedStreamElementQueueTest extends TestLogger {
    /** Upper bound, in milliseconds, to wait for the shared executor to terminate. */
    private static final long TIMEOUT_MILLIS = 10000L;

    /**
     * Pause, in milliseconds, granted to an asynchronous poll before asserting that it has
     * <em>not</em> completed. A negative check cannot be awaited, hence the short sleep.
     */
    private static final long SETTLE_MILLIS = 10L;

    /** Executor shared by the queue under test and by the asynchronous poll tasks. */
    private static ExecutorService executor;

    /** Creates the executor used by all tests in this class. */
    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(3);
    }

    /**
     * Shuts the shared executor down, forcing termination if it does not stop within
     * {@link #TIMEOUT_MILLIS} or if the waiting thread is interrupted.
     */
    @AfterClass
    public static void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Restore the interrupt flag so the test runner can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Verifies the order in which entries are polled when records complete out of insertion
     * order.
     *
     * <p>Queue content in insertion order: R1, R2, W1(2), R3, R4, W2(5), R5, R6. The assertions
     * below expect that a completed record is only handed out once every watermark inserted
     * before it has been polled, while records that are not separated by a watermark may be
     * returned in any order.
     *
     * @throws Exception if queue interaction or waiting on a poll future fails
     */
    @Test
    public void testCompletionOrder() throws Exception {
        OperatorActions operatorActions = Mockito.mock(OperatorActions.class);
        UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue(8, executor, operatorActions);

        StreamRecordQueueEntry<Integer> record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
        StreamRecordQueueEntry<Integer> record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
        WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(2L));
        StreamRecordQueueEntry<Integer> record3 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
        StreamRecordQueueEntry<Integer> record4 = new StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L));
        WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new Watermark(5L));
        StreamRecordQueueEntry<Integer> record5 = new StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L));
        StreamRecordQueueEntry<Integer> record6 = new StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L));

        for (StreamElementQueueEntry<?> entry : Arrays.<StreamElementQueueEntry<?>>asList(
                record1, record2, watermark1, record3, record4, watermark2, record5, record6)) {
            queue.put(entry);
        }

        Assert.assertEquals(8, queue.size());

        Future<AsyncResult> firstPoll = pollAsync(queue);

        // R3 was inserted after W1, so completing it is expected to leave the poll pending.
        record3.collect(Collections.<Integer>emptyList());
        Thread.sleep(SETTLE_MILLIS);
        Assert.assertFalse(firstPoll.isDone());

        // R2 precedes W1; completing it is expected to satisfy the pending poll.
        record2.collect(Collections.<Integer>emptyList());
        Assert.assertEquals(record2, firstPoll.get());

        Future<AsyncResult> secondPoll = pollAsync(queue);

        // R6 and R4 both sit behind W1, so the second poll is expected to stay pending.
        record6.collect(Collections.<Integer>emptyList());
        record4.collect(Collections.<Integer>emptyList());
        Thread.sleep(SETTLE_MILLIS);
        Assert.assertFalse(secondPoll.isDone());

        // R1 is the last record in front of W1.
        record1.collect(Collections.<Integer>emptyList());
        Assert.assertEquals(record1, secondPoll.get());

        // With R1 and R2 gone, W1 is expected next.
        Assert.assertEquals(watermark1, queue.poll());

        // R3 and R4 are both completed and lie between W1 and W2; their relative order is not
        // asserted, hence the set comparison.
        HashSet<AsyncResult> expected = new HashSet<>(2);
        expected.add(record3);
        expected.add(record4);

        HashSet<AsyncResult> actual = new HashSet<>(2);
        actual.add(queue.poll());
        actual.add(queue.poll());
        Assert.assertEquals(expected, actual);

        Assert.assertEquals(watermark2, queue.poll());

        // R6 was completed earlier and is expected to be available once W2 has been polled.
        Assert.assertEquals(record6, queue.poll());

        // Only the still-incomplete R5 remains.
        Assert.assertEquals(1, queue.size());

        Future<AsyncResult> thirdPoll = pollAsync(queue);
        Thread.sleep(SETTLE_MILLIS);
        Assert.assertFalse(thirdPoll.isDone());

        record5.collect(Collections.<Integer>emptyList());
        Assert.assertEquals(record5, thirdPoll.get());

        Assert.assertTrue(queue.isEmpty());

        Mockito.verify(operatorActions, Mockito.never()).failOperator(Matchers.any(Exception.class));
    }

    /**
     * Submits a {@code poll()} call on the given queue to the shared executor.
     *
     * @param queue the queue to poll
     * @return a future holding the result of the {@code poll()} call
     */
    private static Future<AsyncResult> pollAsync(final UnorderedStreamElementQueue queue) {
        return FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
            @Override
            public AsyncResult call() throws Exception {
                return queue.poll();
            }
        }, executor);
    }
}
