/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.operators.async.Emitter;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

public class EmitterTest
extends TestLogger {
    private static final long timeout = 10000L;
    private static ExecutorService executor;

    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(3);
    }

    @AfterClass
    public static void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        }
        catch (InterruptedException interrupted) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEmitterWithOrderedQueue() throws Exception {
        Object lock = new Object();
        ArrayList<StreamElement> list = new ArrayList<StreamElement>();
        CollectorOutput output = new CollectorOutput(list);
        List<StreamElement> expected = Arrays.asList(new StreamRecord((Object)1, 0L), new StreamRecord((Object)2, 0L), new StreamRecord((Object)3, 1L), new StreamRecord((Object)4, 1L), new Watermark(3L), new StreamRecord((Object)5, 4L), new StreamRecord((Object)6, 4L));
        OperatorActions operatorActions = (OperatorActions)Mockito.mock(OperatorActions.class);
        int capacity = 5;
        OrderedStreamElementQueue queue = new OrderedStreamElementQueue(5, (Executor)executor, operatorActions);
        Emitter emitter = new Emitter(lock, output, (StreamElementQueue)queue, operatorActions);
        Thread emitterThread = new Thread((Runnable)emitter);
        emitterThread.start();
        try {
            StreamRecordQueueEntry record1 = new StreamRecordQueueEntry(new StreamRecord((Object)1, 0L));
            StreamRecordQueueEntry record2 = new StreamRecordQueueEntry(new StreamRecord((Object)2, 1L));
            WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(3L));
            StreamRecordQueueEntry record3 = new StreamRecordQueueEntry(new StreamRecord((Object)3, 4L));
            queue.put((StreamElementQueueEntry)record1);
            queue.put((StreamElementQueueEntry)record2);
            queue.put((StreamElementQueueEntry)watermark1);
            queue.put((StreamElementQueueEntry)record3);
            record2.complete(Arrays.asList(3, 4));
            record1.complete(Arrays.asList(1, 2));
            record3.complete(Arrays.asList(5, 6));
            Object object = lock;
            synchronized (object) {
                while (!queue.isEmpty()) {
                    lock.wait();
                }
            }
            Assert.assertEquals(expected, list);
        }
        finally {
            emitter.stop();
            emitterThread.interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEmitterWithExceptions() throws Exception {
        Object lock = new Object();
        ArrayList<StreamElement> list = new ArrayList<StreamElement>();
        CollectorOutput output = new CollectorOutput(list);
        List<StreamElement> expected = Arrays.asList(new StreamRecord((Object)1, 0L), new Watermark(3L));
        OperatorActions operatorActions = (OperatorActions)Mockito.mock(OperatorActions.class);
        int capacity = 3;
        OrderedStreamElementQueue queue = new OrderedStreamElementQueue(3, (Executor)executor, operatorActions);
        Emitter emitter = new Emitter(lock, output, (StreamElementQueue)queue, operatorActions);
        Thread emitterThread = new Thread((Runnable)emitter);
        emitterThread.start();
        Exception testException = new Exception("Test exception");
        try {
            StreamRecordQueueEntry record1 = new StreamRecordQueueEntry(new StreamRecord((Object)1, 0L));
            StreamRecordQueueEntry record2 = new StreamRecordQueueEntry(new StreamRecord((Object)2, 1L));
            WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(3L));
            queue.put((StreamElementQueueEntry)record1);
            queue.put((StreamElementQueueEntry)record2);
            queue.put((StreamElementQueueEntry)watermark1);
            record2.completeExceptionally((Throwable)testException);
            record1.complete(Arrays.asList(1));
            Object object = lock;
            synchronized (object) {
                while (!queue.isEmpty()) {
                    lock.wait();
                }
            }
            Assert.assertEquals(expected, list);
            ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Throwable.class);
            ((OperatorActions)Mockito.verify((Object)operatorActions)).failOperator((Throwable)argumentCaptor.capture());
            Throwable failureCause = (Throwable)argumentCaptor.getValue();
            Assert.assertNotNull((Object)failureCause.getCause());
            Assert.assertTrue((boolean)(failureCause.getCause() instanceof ExecutionException));
            Assert.assertNotNull((Object)failureCause.getCause().getCause());
            Assert.assertEquals((Object)testException, (Object)failureCause.getCause().getCause());
        }
        finally {
            emitter.stop();
            emitterThread.interrupt();
        }
    }
}

