/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.LinkedList;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxReceiver;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxSender;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MailboxImplTest {
    private static final Runnable POISON_LETTER = () -> {};
    private static final int CAPACITY_POW_2 = 1;
    private static final int CAPACITY = 2;
    private Mailbox mailbox;

    @Before
    public void setUp() throws Exception {
        this.mailbox = new MailboxImpl(1);
    }

    @Test
    public void testClearAndPut() {
        for (int i = 0; i < 2; ++i) {
            Assert.assertTrue((boolean)this.mailbox.tryPutMail(() -> {}));
        }
        this.mailbox.clearAndPut(POISON_LETTER);
        Assert.assertTrue((boolean)this.mailbox.hasMail());
        Assert.assertEquals((Object)POISON_LETTER, this.mailbox.tryTakeMail().get());
        Assert.assertFalse((boolean)this.mailbox.hasMail());
    }

    @Test
    public void testContracts() throws Exception {
        LinkedList<Runnable> testObjects = new LinkedList<Runnable>();
        Assert.assertFalse((boolean)this.mailbox.hasMail());
        for (int i = 0; i < 2; ++i) {
            Runnable letter = () -> {};
            testObjects.add(letter);
            Assert.assertTrue((boolean)this.mailbox.tryPutMail(letter));
            Assert.assertTrue((boolean)this.mailbox.hasMail());
        }
        Assert.assertFalse((boolean)this.mailbox.tryPutMail(() -> {}));
        while (!testObjects.isEmpty()) {
            Assert.assertEquals(testObjects.remove(), this.mailbox.tryTakeMail().get());
            Assert.assertEquals((Object)(!testObjects.isEmpty() ? 1 : 0), (Object)this.mailbox.hasMail());
            this.mailbox.waitUntilHasCapacity();
        }
        Thread waitingReader = new Thread(ThrowingRunnable.unchecked(() -> this.mailbox.waitUntilHasMail()));
        waitingReader.start();
        Thread.sleep(1L);
        Assert.assertTrue((boolean)waitingReader.isAlive());
        this.mailbox.tryPutMail(() -> {});
        waitingReader.join();
        while (this.mailbox.tryPutMail(() -> {})) {
        }
        Thread waitingWriter = new Thread(ThrowingRunnable.unchecked(() -> this.mailbox.waitUntilHasCapacity()));
        waitingWriter.start();
        Thread.sleep(1L);
        Assert.assertTrue((boolean)waitingWriter.isAlive());
        this.mailbox.takeMail();
        waitingWriter.join();
    }

    @Test
    public void testConcurrentPutTakeBlocking() throws Exception {
        this.testPutTake((FunctionWithException<Mailbox, Runnable, Exception>)((FunctionWithException)MailboxReceiver::takeMail), (BiConsumerWithException<Mailbox, Runnable, Exception>)((BiConsumerWithException)MailboxSender::putMail));
    }

    @Test
    public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
        this.testPutTake((FunctionWithException<Mailbox, Runnable, Exception>)((FunctionWithException)mailbox -> {
            mailbox.waitUntilHasMail();
            return (Runnable)mailbox.tryTakeMail().get();
        }), (BiConsumerWithException<Mailbox, Runnable, Exception>)((BiConsumerWithException)(mailbox, runnable) -> {
            while (!mailbox.tryPutMail(runnable)) {
                mailbox.waitUntilHasCapacity();
            }
        }));
    }

    /*
     * WARNING - void declaration
     */
    private void testPutTake(FunctionWithException<Mailbox, Runnable, Exception> takeMethod, BiConsumerWithException<Mailbox, Runnable, Exception> putMethod) throws Exception {
        void var8_9;
        int numThreads = 10;
        int numLettersPerThread = 1000;
        int[] results = new int[10];
        Thread[] writerThreads = new Thread[10];
        Thread readerThread = new Thread(ThrowingRunnable.unchecked(() -> {
            Runnable letter;
            while ((letter = (Runnable)takeMethod.apply((Object)this.mailbox)) != POISON_LETTER) {
                letter.run();
            }
        }));
        readerThread.start();
        boolean bl = false;
        while (var8_9 < writerThreads.length) {
            void threadId = var8_9;
            writerThreads[var8_9] = new Thread(ThrowingRunnable.unchecked(() -> this.lambda$testPutTake$12(putMethod, results, (int)threadId)));
            ++var8_9;
        }
        for (Thread writerThread : writerThreads) {
            writerThread.start();
        }
        for (Thread writerThread : writerThreads) {
            writerThread.join();
        }
        this.mailbox.putMail(POISON_LETTER);
        readerThread.join();
        for (int perThreadResult : results) {
            Assert.assertEquals((long)1000L, (long)perThreadResult);
        }
    }
}

