package org.apache.flink.streaming.api.operators.async.queue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.class */
public class OrderedStreamElementQueueTest extends TestLogger {
    private static final long timeout = 10000;
    private static ExecutorService executor;

    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(3);
    }

    @AfterClass
    public static void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Test
    public void testCompletionOrder() throws Exception {
        OperatorActions operatorActions = (OperatorActions) Mockito.mock(OperatorActions.class);
        final OrderedStreamElementQueue orderedStreamElementQueue = new OrderedStreamElementQueue(4, executor, operatorActions);
        StreamElementQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord(1, 0L));
        StreamElementQueueEntry streamRecordQueueEntry2 = new StreamRecordQueueEntry(new StreamRecord(2, 1L));
        StreamElementQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(2L));
        StreamElementQueueEntry streamRecordQueueEntry3 = new StreamRecordQueueEntry(new StreamRecord(3, 3L));
        List asList = Arrays.asList(streamRecordQueueEntry, streamRecordQueueEntry2, watermarkQueueEntry, streamRecordQueueEntry3);
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            orderedStreamElementQueue.put((StreamElementQueueEntry) it.next());
        }
        Future supplyAsync = FlinkFuture.supplyAsync(new Callable<List<AsyncResult>>() { // from class: org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueueTest.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public List<AsyncResult> call() throws Exception {
                ArrayList arrayList = new ArrayList(4);
                while (!orderedStreamElementQueue.isEmpty()) {
                    arrayList.add(orderedStreamElementQueue.poll());
                }
                return arrayList;
            }
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse(supplyAsync.isDone());
        streamRecordQueueEntry2.collect(Collections.emptyList());
        streamRecordQueueEntry3.collect(Collections.emptyList());
        Thread.sleep(10L);
        Assert.assertEquals(4L, orderedStreamElementQueue.size());
        streamRecordQueueEntry.collect(Collections.emptyList());
        Assert.assertEquals(asList, supplyAsync.get());
        ((OperatorActions) Mockito.verify(operatorActions, Mockito.never())).failOperator((Throwable) Matchers.any(Exception.class));
    }
}
