package org.apache.flink.runtime.io.network;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/io/network/NetworkEnvironmentTest.class */
public class NetworkEnvironmentTest {
    private static final int numBuffers = 1024;
    private static final int memorySegmentSize = 128;

    @Parameterized.Parameter
    public boolean enableCreditBasedFlowControl;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Parameterized.Parameters(name = "Credit-based = {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
    }

    @Test
    public void testRegisterTaskUsesBoundedBuffers() throws Exception {
        NetworkEnvironment networkEnvironment = new NetworkEnvironment(new NetworkBufferPool(numBuffers, memorySegmentSize), new LocalConnectionManager(), new ResultPartitionManager(), new TaskEventDispatcher(), new KvStateRegistry(), (KvStateServer) null, (KvStateClientProxy) null, IOManager.IOMode.SYNC, 0, 0, 2, 8, this.enableCreditBasedFlowControl);
        ResultPartition[] resultPartitionArr = {createResultPartition(ResultPartitionType.PIPELINED, 2), createResultPartition(ResultPartitionType.BLOCKING, 2), createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2), createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8)};
        SingleInputGate createSingleInputGate = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
        SingleInputGate createSingleInputGate2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
        SingleInputGate createSingleInputGate3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
        SingleInputGate createSingleInputGate4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 8);
        SingleInputGate[] singleInputGateArr = {createSingleInputGate, createSingleInputGate2, createSingleInputGate3, createSingleInputGate4};
        Task task = (Task) Mockito.mock(Task.class);
        Mockito.when(task.getProducedPartitions()).thenReturn(resultPartitionArr);
        Mockito.when(task.getAllInputGates()).thenReturn(singleInputGateArr);
        networkEnvironment.registerTask(task);
        Assert.assertEquals(r0.getNumberOfSubpartitions(), r0.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(r0.getNumberOfSubpartitions(), r0.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(r0.getNumberOfSubpartitions(), r0.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(r0.getNumberOfSubpartitions(), r0.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(2147483647L, r0.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(2147483647L, r0.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(12L, r0.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(24L, r0.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0L : 2L, createSingleInputGate.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0L : 2L, createSingleInputGate2.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0L : 2L, createSingleInputGate3.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0L : 8L, createSingleInputGate4.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(2147483647L, createSingleInputGate.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(2147483647L, createSingleInputGate2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 8L : 12L, createSingleInputGate3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 8L : 24L, createSingleInputGate4.getBufferPool().getMaxNumberOfMemorySegments());
        int i = this.enableCreditBasedFlowControl ? 1 : 0;
        ((SingleInputGate) Mockito.verify(createSingleInputGate, Mockito.times(i))).assignExclusiveSegments(networkEnvironment.getNetworkBufferPool(), 2);
        ((SingleInputGate) Mockito.verify(createSingleInputGate2, Mockito.times(i))).assignExclusiveSegments(networkEnvironment.getNetworkBufferPool(), 2);
        ((SingleInputGate) Mockito.verify(createSingleInputGate3, Mockito.times(i))).assignExclusiveSegments(networkEnvironment.getNetworkBufferPool(), 2);
        ((SingleInputGate) Mockito.verify(createSingleInputGate4, Mockito.times(i))).assignExclusiveSegments(networkEnvironment.getNetworkBufferPool(), 2);
        for (ResultPartition resultPartition : resultPartitionArr) {
            resultPartition.release();
        }
        for (SingleInputGate singleInputGate : singleInputGateArr) {
            singleInputGate.releaseAllResources();
        }
        networkEnvironment.shutdown();
    }

    @Test
    public void testRegisterTaskWithLimitedBuffers() throws Exception {
        testRegisterTaskWithLimitedBuffers(!this.enableCreditBasedFlowControl ? 20 : 10 + (10 * ((Integer) TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue()).intValue()));
    }

    @Test
    public void testRegisterTaskWithInsufficientBuffers() throws Exception {
        int intValue = !this.enableCreditBasedFlowControl ? 19 : (10 + (10 * ((Integer) TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue()).intValue())) - 1;
        this.expectedException.expect(IOException.class);
        this.expectedException.expectMessage("Insufficient number of network buffers");
        testRegisterTaskWithLimitedBuffers(intValue);
    }

    private void testRegisterTaskWithLimitedBuffers(int i) throws Exception {
        NetworkEnvironment networkEnvironment = new NetworkEnvironment(new NetworkBufferPool(i, memorySegmentSize), new LocalConnectionManager(), new ResultPartitionManager(), new TaskEventDispatcher(), new KvStateRegistry(), (KvStateServer) null, (KvStateClientProxy) null, IOManager.IOMode.SYNC, 0, 0, 2, 8, this.enableCreditBasedFlowControl);
        ConnectionManager createDummyConnectionManager = InputChannelTestUtils.createDummyConnectionManager();
        ResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED, 2);
        ResultPartition createResultPartition2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
        ResultPartition createResultPartition3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
        ResultPartition createResultPartition4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 4);
        ResultPartition[] resultPartitionArr = {createResultPartition, createResultPartition2, createResultPartition3, createResultPartition4};
        SingleInputGate createSingleInputGate = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
        SingleInputGate createSingleInputGate2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
        SingleInputGate createSingleInputGate3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
        SingleInputGate createSingleInputGate4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 4);
        SingleInputGate[] singleInputGateArr = {createSingleInputGate, createSingleInputGate2, createSingleInputGate3, createSingleInputGate4};
        if (this.enableCreditBasedFlowControl) {
            createRemoteInputChannel(createSingleInputGate4, 0, createResultPartition, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate4, 0, createResultPartition2, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate4, 0, createResultPartition3, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate4, 0, createResultPartition4, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate, 1, createResultPartition, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate, 1, createResultPartition4, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate2, 1, createResultPartition2, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate2, 2, createResultPartition4, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate3, 1, createResultPartition3, createDummyConnectionManager);
            createRemoteInputChannel(createSingleInputGate3, 3, createResultPartition4, createDummyConnectionManager);
        }
        Task task = (Task) Mockito.mock(Task.class);
        Mockito.when(task.getProducedPartitions()).thenReturn(resultPartitionArr);
        Mockito.when(task.getAllInputGates()).thenReturn(singleInputGateArr);
        networkEnvironment.registerTask(task);
        Assert.assertEquals(2147483647L, createResultPartition.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(2147483647L, createResultPartition2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(12L, createResultPartition3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(16L, createResultPartition4.getBufferPool().getMaxNumberOfMemorySegments());
        for (ResultPartition resultPartition : resultPartitionArr) {
            Assert.assertEquals(resultPartition.getNumberOfSubpartitions(), resultPartition.getBufferPool().getNumberOfRequiredMemorySegments());
            Assert.assertEquals(resultPartition.getNumberOfSubpartitions(), resultPartition.getBufferPool().getNumBuffers());
        }
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0L : 2L, createSingleInputGate.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0L : 2L, createSingleInputGate2.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0L : 2L, createSingleInputGate3.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 0L : 4L, createSingleInputGate4.getBufferPool().getNumberOfRequiredMemorySegments());
        Assert.assertEquals(2147483647L, createSingleInputGate.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(2147483647L, createSingleInputGate2.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 8L : 12L, createSingleInputGate3.getBufferPool().getMaxNumberOfMemorySegments());
        Assert.assertEquals(this.enableCreditBasedFlowControl ? 8L : 16L, createSingleInputGate4.getBufferPool().getMaxNumberOfMemorySegments());
        int i2 = this.enableCreditBasedFlowControl ? 1 : 0;
        ((SingleInputGate) Mockito.verify(createSingleInputGate, Mockito.times(i2))).assignExclusiveSegments(networkEnvironment.getNetworkBufferPool(), 2);
        ((SingleInputGate) Mockito.verify(createSingleInputGate2, Mockito.times(i2))).assignExclusiveSegments(networkEnvironment.getNetworkBufferPool(), 2);
        ((SingleInputGate) Mockito.verify(createSingleInputGate3, Mockito.times(i2))).assignExclusiveSegments(networkEnvironment.getNetworkBufferPool(), 2);
        ((SingleInputGate) Mockito.verify(createSingleInputGate4, Mockito.times(i2))).assignExclusiveSegments(networkEnvironment.getNetworkBufferPool(), 2);
        for (ResultPartition resultPartition2 : resultPartitionArr) {
            resultPartition2.release();
        }
        for (SingleInputGate singleInputGate : singleInputGateArr) {
            singleInputGate.releaseAllResources();
        }
        networkEnvironment.shutdown();
    }

    private static ResultPartition createResultPartition(ResultPartitionType resultPartitionType, int i) {
        return new ResultPartition("TestTask-" + resultPartitionType + ":" + i, (TaskActions) Mockito.mock(TaskActions.class), new JobID(), new ResultPartitionID(), resultPartitionType, i, i, (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class), (ResultPartitionConsumableNotifier) Mockito.mock(ResultPartitionConsumableNotifier.class), (IOManager) Mockito.mock(IOManager.class), false);
    }

    private SingleInputGate createSingleInputGate(ResultPartitionType resultPartitionType, int i) {
        return (SingleInputGate) PowerMockito.spy(new SingleInputGate("Test Task Name", new JobID(), new IntermediateDataSetID(), resultPartitionType, 0, i, (TaskActions) Mockito.mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), this.enableCreditBasedFlowControl));
    }

    private static void createRemoteInputChannel(SingleInputGate singleInputGate, int i, ResultPartition resultPartition, ConnectionManager connectionManager) {
        singleInputGate.setInputChannel(resultPartition.getPartitionId().getPartitionId(), new RemoteInputChannel(singleInputGate, i, resultPartition.getPartitionId(), (ConnectionID) Mockito.mock(ConnectionID.class), connectionManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
    }
}
