package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.Tuple2;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.class */
public class RemoteInputChannelTest {
    @Test
    public void testExceptionOnReordering() throws Exception {
        SingleInputGate singleInputGate = (SingleInputGate) Mockito.mock(SingleInputGate.class);
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(singleInputGate);
        Buffer createBuffer = TestBufferFactory.createBuffer();
        createRemoteInputChannel.onBuffer(createBuffer.retain(), 0);
        createRemoteInputChannel.onBuffer(createBuffer, 29);
        try {
            createRemoteInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception after enqueuing an out-of-order buffer.");
        } catch (Exception e) {
            Assert.assertFalse(createBuffer.isRecycled());
            createRemoteInputChannel.releaseAllResources();
            Assert.assertTrue(createBuffer.isRecycled());
        }
        ((SingleInputGate) Mockito.verify(singleInputGate, Mockito.times(2))).notifyChannelNonEmpty((InputChannel) Matchers.eq(createRemoteInputChannel));
    }

    @Test
    public void testConcurrentOnBufferAndRelease() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        final Buffer createBuffer = TestBufferFactory.createBuffer();
        try {
            SingleInputGate singleInputGate = (SingleInputGate) Mockito.mock(SingleInputGate.class);
            for (int i = 0; i < 8192; i++) {
                final RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(singleInputGate);
                Callable<Void> callable = new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        do {
                            for (int i2 = 0; i2 < 128; i2++) {
                                createRemoteInputChannel.onBuffer(createBuffer.retain(), i2);
                            }
                        } while (!createRemoteInputChannel.isReleased());
                        return null;
                    }
                };
                Callable<Void> callable2 = new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.2
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        createRemoteInputChannel.releaseAllResources();
                        return null;
                    }
                };
                ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(2);
                newArrayListWithCapacity.add(newFixedThreadPool.submit(callable));
                newArrayListWithCapacity.add(newFixedThreadPool.submit(callable2));
                Iterator it = newArrayListWithCapacity.iterator();
                while (it.hasNext()) {
                    ((Future) it.next()).get();
                }
                Assert.assertEquals("Resource leak during concurrent release and enqueue.", 0L, createRemoteInputChannel.getNumberOfQueuedBuffers());
            }
        } finally {
            newFixedThreadPool.shutdown();
            Assert.assertFalse(createBuffer.isRecycled());
            createBuffer.recycle();
            Assert.assertTrue(createBuffer.isRecycled());
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testRetriggerWithoutPartitionRequest() throws Exception {
        Tuple2<Integer, Integer> tuple2 = new Tuple2<>(500, 3000);
        createRemoteInputChannel((SingleInputGate) Mockito.mock(SingleInputGate.class), (PartitionRequestClient) Mockito.mock(PartitionRequestClient.class), tuple2).retriggerSubpartitionRequest(0);
    }

    @Test
    public void testPartitionRequestExponentialBackoff() throws Exception {
        Tuple2<Integer, Integer> tuple2 = new Tuple2<>(500, 3000);
        int[] iArr = {((Integer) tuple2._1()).intValue(), 1000, 2000, ((Integer) tuple2._2()).intValue()};
        PartitionRequestClient partitionRequestClient = (PartitionRequestClient) Mockito.mock(PartitionRequestClient.class);
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel((SingleInputGate) Mockito.mock(SingleInputGate.class), partitionRequestClient, tuple2);
        createRemoteInputChannel.requestSubpartition(0);
        ((PartitionRequestClient) Mockito.verify(partitionRequestClient)).requestSubpartition((ResultPartitionID) Matchers.eq(createRemoteInputChannel.partitionId), Matchers.eq(0), (RemoteInputChannel) Matchers.eq(createRemoteInputChannel), Matchers.eq(0));
        for (int i : iArr) {
            createRemoteInputChannel.retriggerSubpartitionRequest(0);
            ((PartitionRequestClient) Mockito.verify(partitionRequestClient)).requestSubpartition((ResultPartitionID) Matchers.eq(createRemoteInputChannel.partitionId), Matchers.eq(0), (RemoteInputChannel) Matchers.eq(createRemoteInputChannel), Matchers.eq(i));
        }
        try {
            createRemoteInputChannel.retriggerSubpartitionRequest(0);
            createRemoteInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception e) {
        }
    }

    @Test
    public void testPartitionRequestSingleBackoff() throws Exception {
        Tuple2<Integer, Integer> tuple2 = new Tuple2<>(500, 500);
        PartitionRequestClient partitionRequestClient = (PartitionRequestClient) Mockito.mock(PartitionRequestClient.class);
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel((SingleInputGate) Mockito.mock(SingleInputGate.class), partitionRequestClient, tuple2);
        createRemoteInputChannel.requestSubpartition(0);
        ((PartitionRequestClient) Mockito.verify(partitionRequestClient)).requestSubpartition((ResultPartitionID) Matchers.eq(createRemoteInputChannel.partitionId), Matchers.eq(0), (RemoteInputChannel) Matchers.eq(createRemoteInputChannel), Matchers.eq(0));
        createRemoteInputChannel.retriggerSubpartitionRequest(0);
        ((PartitionRequestClient) Mockito.verify(partitionRequestClient)).requestSubpartition((ResultPartitionID) Matchers.eq(createRemoteInputChannel.partitionId), Matchers.eq(0), (RemoteInputChannel) Matchers.eq(createRemoteInputChannel), ((Integer) Matchers.eq(tuple2._1())).intValue());
        try {
            createRemoteInputChannel.retriggerSubpartitionRequest(0);
            createRemoteInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception e) {
        }
    }

    @Test
    public void testPartitionRequestNoBackoff() throws Exception {
        Tuple2<Integer, Integer> tuple2 = new Tuple2<>(0, 0);
        PartitionRequestClient partitionRequestClient = (PartitionRequestClient) Mockito.mock(PartitionRequestClient.class);
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel((SingleInputGate) Mockito.mock(SingleInputGate.class), partitionRequestClient, tuple2);
        createRemoteInputChannel.requestSubpartition(0);
        ((PartitionRequestClient) Mockito.verify(partitionRequestClient)).requestSubpartition((ResultPartitionID) Matchers.eq(createRemoteInputChannel.partitionId), Matchers.eq(0), (RemoteInputChannel) Matchers.eq(createRemoteInputChannel), Matchers.eq(0));
        try {
            createRemoteInputChannel.retriggerSubpartitionRequest(0);
            createRemoteInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception e) {
        }
    }

    @Test
    public void testOnFailedPartitionRequest() throws Exception {
        ConnectionManager connectionManager = (ConnectionManager) Mockito.mock(ConnectionManager.class);
        Mockito.when(connectionManager.createPartitionRequestClient((ConnectionID) Matchers.any(ConnectionID.class))).thenReturn(Mockito.mock(PartitionRequestClient.class));
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        SingleInputGate singleInputGate = (SingleInputGate) Mockito.mock(SingleInputGate.class);
        new RemoteInputChannel(singleInputGate, 0, resultPartitionID, (ConnectionID) Mockito.mock(ConnectionID.class), connectionManager, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()).onFailedPartitionRequest();
        ((SingleInputGate) Mockito.verify(singleInputGate)).triggerPartitionStateCheck((ResultPartitionID) Matchers.eq(resultPartitionID));
    }

    @Test(expected = CancelTaskException.class)
    public void testProducerFailedException() throws Exception {
        ConnectionManager connectionManager = (ConnectionManager) Mockito.mock(ConnectionManager.class);
        Mockito.when(connectionManager.createPartitionRequestClient((ConnectionID) Matchers.any(ConnectionID.class))).thenReturn(Mockito.mock(PartitionRequestClient.class));
        RemoteInputChannel remoteInputChannel = new RemoteInputChannel((SingleInputGate) Mockito.mock(SingleInputGate.class), 0, new ResultPartitionID(), (ConnectionID) Mockito.mock(ConnectionID.class), connectionManager, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        remoteInputChannel.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
        remoteInputChannel.requestSubpartition(0);
        remoteInputChannel.getNextBuffer();
    }

    @Test
    public void testRecycleExclusiveBufferBeforeReleased() throws Exception {
        RemoteInputChannel remoteInputChannel = (RemoteInputChannel) Mockito.spy(createRemoteInputChannel((SingleInputGate) Mockito.mock(SingleInputGate.class)));
        remoteInputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, remoteInputChannel));
        Assert.assertEquals("There should be one buffer available after recycle.", 1L, remoteInputChannel.getNumberOfAvailableBuffers());
        ((RemoteInputChannel) Mockito.verify(remoteInputChannel, Mockito.times(1))).notifyCreditAvailable();
        remoteInputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, remoteInputChannel));
        Assert.assertEquals("There should be two buffers available after recycle.", 2L, remoteInputChannel.getNumberOfAvailableBuffers());
        ((RemoteInputChannel) Mockito.verify(remoteInputChannel, Mockito.times(1))).notifyCreditAvailable();
    }

    @Test
    public void testRecycleExclusiveBufferAfterReleased() throws Exception {
        SingleInputGate singleInputGate = (SingleInputGate) Mockito.mock(SingleInputGate.class);
        RemoteInputChannel remoteInputChannel = (RemoteInputChannel) Mockito.spy(createRemoteInputChannel(singleInputGate));
        remoteInputChannel.releaseAllResources();
        remoteInputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, remoteInputChannel));
        Assert.assertEquals("Resource leak during recycling buffer after channel is released.", 0L, remoteInputChannel.getNumberOfAvailableBuffers());
        ((RemoteInputChannel) Mockito.verify(remoteInputChannel, Mockito.times(0))).notifyCreditAvailable();
        ((SingleInputGate) Mockito.verify(singleInputGate, Mockito.times(1))).returnExclusiveSegments(Matchers.anyListOf(MemorySegment.class));
    }

    @Test
    public void testReleaseExclusiveBuffers() throws Exception {
        SingleInputGate singleInputGate = (SingleInputGate) Mockito.mock(SingleInputGate.class);
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(singleInputGate);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.add(MemorySegmentFactory.allocateUnpooledSegment(1024, createRemoteInputChannel));
        }
        createRemoteInputChannel.assignExclusiveSegments(arrayList);
        Assert.assertEquals("The number of available buffers is not equal to the assigned amount.", 2L, createRemoteInputChannel.getNumberOfAvailableBuffers());
        createRemoteInputChannel.releaseAllResources();
        Assert.assertEquals("Resource leak after channel is released.", 0L, createRemoteInputChannel.getNumberOfAvailableBuffers());
        ((SingleInputGate) Mockito.verify(singleInputGate, Mockito.times(1))).returnExclusiveSegments(Matchers.anyListOf(MemorySegment.class));
    }

    private RemoteInputChannel createRemoteInputChannel(SingleInputGate singleInputGate) throws IOException, InterruptedException {
        return createRemoteInputChannel(singleInputGate, (PartitionRequestClient) Mockito.mock(PartitionRequestClient.class), new Tuple2<>(0, 0));
    }

    private RemoteInputChannel createRemoteInputChannel(SingleInputGate singleInputGate, PartitionRequestClient partitionRequestClient, Tuple2<Integer, Integer> tuple2) throws IOException, InterruptedException {
        ConnectionManager connectionManager = (ConnectionManager) Mockito.mock(ConnectionManager.class);
        Mockito.when(connectionManager.createPartitionRequestClient((ConnectionID) Matchers.any(ConnectionID.class))).thenReturn(partitionRequestClient);
        return new RemoteInputChannel(singleInputGate, 0, new ResultPartitionID(), (ConnectionID) Mockito.mock(ConnectionID.class), connectionManager, ((Integer) tuple2._1()).intValue(), ((Integer) tuple2._2()).intValue(), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
    }
}
