package org.apache.flink.runtime.io.network.partition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.class */
public class InputGateConcurrentTest {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest$ConsumerThread.class */
    private static class ConsumerThread extends CheckedThread {
        private final SingleInputGate gate;
        private final int numBuffers;

        ConsumerThread(SingleInputGate singleInputGate, int i) {
            super("consumer");
            this.gate = singleInputGate;
            this.numBuffers = i;
        }

        public void go() throws Exception {
            for (int i = this.numBuffers; i > 0; i--) {
                Assert.assertNotNull(this.gate.getNextBufferOrEvent());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest$PipelinedSubpartitionSource.class */
    private static class PipelinedSubpartitionSource extends Source {
        final PipelinedSubpartition partition;

        PipelinedSubpartitionSource(PipelinedSubpartition pipelinedSubpartition) {
            super();
            this.partition = pipelinedSubpartition;
        }

        @Override // org.apache.flink.runtime.io.network.partition.InputGateConcurrentTest.Source
        void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
            this.partition.add(bufferConsumer);
        }

        @Override // org.apache.flink.runtime.io.network.partition.InputGateConcurrentTest.Source
        void flush() {
            this.partition.flush();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest$ProducerThread.class */
    private static class ProducerThread extends CheckedThread {
        private final Random rnd;
        private final Source[] sources;
        private final int numTotal;
        private final int maxChunk;
        private final int yieldAfter;

        ProducerThread(Source[] sourceArr, int i, int i2, int i3) {
            super("producer");
            this.rnd = new Random();
            this.sources = sourceArr;
            this.numTotal = i;
            this.maxChunk = i2;
            this.yieldAfter = i3;
        }

        public void go() throws Exception {
            BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(100);
            int i = this.numTotal - this.yieldAfter;
            int i2 = this.numTotal;
            while (i2 > 0) {
                int nextInt = this.rnd.nextInt(this.sources.length);
                int min = Math.min(i2, this.rnd.nextInt(this.maxChunk) + 1);
                Source source = this.sources[nextInt];
                for (int i3 = min; i3 > 0; i3--) {
                    source.addBufferConsumer(createFilledBufferConsumer.copy());
                }
                i2 -= min;
                if (i2 <= i) {
                    i -= this.yieldAfter;
                    Thread.yield();
                }
            }
            for (Source source2 : this.sources) {
                source2.flush();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest$RemoteChannelSource.class */
    private static class RemoteChannelSource extends Source {
        final RemoteInputChannel channel;
        private int seq;

        RemoteChannelSource(RemoteInputChannel remoteInputChannel) {
            super();
            this.seq = 0;
            this.channel = remoteInputChannel;
        }

        @Override // org.apache.flink.runtime.io.network.partition.InputGateConcurrentTest.Source
        void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception {
            try {
                Buffer build = bufferConsumer.build();
                Preconditions.checkState(bufferConsumer.isFinished(), "Handling of non finished buffers is not yet implemented");
                RemoteInputChannel remoteInputChannel = this.channel;
                int i = this.seq;
                this.seq = i + 1;
                remoteInputChannel.onBuffer(build, i, -1);
            } finally {
                bufferConsumer.close();
            }
        }

        @Override // org.apache.flink.runtime.io.network.partition.InputGateConcurrentTest.Source
        void flush() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest$Source.class */
    private static abstract class Source {
        private Source() {
        }

        abstract void addBufferConsumer(BufferConsumer bufferConsumer) throws Exception;

        abstract void flush();
    }

    @Test
    public void testConsumptionWithLocalChannels() throws Exception {
        ResultPartition resultPartition = (ResultPartition) Mockito.mock(ResultPartition.class);
        PipelinedSubpartition[] pipelinedSubpartitionArr = new PipelinedSubpartition[11];
        Source[] sourceArr = new Source[11];
        ResultPartitionManager createResultPartitionManager = InputChannelTestUtils.createResultPartitionManager(pipelinedSubpartitionArr);
        SingleInputGate singleInputGate = new SingleInputGate("Test Task Name", new JobID(), new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 11, (TaskActions) Mockito.mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), true);
        for (int i = 0; i < 11; i++) {
            singleInputGate.setInputChannel(new IntermediateResultPartitionID(), new LocalInputChannel(singleInputGate, i, new ResultPartitionID(), createResultPartitionManager, (TaskEventDispatcher) Mockito.mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
            pipelinedSubpartitionArr[i] = new PipelinedSubpartition(0, resultPartition);
            sourceArr[i] = new PipelinedSubpartitionSource(pipelinedSubpartitionArr[i]);
        }
        ProducerThread producerThread = new ProducerThread(sourceArr, 11000, 4, 10);
        ConsumerThread consumerThread = new ConsumerThread(singleInputGate, 11000);
        producerThread.start();
        consumerThread.start();
        producerThread.sync();
        consumerThread.sync();
    }

    @Test
    public void testConsumptionWithRemoteChannels() throws Exception {
        ConnectionManager createDummyConnectionManager = InputChannelTestUtils.createDummyConnectionManager();
        Source[] sourceArr = new Source[11];
        SingleInputGate singleInputGate = new SingleInputGate("Test Task Name", new JobID(), new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 11, (TaskActions) Mockito.mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), true);
        for (int i = 0; i < 11; i++) {
            RemoteInputChannel remoteInputChannel = new RemoteInputChannel(singleInputGate, i, new ResultPartitionID(), (ConnectionID) Mockito.mock(ConnectionID.class), createDummyConnectionManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
            singleInputGate.setInputChannel(new IntermediateResultPartitionID(), remoteInputChannel);
            sourceArr[i] = new RemoteChannelSource(remoteInputChannel);
        }
        ProducerThread producerThread = new ProducerThread(sourceArr, 11000, 4, 10);
        ConsumerThread consumerThread = new ConsumerThread(singleInputGate, 11000);
        producerThread.start();
        consumerThread.start();
        producerThread.sync();
        consumerThread.sync();
    }

    @Test
    public void testConsumptionWithMixedChannels() throws Exception {
        ArrayList arrayList = new ArrayList(61);
        int i = 0;
        while (i < 61) {
            arrayList.add(Boolean.valueOf(i < 20));
            i++;
        }
        Collections.shuffle(arrayList);
        ConnectionManager createDummyConnectionManager = InputChannelTestUtils.createDummyConnectionManager();
        ResultPartition resultPartition = (ResultPartition) Mockito.mock(ResultPartition.class);
        PipelinedSubpartition[] pipelinedSubpartitionArr = new PipelinedSubpartition[20];
        ResultPartitionManager createResultPartitionManager = InputChannelTestUtils.createResultPartitionManager(pipelinedSubpartitionArr);
        Source[] sourceArr = new Source[61];
        SingleInputGate singleInputGate = new SingleInputGate("Test Task Name", new JobID(), new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 61, (TaskActions) Mockito.mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), true);
        int i2 = 0;
        for (int i3 = 0; i3 < 61; i3++) {
            if (((Boolean) arrayList.get(i3)).booleanValue()) {
                PipelinedSubpartition pipelinedSubpartition = new PipelinedSubpartition(0, resultPartition);
                int i4 = i2;
                i2++;
                pipelinedSubpartitionArr[i4] = pipelinedSubpartition;
                sourceArr[i3] = new PipelinedSubpartitionSource(pipelinedSubpartition);
                singleInputGate.setInputChannel(new IntermediateResultPartitionID(), new LocalInputChannel(singleInputGate, i3, new ResultPartitionID(), createResultPartitionManager, (TaskEventDispatcher) Mockito.mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
            } else {
                RemoteInputChannel remoteInputChannel = new RemoteInputChannel(singleInputGate, i3, new ResultPartitionID(), (ConnectionID) Mockito.mock(ConnectionID.class), createDummyConnectionManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
                singleInputGate.setInputChannel(new IntermediateResultPartitionID(), remoteInputChannel);
                sourceArr[i3] = new RemoteChannelSource(remoteInputChannel);
            }
        }
        ProducerThread producerThread = new ProducerThread(sourceArr, 61000, 4, 10);
        ConsumerThread consumerThread = new ConsumerThread(singleInputGate, 61000);
        producerThread.start();
        consumerThread.start();
        producerThread.sync();
        consumerThread.sync();
    }
}
