/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async.queue;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.operators.async.queue.AsyncResult;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@RunWith(value=Parameterized.class)
public class StreamElementQueueTest
extends TestLogger {
    private static final long timeout = 10000L;
    private static ExecutorService executor;
    private final StreamElementQueueType streamElementQueueType;

    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(3);
    }

    @AfterClass
    public static void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        }
        catch (InterruptedException interrupted) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Parameterized.Parameters
    public static Collection<StreamElementQueueType> streamElementQueueTypes() {
        return Arrays.asList(StreamElementQueueType.OrderedStreamElementQueueType, StreamElementQueueType.UnorderedStreamElementQueueType);
    }

    public StreamElementQueueTest(StreamElementQueueType streamElementQueueType) {
        this.streamElementQueueType = (StreamElementQueueType)((Object)Preconditions.checkNotNull((Object)((Object)streamElementQueueType)));
    }

    public StreamElementQueue createStreamElementQueue(int capacity, OperatorActions operatorActions) {
        switch (this.streamElementQueueType) {
            case OrderedStreamElementQueueType: {
                return new OrderedStreamElementQueue(capacity, (Executor)executor, operatorActions);
            }
            case UnorderedStreamElementQueueType: {
                return new UnorderedStreamElementQueue(capacity, (Executor)executor, operatorActions);
            }
        }
        throw new IllegalStateException("Unknown stream element queue type: " + (Object)((Object)this.streamElementQueueType));
    }

    @Test
    public void testPut() throws InterruptedException {
        OperatorActions operatorActions = (OperatorActions)Mockito.mock(OperatorActions.class);
        StreamElementQueue queue = this.createStreamElementQueue(2, operatorActions);
        Watermark watermark = new Watermark(0L);
        StreamRecord streamRecord = new StreamRecord((Object)42, 1L);
        Watermark nextWatermark = new Watermark(2L);
        WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(watermark);
        StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(streamRecord);
        queue.put((StreamElementQueueEntry)watermarkQueueEntry);
        queue.put((StreamElementQueueEntry)streamRecordQueueEntry);
        Assert.assertEquals((long)2L, (long)queue.size());
        Assert.assertFalse((boolean)queue.tryPut((StreamElementQueueEntry)new WatermarkQueueEntry(nextWatermark)));
        Collection actualValues = queue.values();
        List<StreamElementQueueEntry> expectedValues = Arrays.asList(watermarkQueueEntry, streamRecordQueueEntry);
        Assert.assertEquals(expectedValues, (Object)actualValues);
        ((OperatorActions)Mockito.verify((Object)operatorActions, (VerificationMode)Mockito.never())).failOperator((Throwable)Matchers.any(Exception.class));
    }

    @Test
    public void testPoll() throws InterruptedException {
        OperatorActions operatorActions = (OperatorActions)Mockito.mock(OperatorActions.class);
        StreamElementQueue queue = this.createStreamElementQueue(2, operatorActions);
        WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(0L));
        StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord((Object)42, 1L));
        queue.put((StreamElementQueueEntry)watermarkQueueEntry);
        queue.put((StreamElementQueueEntry)streamRecordQueueEntry);
        Assert.assertEquals((Object)watermarkQueueEntry, (Object)queue.peekBlockingly());
        Assert.assertEquals((long)2L, (long)queue.size());
        Assert.assertEquals((Object)watermarkQueueEntry, (Object)queue.poll());
        Assert.assertEquals((long)1L, (long)queue.size());
        streamRecordQueueEntry.complete(Collections.emptyList());
        Assert.assertEquals((Object)streamRecordQueueEntry, (Object)queue.poll());
        Assert.assertEquals((long)0L, (long)queue.size());
        Assert.assertTrue((boolean)queue.isEmpty());
        ((OperatorActions)Mockito.verify((Object)operatorActions, (VerificationMode)Mockito.never())).failOperator((Throwable)Matchers.any(Exception.class));
    }

    @Test
    public void testBlockingPut() throws Exception {
        OperatorActions operatorActions = (OperatorActions)Mockito.mock(OperatorActions.class);
        StreamElementQueue queue = this.createStreamElementQueue(1, operatorActions);
        StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord((Object)42, 0L));
        StreamRecordQueueEntry streamRecordQueueEntry2 = new StreamRecordQueueEntry(new StreamRecord((Object)43, 1L));
        queue.put((StreamElementQueueEntry)streamRecordQueueEntry);
        Assert.assertEquals((long)1L, (long)queue.size());
        CompletableFuture<Void> putOperation = CompletableFuture.runAsync(() -> {
            try {
                queue.put((StreamElementQueueEntry)streamRecordQueueEntry2);
            }
            catch (InterruptedException e) {
                throw new CompletionException(e);
            }
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse((boolean)putOperation.isDone());
        streamRecordQueueEntry.complete(Collections.emptyList());
        Assert.assertEquals((Object)streamRecordQueueEntry, (Object)queue.poll());
        putOperation.get();
        ((OperatorActions)Mockito.verify((Object)operatorActions, (VerificationMode)Mockito.never())).failOperator((Throwable)Matchers.any(Exception.class));
    }

    @Test
    public void testBlockingPoll() throws Exception {
        OperatorActions operatorActions = (OperatorActions)Mockito.mock(OperatorActions.class);
        StreamElementQueue queue = this.createStreamElementQueue(1, operatorActions);
        WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(1L));
        StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord((Object)1, 2L));
        Assert.assertTrue((boolean)queue.isEmpty());
        CompletableFuture<AsyncResult> peekOperation = CompletableFuture.supplyAsync(() -> {
            try {
                return queue.peekBlockingly();
            }
            catch (InterruptedException e) {
                throw new CompletionException(e);
            }
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse((boolean)peekOperation.isDone());
        queue.put((StreamElementQueueEntry)watermarkQueueEntry);
        AsyncResult watermarkResult = peekOperation.get();
        Assert.assertEquals((Object)watermarkQueueEntry, (Object)watermarkResult);
        Assert.assertEquals((long)1L, (long)queue.size());
        Assert.assertEquals((Object)watermarkQueueEntry, (Object)queue.poll());
        Assert.assertTrue((boolean)queue.isEmpty());
        CompletableFuture<AsyncResult> pollOperation = CompletableFuture.supplyAsync(() -> {
            try {
                return queue.poll();
            }
            catch (InterruptedException e) {
                throw new CompletionException(e);
            }
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse((boolean)pollOperation.isDone());
        queue.put((StreamElementQueueEntry)streamRecordQueueEntry);
        Thread.sleep(10L);
        Assert.assertFalse((boolean)pollOperation.isDone());
        streamRecordQueueEntry.complete(Collections.emptyList());
        Assert.assertEquals((Object)streamRecordQueueEntry, (Object)pollOperation.get());
        Assert.assertTrue((boolean)queue.isEmpty());
        ((OperatorActions)Mockito.verify((Object)operatorActions, (VerificationMode)Mockito.never())).failOperator((Throwable)Matchers.any(Exception.class));
    }

    static enum StreamElementQueueType {
        OrderedStreamElementQueueType,
        UnorderedStreamElementQueueType;

    }
}

