package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.LinkedList;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxImplTest.class */
public class MailboxImplTest {
    private static final Runnable POISON_LETTER = () -> {
    };
    private static final int CAPACITY_POW_2 = 1;
    private static final int CAPACITY = 2;
    private Mailbox mailbox;

    @Before
    public void setUp() throws Exception {
        this.mailbox = new MailboxImpl(CAPACITY_POW_2);
    }

    @Test
    public void testClearAndPut() {
        for (int i = 0; i < CAPACITY; i += CAPACITY_POW_2) {
            Assert.assertTrue(this.mailbox.tryPutMail(() -> {
            }));
        }
        this.mailbox.clearAndPut(POISON_LETTER);
        Assert.assertTrue(this.mailbox.hasMail());
        Assert.assertEquals(POISON_LETTER, this.mailbox.tryTakeMail().get());
        Assert.assertFalse(this.mailbox.hasMail());
    }

    @Test
    public void testContracts() throws Exception {
        LinkedList linkedList = new LinkedList();
        Assert.assertFalse(this.mailbox.hasMail());
        for (int i = 0; i < CAPACITY; i += CAPACITY_POW_2) {
            Runnable runnable = () -> {
            };
            linkedList.add(runnable);
            Assert.assertTrue(this.mailbox.tryPutMail(runnable));
            Assert.assertTrue(this.mailbox.hasMail());
        }
        Assert.assertFalse(this.mailbox.tryPutMail(() -> {
        }));
        while (!linkedList.isEmpty()) {
            Assert.assertEquals(linkedList.remove(), this.mailbox.tryTakeMail().get());
            Assert.assertEquals(Boolean.valueOf(!linkedList.isEmpty()), Boolean.valueOf(this.mailbox.hasMail()));
            this.mailbox.waitUntilHasCapacity();
        }
        Thread thread = new Thread(ThrowingRunnable.unchecked(() -> {
            this.mailbox.waitUntilHasMail();
        }));
        thread.start();
        Thread.sleep(1L);
        Assert.assertTrue(thread.isAlive());
        this.mailbox.tryPutMail(() -> {
        });
        thread.join();
        do {
        } while (this.mailbox.tryPutMail(() -> {
        }));
        Thread thread2 = new Thread(ThrowingRunnable.unchecked(() -> {
            this.mailbox.waitUntilHasCapacity();
        }));
        thread2.start();
        Thread.sleep(1L);
        Assert.assertTrue(thread2.isAlive());
        this.mailbox.takeMail();
        thread2.join();
    }

    @Test
    public void testConcurrentPutTakeBlocking() throws Exception {
        testPutTake((v0) -> {
            return v0.takeMail();
        }, (v0, v1) -> {
            v0.putMail(v1);
        });
    }

    @Test
    public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
        testPutTake(mailbox -> {
            mailbox.waitUntilHasMail();
            return (Runnable) mailbox.tryTakeMail().get();
        }, (mailbox2, runnable) -> {
            while (!mailbox2.tryPutMail(runnable)) {
                mailbox2.waitUntilHasCapacity();
            }
        });
    }

    private void testPutTake(FunctionWithException<Mailbox, Runnable, Exception> functionWithException, BiConsumerWithException<Mailbox, Runnable, Exception> biConsumerWithException) throws Exception {
        int[] iArr = new int[10];
        Thread[] threadArr = new Thread[10];
        Thread thread = new Thread(ThrowingRunnable.unchecked(() -> {
            while (true) {
                Runnable runnable = (Runnable) functionWithException.apply(this.mailbox);
                if (runnable == POISON_LETTER) {
                    return;
                } else {
                    runnable.run();
                }
            }
        }));
        thread.start();
        for (int i = 0; i < threadArr.length; i += CAPACITY_POW_2) {
            int i2 = i;
            threadArr[i] = new Thread(ThrowingRunnable.unchecked(() -> {
                for (int i3 = 0; i3 < 1000; i3 += CAPACITY_POW_2) {
                    biConsumerWithException.accept(this.mailbox, () -> {
                        iArr[i2] = iArr[i2] + CAPACITY_POW_2;
                    });
                }
            }));
        }
        int length = threadArr.length;
        for (int i3 = 0; i3 < length; i3 += CAPACITY_POW_2) {
            threadArr[i3].start();
        }
        int length2 = threadArr.length;
        for (int i4 = 0; i4 < length2; i4 += CAPACITY_POW_2) {
            threadArr[i4].join();
        }
        this.mailbox.putMail(POISON_LETTER);
        thread.join();
        int length3 = iArr.length;
        for (int i5 = 0; i5 < length3; i5 += CAPACITY_POW_2) {
            Assert.assertEquals(1000L, iArr[i5]);
        }
    }
}
