/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Test;

public class MailboxOperatorTest
extends TestLogger {
    /**
     * Runs a chain of two {@link ReplicatingMailOperator}s, feeds it {@code maxProcessingElements}
     * records with value {@code 0} and checks the emitted values.
     *
     * <p>Each operator adds the current count of its own self-replicating mail to the incoming
     * value. The expected output {@code 0, 2, 4} therefore asserts that, between two consecutive
     * input records, the mail of each of the two operators was executed exactly once, i.e. that
     * the self-replicating mails did not keep the task from processing input.
     */
    @Test
    public void testAvoidTaskStarvation() throws Exception {
        final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO);
        // Single source of truth for the operator limit, the number of inputs and the
        // number of expected outputs.
        final int maxProcessingElements = 3;

        testHarness
                .setupOperatorChain(
                        new OperatorID(), new ReplicatingMailOperatorFactory(maxProcessingElements))
                .chain(
                        new OperatorID(),
                        new ReplicatingMailOperatorFactory(maxProcessingElements),
                        IntSerializer.INSTANCE)
                .finish();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        for (int i = 0; i < maxProcessingElements; i++) {
            testHarness.processElement(new StreamRecord<>(0));
        }

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // One additional mail per operator in the chain (two operators) for every input record.
        final List<Integer> expected = new ArrayList<>(maxProcessingElements);
        for (int i = 0; i < maxProcessingElements; i++) {
            expected.add(i * 2);
        }

        final List<Integer> numMailsProcessed =
                testHarness.getOutput().stream()
                        .map(element -> (Integer) ((StreamRecord<?>) element).getValue())
                        .collect(Collectors.toList());
        MatcherAssert.assertThat(numMailsProcessed, CoreMatchers.is(expected));
    }

    private static class ReplicatingMail
    implements RunnableWithException {
        private int mailCount = -1;
        private boolean stopped = false;
        private final MailboxExecutor mailboxExecutor;

        ReplicatingMail(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        public void run() {
            try {
                if (!this.stopped) {
                    this.mailboxExecutor.execute((ThrowingRunnable)this, "Blocking mail" + ++this.mailCount);
                }
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
        }

        boolean hasBeenEnqueued() {
            return this.mailCount > -1;
        }

        int getMailCount() {
            return this.mailCount;
        }

        void stop() {
            this.stopped = true;
        }
    }

    private static class ReplicatingMailOperator
    extends AbstractStreamOperator<Integer>
    implements OneInputStreamOperator<Integer, Integer> {
        private final int maxProcessingElements;
        private final ReplicatingMail replicatingMail;
        private long numProcessedElements = 0L;

        ReplicatingMailOperator(int maxProcessingElements, MailboxExecutor mailboxExecutor) {
            this.maxProcessingElements = maxProcessingElements;
            this.replicatingMail = new ReplicatingMail(mailboxExecutor);
        }

        public void processElement(StreamRecord<Integer> upstreamMailCount) throws Exception {
            if (this.numProcessedElements >= (long)this.maxProcessingElements) {
                return;
            }
            if (!this.replicatingMail.hasBeenEnqueued()) {
                this.replicatingMail.run();
            }
            this.output.collect((Object)new StreamRecord((Object)(this.replicatingMail.getMailCount() + (Integer)upstreamMailCount.getValue())));
            if (++this.numProcessedElements == (long)this.maxProcessingElements) {
                this.replicatingMail.stop();
            }
        }
    }

    private static class ReplicatingMailOperatorFactory
    extends AbstractStreamOperatorFactory<Integer>
    implements OneInputStreamOperatorFactory<Integer, Integer>,
    YieldingOperatorFactory<Integer> {
        private final int maxProcessingElements;
        private MailboxExecutor mailboxExecutor;

        ReplicatingMailOperatorFactory(int maxProcessingElements) {
            this.maxProcessingElements = maxProcessingElements;
        }

        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        public <Operator extends StreamOperator<Integer>> Operator createStreamOperator(StreamOperatorParameters<Integer> parameters) {
            ReplicatingMailOperator operator = new ReplicatingMailOperator(this.maxProcessingElements, this.mailboxExecutor);
            operator.setProcessingTimeService(this.processingTimeService);
            operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
            return (Operator)((Object)operator);
        }

        public void setChainingStrategy(ChainingStrategy strategy) {
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return ReplicatingMailOperator.class;
        }
    }
}

