package org.apache.flink.streaming.api.operators.async;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/EmitterTest.class */
public class EmitterTest extends TestLogger {
    private static final long timeout = 10000;
    private static ExecutorService executor;

    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(3);
    }

    @AfterClass
    public static void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Test
    public void testEmitterWithOrderedQueue() throws Exception {
        Object obj = new Object();
        ArrayList arrayList = new ArrayList();
        CollectorOutput collectorOutput = new CollectorOutput(arrayList);
        List asList = Arrays.asList(new StreamRecord(1, 0L), new StreamRecord(2, 0L), new StreamRecord(3, 1L), new StreamRecord(4, 1L), new Watermark(3L), new StreamRecord(5, 4L), new StreamRecord(6, 4L));
        OperatorActions operatorActions = (OperatorActions) Mockito.mock(OperatorActions.class);
        OrderedStreamElementQueue orderedStreamElementQueue = new OrderedStreamElementQueue(5, executor, operatorActions);
        Emitter emitter = new Emitter(obj, collectorOutput, orderedStreamElementQueue, operatorActions);
        Thread thread = new Thread((Runnable) emitter);
        thread.start();
        try {
            StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord(1, 0L));
            StreamRecordQueueEntry streamRecordQueueEntry2 = new StreamRecordQueueEntry(new StreamRecord(2, 1L));
            WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(3L));
            StreamRecordQueueEntry streamRecordQueueEntry3 = new StreamRecordQueueEntry(new StreamRecord(3, 4L));
            orderedStreamElementQueue.put(streamRecordQueueEntry);
            orderedStreamElementQueue.put(streamRecordQueueEntry2);
            orderedStreamElementQueue.put(watermarkQueueEntry);
            orderedStreamElementQueue.put(streamRecordQueueEntry3);
            streamRecordQueueEntry2.complete(Arrays.asList(3, 4));
            streamRecordQueueEntry.complete(Arrays.asList(1, 2));
            streamRecordQueueEntry3.complete(Arrays.asList(5, 6));
            synchronized (obj) {
                while (!orderedStreamElementQueue.isEmpty()) {
                    obj.wait();
                }
            }
            Assert.assertEquals(asList, arrayList);
            emitter.stop();
            thread.interrupt();
        } catch (Throwable th) {
            emitter.stop();
            thread.interrupt();
            throw th;
        }
    }

    @Test
    public void testEmitterWithExceptions() throws Exception {
        Object obj = new Object();
        ArrayList arrayList = new ArrayList();
        CollectorOutput collectorOutput = new CollectorOutput(arrayList);
        List asList = Arrays.asList(new StreamRecord(1, 0L), new Watermark(3L));
        OperatorActions operatorActions = (OperatorActions) Mockito.mock(OperatorActions.class);
        OrderedStreamElementQueue orderedStreamElementQueue = new OrderedStreamElementQueue(3, executor, operatorActions);
        Emitter emitter = new Emitter(obj, collectorOutput, orderedStreamElementQueue, operatorActions);
        Thread thread = new Thread((Runnable) emitter);
        thread.start();
        Exception exc = new Exception("Test exception");
        try {
            StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord(1, 0L));
            StreamRecordQueueEntry streamRecordQueueEntry2 = new StreamRecordQueueEntry(new StreamRecord(2, 1L));
            WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(3L));
            orderedStreamElementQueue.put(streamRecordQueueEntry);
            orderedStreamElementQueue.put(streamRecordQueueEntry2);
            orderedStreamElementQueue.put(watermarkQueueEntry);
            streamRecordQueueEntry2.completeExceptionally(exc);
            streamRecordQueueEntry.complete(Arrays.asList(1));
            synchronized (obj) {
                while (!orderedStreamElementQueue.isEmpty()) {
                    obj.wait();
                }
            }
            Assert.assertEquals(asList, arrayList);
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Throwable.class);
            ((OperatorActions) Mockito.verify(operatorActions)).failOperator((Throwable) forClass.capture());
            Throwable th = (Throwable) forClass.getValue();
            Assert.assertNotNull(th.getCause());
            Assert.assertTrue(th.getCause() instanceof ExecutionException);
            Assert.assertNotNull(th.getCause().getCause());
            Assert.assertEquals(exc, th.getCause().getCause());
            emitter.stop();
            thread.interrupt();
        } catch (Throwable th2) {
            emitter.stop();
            thread.interrupt();
            throw th2;
        }
    }
}
