/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link TaskMailboxImpl}, exercised through the {@link TaskMailbox} interface.
 *
 * <p>A fresh mailbox is created before every test and closed afterwards. Some tests re-invoke
 * {@link #setUp()} themselves to obtain a new mailbox after the previous one was closed or
 * quiesced as part of the scenario.
 */
public class TaskMailboxImplTest {
    /** Mail action that does nothing; used where only the mail's identity matters. */
    private static final RunnableWithException NO_OP = () -> {};

    /** Priority used by the tests that do not care about priorities. */
    private static final int DEFAULT_PRIORITY = 0;

    /** Mailbox under test; replaced by {@link #setUp()}. */
    private TaskMailbox taskMailbox;

    /** Creates the mailbox under test. */
    @Before
    public void setUp() {
        taskMailbox = new TaskMailboxImpl();
    }

    /** Closes the mailbox under test. */
    @After
    public void tearDown() {
        taskMailbox.close();
    }

    /**
     * Checks that mails added with {@code putFirst} are taken before mails added with {@code put},
     * with the most recent {@code putFirst} coming out first.
     */
    @Test
    public void testPutAsHead() throws InterruptedException {
        Mail mailA = new Mail(() -> {}, Integer.MAX_VALUE, "mailA");
        Mail mailB = new Mail(() -> {}, Integer.MAX_VALUE, "mailB");
        Mail mailC = new Mail(() -> {}, DEFAULT_PRIORITY, "mailC, DEFAULT_PRIORITY");
        Mail mailD = new Mail(() -> {}, DEFAULT_PRIORITY, "mailD, DEFAULT_PRIORITY");

        taskMailbox.put(mailC);
        taskMailbox.putFirst(mailB);
        taskMailbox.put(mailD);
        taskMailbox.putFirst(mailA);

        Assert.assertSame(mailA, taskMailbox.take(DEFAULT_PRIORITY));
        Assert.assertSame(mailB, taskMailbox.take(DEFAULT_PRIORITY));
        Assert.assertSame(mailC, taskMailbox.take(DEFAULT_PRIORITY));
        Assert.assertSame(mailD, taskMailbox.take(DEFAULT_PRIORITY));
        Assert.assertFalse(taskMailbox.tryTake(DEFAULT_PRIORITY).isPresent());
    }

    /**
     * Checks the basic contract: {@code hasMail} reflects whether mails are queued and
     * {@code take} returns mails in the order they were put.
     */
    @Test
    public void testContracts() throws InterruptedException {
        LinkedList<Mail> testObjects = new LinkedList<>();
        Assert.assertFalse(taskMailbox.hasMail());

        for (int i = 0; i < 10; ++i) {
            Mail mail = new Mail(NO_OP, DEFAULT_PRIORITY, "mail, DEFAULT_PRIORITY");
            testObjects.add(mail);
            taskMailbox.put(mail);
            Assert.assertTrue(taskMailbox.hasMail());
        }

        while (!testObjects.isEmpty()) {
            Assert.assertEquals(testObjects.remove(), taskMailbox.take(DEFAULT_PRIORITY));
            // Compare booleans with booleans: hasMail() must be true exactly while mails remain.
            Assert.assertEquals(!testObjects.isEmpty(), taskMailbox.hasMail());
        }
    }

    /** Runs the multi-writer scenario of {@link #testPutTake} consuming with {@code take}. */
    @Test
    public void testConcurrentPutTakeBlocking() throws Exception {
        testPutTake(mailbox -> mailbox.take(DEFAULT_PRIORITY));
    }

    /**
     * Runs the multi-writer scenario of {@link #testPutTake} consuming with {@code tryTake},
     * retrying until a mail is present.
     */
    @Test
    public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
        testPutTake(mailbox -> {
            Optional<Mail> optionalMail = mailbox.tryTake(DEFAULT_PRIORITY);
            while (!optionalMail.isPresent()) {
                optionalMail = mailbox.tryTake(DEFAULT_PRIORITY);
            }
            return optionalMail.get();
        });
    }

    /** Checks that closing the mailbox makes threads calling put, putFirst or take fail. */
    @Test
    public void testCloseUnblocks() throws InterruptedException {
        testAllPuttingUnblocksInternal(TaskMailbox::close);
        // The previous mailbox is closed by now; continue with a fresh one.
        setUp();
        testUnblocksInternal(() -> taskMailbox.take(DEFAULT_PRIORITY), TaskMailbox::close);
    }

    /** Checks that quiescing the mailbox makes threads calling put or putFirst fail. */
    @Test
    public void testQuiesceUnblocks() throws InterruptedException {
        testAllPuttingUnblocksInternal(TaskMailbox::quiesce);
    }

    /**
     * Checks that a quiesced mailbox rejects new mails but still hands out the mails that were
     * queued before quiescing.
     */
    @Test
    public void testLifeCycleQuiesce() throws InterruptedException {
        taskMailbox.put(new Mail(NO_OP, DEFAULT_PRIORITY, "NO_OP, DEFAULT_PRIORITY"));
        taskMailbox.put(new Mail(NO_OP, DEFAULT_PRIORITY, "NO_OP, DEFAULT_PRIORITY"));
        taskMailbox.quiesce();

        testLifecyclePuttingInternal();

        taskMailbox.take(DEFAULT_PRIORITY);
        Assert.assertTrue(taskMailbox.tryTake(DEFAULT_PRIORITY).isPresent());
        Assert.assertFalse(taskMailbox.tryTake(DEFAULT_PRIORITY).isPresent());
    }

    /**
     * Checks that a closed mailbox rejects put, putFirst, take and tryTake with an
     * {@link IllegalStateException}.
     */
    @Test
    public void testLifeCycleClose() throws InterruptedException {
        taskMailbox.close();

        testLifecyclePuttingInternal();

        try {
            taskMailbox.take(DEFAULT_PRIORITY);
            Assert.fail("take() on a closed mailbox should throw IllegalStateException");
        } catch (IllegalStateException expected) {
            // expected: the mailbox is closed
        }

        try {
            taskMailbox.tryTake(DEFAULT_PRIORITY);
            Assert.fail("tryTake() on a closed mailbox should throw IllegalStateException");
        } catch (IllegalStateException expected) {
            // expected: the mailbox is closed
        }
    }

    /**
     * Asserts that both {@code put} and {@code putFirst} are rejected with an
     * {@link IllegalStateException} in the mailbox's current state.
     */
    private void testLifecyclePuttingInternal() {
        try {
            taskMailbox.put(new Mail(NO_OP, DEFAULT_PRIORITY, "NO_OP, DEFAULT_PRIORITY"));
            Assert.fail("put() should throw IllegalStateException in the current mailbox state");
        } catch (IllegalStateException expected) {
            // expected: the mailbox no longer accepts mails
        }

        try {
            taskMailbox.putFirst(new Mail(NO_OP, Integer.MAX_VALUE, "NO_OP"));
            Assert.fail("putFirst() should throw IllegalStateException in the current mailbox state");
        } catch (IllegalStateException expected) {
            // expected: the mailbox no longer accepts mails
        }
    }

    /**
     * Runs {@link #testUnblocksInternal} once for {@code put} and once, on a fresh mailbox, for
     * {@code putFirst}.
     *
     * @param unblockMethod state transition (e.g. close or quiesce) that must make the puts fail
     */
    private void testAllPuttingUnblocksInternal(Consumer<TaskMailbox> unblockMethod) throws InterruptedException {
        testUnblocksInternal(
                () -> taskMailbox.put(new Mail(NO_OP, DEFAULT_PRIORITY, "NO_OP, DEFAULT_PRIORITY")),
                unblockMethod);
        // Start over with a fresh mailbox for the second variant.
        setUp();
        testUnblocksInternal(
                () -> taskMailbox.putFirst(new Mail(NO_OP, Integer.MAX_VALUE, "NO_OP")),
                unblockMethod);
    }

    /**
     * Starts several threads that invoke {@code testMethod} in a loop until it throws, waits until
     * all of them have started, applies {@code unblockMethod} to the mailbox and finally checks
     * that every thread terminated with exactly an {@link IllegalStateException}.
     *
     * @param testMethod mailbox operation each worker thread keeps invoking
     * @param unblockMethod state transition expected to make {@code testMethod} throw
     */
    private void testUnblocksInternal(RunnableWithException testMethod, Consumer<TaskMailbox> unblockMethod) throws InterruptedException {
        final Thread[] blockedThreads = new Thread[8];
        final Exception[] exceptions = new Exception[blockedThreads.length];
        final CountDownLatch countDownLatch = new CountDownLatch(blockedThreads.length);

        for (int i = 0; i < blockedThreads.length; ++i) {
            final int id = i;
            Thread blocked = new Thread(() -> {
                try {
                    countDownLatch.countDown();
                    // Only an exception thrown by testMethod ends this loop.
                    while (true) {
                        testMethod.run();
                    }
                } catch (Exception ex) {
                    exceptions[id] = ex;
                }
            });
            blockedThreads[i] = blocked;
            blocked.start();
        }

        countDownLatch.await();
        unblockMethod.accept(taskMailbox);

        for (Thread blockedThread : blockedThreads) {
            blockedThread.join();
        }

        for (Exception exception : exceptions) {
            // A worker that died with an Error never records an exception; report that clearly.
            Assert.assertNotNull("worker thread terminated without recording an exception", exception);
            Assert.assertEquals(IllegalStateException.class, exception.getClass());
        }
    }

    /**
     * Lets several writer threads put counting mails into the mailbox concurrently, then, after
     * all writers have finished, appends a poison mail and drains the mailbox with
     * {@code takeMethod}, running every mail. Each writer's counter must end up at the number of
     * mails it sent.
     *
     * @param takeMethod strategy used to obtain the next mail from the mailbox
     */
    private void testPutTake(FunctionWithException<TaskMailbox, Mail, InterruptedException> takeMethod) throws Exception {
        final int numThreads = 10;
        final int numMailsPerThread = 1000;
        // Only the consuming (current) thread runs the mails, so plain ints are sufficient.
        final int[] results = new int[numThreads];
        final Thread[] writerThreads = new Thread[numThreads];

        for (int i = 0; i < writerThreads.length; ++i) {
            final int threadId = i;
            writerThreads[i] = new Thread(ThrowingRunnable.unchecked(() -> {
                for (int k = 0; k < numMailsPerThread; ++k) {
                    taskMailbox.put(new Mail(() -> results[threadId]++, DEFAULT_PRIORITY, "result " + k));
                }
            }));
        }

        for (Thread writerThread : writerThreads) {
            writerThread.start();
        }
        for (Thread writerThread : writerThreads) {
            writerThread.join();
        }

        // The poison mail is put last and stops the consuming loop when it is run.
        final AtomicBoolean isRunning = new AtomicBoolean(true);
        taskMailbox.put(new Mail(() -> isRunning.set(false), DEFAULT_PRIORITY, "POISON_MAIL, DEFAULT_PRIORITY"));

        while (isRunning.get()) {
            takeMethod.apply(taskMailbox).run();
        }

        for (int perThreadResult : results) {
            Assert.assertEquals(numMailsPerThread, perThreadResult);
        }
    }

    /**
     * Checks that taking with a given priority only yields mails of a matching priority and that
     * {@code putFirst} still places its mail ahead of the others.
     */
    @Test
    public void testPutAsHeadWithPriority() throws InterruptedException {
        Mail mailA = new Mail(() -> {}, 2, "mailA");
        Mail mailB = new Mail(() -> {}, 2, "mailB");
        Mail mailC = new Mail(() -> {}, 1, "mailC");
        Mail mailD = new Mail(() -> {}, 1, "mailD");

        taskMailbox.put(mailC);
        taskMailbox.put(mailB);
        taskMailbox.put(mailD);
        taskMailbox.putFirst(mailA);

        Assert.assertSame(mailA, taskMailbox.take(2));
        Assert.assertSame(mailB, taskMailbox.take(2));
        Assert.assertFalse(taskMailbox.tryTake(2).isPresent());

        Assert.assertSame(mailC, taskMailbox.take(1));
        Assert.assertSame(mailD, taskMailbox.take(1));
        Assert.assertFalse(taskMailbox.tryTake(1).isPresent());
    }

    /**
     * Checks that taking with priority {@code -1} yields all mails in queue order, regardless of
     * the priorities they were created with.
     */
    @Test
    public void testPutWithPriorityAndReadingFromMainMailbox() throws InterruptedException {
        Mail mailA = new Mail(() -> {}, 2, "mailA");
        Mail mailB = new Mail(() -> {}, 2, "mailB");
        Mail mailC = new Mail(() -> {}, 1, "mailC");
        Mail mailD = new Mail(() -> {}, 1, "mailD");

        taskMailbox.put(mailC);
        taskMailbox.put(mailB);
        taskMailbox.put(mailD);
        taskMailbox.putFirst(mailA);

        Assert.assertSame(mailA, taskMailbox.take(-1));
        Assert.assertSame(mailC, taskMailbox.take(-1));
        Assert.assertSame(mailB, taskMailbox.take(-1));
        Assert.assertSame(mailD, taskMailbox.take(-1));
    }

    /**
     * Checks the interplay of batch and non-batch takes: mails queued before {@code createBatch}
     * are visible to {@code tryTakeFromBatch}, {@code tryTake} and {@code take}; mails queued
     * afterwards are not visible to {@code tryTakeFromBatch}; {@code close} returns what is left.
     */
    @Test
    public void testBatchAndNonBatchTake() throws InterruptedException {
        List<Mail> mails = IntStream.range(0, 6)
                .mapToObj(i -> new Mail(NO_OP, DEFAULT_PRIORITY, String.valueOf(i)))
                .collect(Collectors.toList());

        // The first three mails end up in the batch, the last three arrive after it was created.
        mails.subList(0, 3).forEach(taskMailbox::put);
        Assert.assertTrue(taskMailbox.createBatch());
        mails.subList(3, 6).forEach(taskMailbox::put);

        Assert.assertEquals(Optional.ofNullable(mails.get(0)), taskMailbox.tryTakeFromBatch());
        Assert.assertEquals(Optional.ofNullable(mails.get(1)), taskMailbox.tryTake(DEFAULT_PRIORITY));
        Assert.assertEquals(mails.get(2), taskMailbox.take(DEFAULT_PRIORITY));

        Assert.assertEquals(Optional.empty(), taskMailbox.tryTakeFromBatch());
        Assert.assertEquals(Optional.ofNullable(mails.get(3)), taskMailbox.tryTake(DEFAULT_PRIORITY));
        Assert.assertEquals(mails.get(4), taskMailbox.take(DEFAULT_PRIORITY));

        Assert.assertEquals(Collections.singletonList(mails.get(5)), taskMailbox.close());
    }

    /** Checks that {@code drain} returns batched mails followed by mails queued after the batch. */
    @Test
    public void testBatchDrain() throws Exception {
        Mail mailA = new Mail(() -> {}, Integer.MAX_VALUE, "mailA");
        Mail mailB = new Mail(() -> {}, Integer.MAX_VALUE, "mailB");

        taskMailbox.put(mailA);
        Assert.assertTrue(taskMailbox.createBatch());
        taskMailbox.put(mailB);

        Assert.assertEquals(Arrays.asList(mailA, mailB), taskMailbox.drain());
    }

    /**
     * Checks that a priority take skips a lower-priority mail sitting in the batch and that the
     * skipped mail stays available to {@code tryTakeFromBatch}.
     */
    @Test
    public void testBatchPriority() throws Exception {
        Mail mailA = new Mail(() -> {}, 1, "mailA");
        Mail mailB = new Mail(() -> {}, 2, "mailB");

        taskMailbox.put(mailA);
        Assert.assertTrue(taskMailbox.createBatch());
        taskMailbox.put(mailB);

        Assert.assertEquals(mailB, taskMailbox.take(2));
        Assert.assertEquals(Optional.of(mailA), taskMailbox.tryTakeFromBatch());
    }

    /**
     * Checks that code passed to {@code runExclusively} is not interleaved with {@code close}:
     * a helper thread enqueues mails inside the exclusive section while the test thread calls
     * {@code close}, which must return every one of those mails.
     */
    @Test
    public void testRunExclusively() throws InterruptedException {
        final CountDownLatch exclusiveCodeStarted = new CountDownLatch(1);
        final int numMails = 10;

        // Send all mails inside a single exclusive section.
        Thread sender = new Thread(() -> taskMailbox.runExclusively(() -> {
            exclusiveCodeStarted.countDown();
            for (int index = 0; index < numMails; ++index) {
                taskMailbox.put(new Mail(() -> {}, 1, "mailD"));
                try {
                    // Stretch the exclusive section so that close() has a chance to contend.
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    // Keep enqueuing, but preserve the interrupt for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
            }
        }));
        sender.start();

        exclusiveCodeStarted.await();
        int enqueuedMails = taskMailbox.close().size();
        // Do not leak the helper thread into subsequent tests.
        sender.join();

        // Make sure that all mails had actually been enqueued before close() went through.
        Assert.assertEquals(numMails, enqueuedMails);
    }
}

