/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async.queue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.operators.async.queue.AsyncResult;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class OrderedStreamElementQueueTest
extends TestLogger {
    private static final long timeout = 10000L;
    private static ExecutorService executor;

    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(3);
    }

    @AfterClass
    public static void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        }
        catch (InterruptedException interrupted) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Test
    public void testCompletionOrder() throws Exception {
        OperatorActions operatorActions = (OperatorActions)Mockito.mock(OperatorActions.class);
        OrderedStreamElementQueue queue = new OrderedStreamElementQueue(4, (Executor)executor, operatorActions);
        StreamRecordQueueEntry entry1 = new StreamRecordQueueEntry(new StreamRecord((Object)1, 0L));
        StreamRecordQueueEntry entry2 = new StreamRecordQueueEntry(new StreamRecord((Object)2, 1L));
        WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new Watermark(2L));
        StreamRecordQueueEntry entry4 = new StreamRecordQueueEntry(new StreamRecord((Object)3, 3L));
        List<StreamElementQueueEntry> expected = Arrays.asList(entry1, entry2, entry3, entry4);
        for (StreamElementQueueEntry entry : expected) {
            queue.put(entry);
        }
        CompletableFuture<List> pollOperation = CompletableFuture.supplyAsync(() -> {
            ArrayList<AsyncResult> result = new ArrayList<AsyncResult>(4);
            while (!queue.isEmpty()) {
                try {
                    result.add(queue.poll());
                }
                catch (InterruptedException e) {
                    throw new CompletionException(e);
                }
            }
            return result;
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse((boolean)pollOperation.isDone());
        entry2.complete(Collections.emptyList());
        entry4.complete(Collections.emptyList());
        Thread.sleep(10L);
        Assert.assertEquals((long)4L, (long)queue.size());
        entry1.complete(Collections.emptyList());
        Assert.assertEquals(expected, (Object)pollOperation.get());
        ((OperatorActions)Mockito.verify((Object)operatorActions, (VerificationMode)Mockito.never())).failOperator((Throwable)Matchers.any(Exception.class));
    }
}

