/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class MailboxExecutorImplTest {
    public static final int DEFAULT_PRIORITY = 0;
    private MailboxExecutor mailboxExecutor;
    private ExecutorService otherThreadExecutor;
    private TaskMailboxImpl mailbox;
    private MailboxProcessor mailboxProcessor;
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        this.mailbox = new TaskMailboxImpl();
        this.mailboxExecutor = new MailboxExecutorImpl((TaskMailbox)this.mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
        this.otherThreadExecutor = Executors.newSingleThreadScheduledExecutor();
        this.mailboxProcessor = new MailboxProcessor(c -> {}, (TaskMailbox)this.mailbox, StreamTaskActionExecutor.IMMEDIATE);
    }

    @After
    public void tearDown() {
        this.otherThreadExecutor.shutdown();
        try {
            if (!this.otherThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.otherThreadExecutor.shutdownNow();
                if (!this.otherThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Thread pool did not terminate on time!");
                }
            }
        }
        catch (InterruptedException ie) {
            this.otherThreadExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Test
    public void testIsIdle() throws Exception {
        MailboxProcessor processor = new MailboxProcessor(MailboxDefaultAction.Controller::suspendDefaultAction);
        MailboxExecutorImpl executor = (MailboxExecutorImpl)processor.getMailboxExecutor(0);
        Assert.assertFalse((boolean)executor.isIdle());
        processor.runMailboxStep();
        processor.mailbox.drain();
        Assert.assertTrue((boolean)executor.isIdle());
        executor.execute(() -> {}, "");
        Assert.assertFalse((boolean)executor.isIdle());
        processor.mailbox.drain();
        processor.mailbox.quiesce();
        Assert.assertFalse((boolean)executor.isIdle());
    }

    @Test
    public void testOperations() throws Exception {
        AtomicBoolean wasExecuted = new AtomicBoolean(false);
        CompletableFuture.runAsync(() -> this.mailboxExecutor.execute(() -> wasExecuted.set(true), ""), this.otherThreadExecutor).get();
        this.mailbox.take(0).run();
        Assert.assertTrue((boolean)wasExecuted.get());
    }

    @Test
    public void testClose() throws Exception {
        TestRunnable yieldRun = new TestRunnable();
        TestRunnable leftoverRun = new TestRunnable();
        this.mailboxExecutor.execute((ThrowingRunnable)yieldRun, "yieldRun");
        Future leftoverFuture = CompletableFuture.supplyAsync(() -> this.mailboxExecutor.submit((RunnableWithException)leftoverRun, "leftoverRun"), this.otherThreadExecutor).get();
        Assert.assertTrue((boolean)this.mailboxExecutor.tryYield());
        Assert.assertEquals((Object)Thread.currentThread(), (Object)yieldRun.wasExecutedBy());
        Assert.assertFalse((boolean)leftoverFuture.isDone());
        Assert.assertFalse((boolean)leftoverFuture.isCancelled());
        this.mailboxProcessor.close();
        Assert.assertTrue((boolean)leftoverFuture.isCancelled());
        try {
            this.mailboxExecutor.tryYield();
            Assert.fail((String)"yielding should not work after shutdown().");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        try {
            this.mailboxExecutor.yield();
            Assert.fail((String)"yielding should not work after shutdown().");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testTryYield() throws Exception {
        TestRunnable testRunnable = new TestRunnable();
        CompletableFuture.runAsync(() -> this.mailboxExecutor.execute((ThrowingRunnable)testRunnable, "testRunnable"), this.otherThreadExecutor).get();
        Assert.assertTrue((boolean)this.mailboxExecutor.tryYield());
        Assert.assertFalse((boolean)this.mailboxExecutor.tryYield());
        Assert.assertEquals((Object)Thread.currentThread(), (Object)testRunnable.wasExecutedBy());
    }

    @Test
    public void testYield() throws Exception {
        AtomicReference exceptionReference = new AtomicReference();
        TestRunnable testRunnable = new TestRunnable();
        Thread submitThread = new Thread(() -> {
            try {
                this.mailboxExecutor.execute((ThrowingRunnable)testRunnable, "testRunnable");
            }
            catch (Exception e) {
                exceptionReference.set(e);
            }
        });
        submitThread.start();
        this.mailboxExecutor.yield();
        submitThread.join();
        Assert.assertNull(exceptionReference.get());
        Assert.assertEquals((Object)Thread.currentThread(), (Object)testRunnable.wasExecutedBy());
    }

    static class TestRunnable
    implements RunnableWithException {
        private Thread executedByThread = null;

        TestRunnable() {
        }

        public void run() {
            Preconditions.checkState((!this.isExecuted() ? 1 : 0) != 0, (Object)("Runnable was already executed before by " + this.executedByThread));
            this.executedByThread = Thread.currentThread();
        }

        boolean isExecuted() {
            return this.executedByThread != null;
        }

        Thread wasExecutedBy() {
            return this.executedByThread;
        }
    }
}

