package org.apache.flink.streaming.runtime.operators;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/MailboxOperatorTest.class */
public class MailboxOperatorTest extends TestLogger {

    /**
     * A mail that, every time it runs, bumps its counter and submits itself again to the
     * {@link MailboxExecutor} it was constructed with.
     *
     * <p>The counter starts at {@code -1}; the first {@link #run()} moves it to {@code 0} and every
     * further run adds one.
     */
    private static class ReplicatingMail implements RunnableWithException {
        /** Incremented at the start of every {@link #run()}; {@code -1} until the first run. */
        private int mailCount = -1;

        /** Executor that receives the re-submitted mail. */
        private final MailboxExecutor mailboxExecutor;

        ReplicatingMail(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        /**
         * Increments the counter and enqueues this same instance again, using
         * {@code "Blocking mail" + mailCount} as the mail description.
         */
        public void run() {
            try {
                // Pre-increment: the counter is advanced before execute(...) is invoked, so it
                // moves forward even if the submission itself is rejected.
                mailboxExecutor.execute(this, "Blocking mail" + ++mailCount);
            } catch (RejectedExecutionException ignored) {
                // Deliberately ignored: a rejection only ends the self-replication chain.
                // NOTE(review): presumably raised once the task's mailbox no longer accepts
                // mail (e.g. during shutdown) -- confirm against MailboxExecutor#execute.
            }
        }

        /** Returns {@code true} once {@link #run()} has been invoked at least once. */
        boolean hasBeenEnqueued() {
            return mailCount > -1;
        }

        /** Returns the current counter value ({@code -1} before the first run). */
        int getMailCount() {
            return mailCount;
        }
    }

    /**
     * Operator that owns one {@link ReplicatingMail}. It starts the mail when the first element
     * arrives and, for every element, emits the element's value plus the mail's current count.
     */
    private static class ReplicatingMailOperator extends AbstractStreamOperator<Integer>
            implements OneInputStreamOperator<Integer, Integer> {
        private final ReplicatingMail replicatingMail;

        ReplicatingMailOperator(MailboxExecutor mailboxExecutor) {
            this.replicatingMail = new ReplicatingMail(mailboxExecutor);
        }

        /**
         * Emits {@code streamRecord.getValue() + replicatingMail.getMailCount()}.
         *
         * @param streamRecord incoming record; its value is unboxed, so it must not be {@code null}
         * @throws Exception declared by the operator interface; nothing is thrown explicitly here
         */
        public void processElement(StreamRecord<Integer> streamRecord) throws Exception {
            if (!replicatingMail.hasBeenEnqueued()) {
                // First element: run the mail directly once so that it starts re-submitting
                // itself; this also moves its counter from -1 to 0.
                replicatingMail.run();
            }
            // Sum of the incoming value and the number of times this operator's mail has
            // re-run since the initial direct invocation above.
            int valuePlusMailCount = replicatingMail.getMailCount() + streamRecord.getValue();
            output.collect(new StreamRecord<>(valuePlusMailCount));
        }
    }

    /**
     * Factory that builds a {@link ReplicatingMailOperator} and passes it the
     * {@link MailboxExecutor} previously supplied through
     * {@link #setMailboxExecutor(MailboxExecutor)}.
     */
    private static class ReplicatingMailOperatorFactory
            implements OneInputStreamOperatorFactory<Integer, Integer>, YieldingOperatorFactory<Integer> {
        /** Executor handed to every created operator; {@code null} until the setter is called. */
        private MailboxExecutor mailboxExecutor;

        private ReplicatingMailOperatorFactory() {
        }

        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
            this.mailboxExecutor = mailboxExecutor;
        }

        /**
         * Creates a {@link ReplicatingMailOperator} with the stored mailbox executor and calls
         * {@code setup} on it with the given task, configuration and output.
         */
        public <Operator extends StreamOperator<Integer>> Operator createStreamOperator(
                StreamTask<?, ?> streamTask,
                StreamConfig streamConfig,
                Output<StreamRecord<Integer>> output) {
            ReplicatingMailOperator operator = new ReplicatingMailOperator(mailboxExecutor);
            operator.setup(streamTask, streamConfig, output);
            // The method's return type is a caller-chosen type variable, so the concrete
            // operator has to be cast; the cast is unchecked because of erasure.
            @SuppressWarnings("unchecked")
            Operator typedOperator = (Operator) operator;
            return typedOperator;
        }

        /** Intentionally a no-op: {@link #getChainingStrategy()} always reports {@code ALWAYS}. */
        public void setChainingStrategy(ChainingStrategy chainingStrategy) {
        }

        public ChainingStrategy getChainingStrategy() {
            return ChainingStrategy.ALWAYS;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return ReplicatingMailOperator.class;
        }
    }

    /**
     * Runs three records with value {@code 0} through a chain of two
     * {@link ReplicatingMailOperator}s and asserts that the task emits {@code 0, 2, 4}.
     *
     * <p>Each operator adds its own mail count to the value it forwards, so the expected output
     * is consistent with each operator's mail having run exactly once more between consecutive
     * records. As the test name suggests, the point is that the endlessly self-replicating mails
     * must not keep the task from processing its input and completing.
     */
    @Test
    public void testAvoidTaskStarvation() throws Exception {
        OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO);

        // Two chained operators, each with its own self-replicating mail.
        testHarness
                .setupOperatorChain(new OperatorID(), new ReplicatingMailOperatorFactory())
                .chain(new OperatorID(), new ReplicatingMailOperatorFactory(), IntSerializer.INSTANCE)
                .finish();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        testHarness.processElement(new StreamRecord<>(0));
        testHarness.processElement(new StreamRecord<>(0));
        testHarness.processElement(new StreamRecord<>(0));

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // Unwrap the emitted records into their Integer payloads.
        List<Integer> emittedValues = testHarness.getOutput().stream()
                .map(element -> (Integer) ((StreamRecord<?>) element).getValue())
                .collect(Collectors.toList());
        MatcherAssert.assertThat(emittedValues, CoreMatchers.is(Arrays.asList(0, 2, 4)));
    }
}
