package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.class */
public class RecoveredInputChannelTest {
    private final boolean isRemote;

    @Parameterized.Parameters(name = "isRemote = {0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    public RecoveredInputChannelTest(boolean z) {
        this.isRemote = z;
    }

    @Test
    public void testConcurrentReadStateAndProcess() throws Exception {
        testConcurrentReadStateAndProcess(this.isRemote);
    }

    @Test
    public void testConcurrentReadStateAndRelease() throws Exception {
        testConcurrentReadStateAndRelease(this.isRemote);
    }

    @Test
    public void testConcurrentReadStateAndProcessAndRelease() throws Exception {
        testConcurrentReadStateAndProcessAndRelease(this.isRemote);
    }

    @Test
    public void testReadEmptyState() throws Exception {
        testReadEmptyStateOrThrowException(this.isRemote, ChannelStateReader.NO_OP);
    }

    @Test(expected = IOException.class)
    public void testReadStateWithException() throws Exception {
        testReadEmptyStateOrThrowException(this.isRemote, new ResultPartitionTest.ChannelStateReaderWithException());
    }

    private void testReadEmptyStateOrThrowException(boolean z, ChannelStateReader channelStateReader) throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate createInputGate = createInputGate(networkBufferPool);
        InputChannel createRecoveredChannel = createRecoveredChannel(z, createInputGate);
        try {
            createInputGate.setInputChannels(new InputChannel[]{createRecoveredChannel});
            createInputGate.setup();
            createRecoveredChannel.readRecoveredState(channelStateReader);
            Assert.assertEquals(1L, createRecoveredChannel.getNumberOfQueuedBuffers());
            Assert.assertFalse(createRecoveredChannel.getNextBuffer().isPresent());
            Assert.assertTrue(createRecoveredChannel.getStateConsumedFuture().isDone());
            createInputGate.close();
            networkBufferPool.destroyAllBufferPools();
            Assert.assertEquals(10L, networkBufferPool.getNumberOfAvailableMemorySegments());
            networkBufferPool.destroy();
        } catch (Throwable th) {
            createInputGate.close();
            networkBufferPool.destroyAllBufferPools();
            Assert.assertEquals(10L, networkBufferPool.getNumberOfAvailableMemorySegments());
            networkBufferPool.destroy();
            throw th;
        }
    }

    private void testConcurrentReadStateAndProcess(boolean z) throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate createInputGate = createInputGate(networkBufferPool);
        InputChannel createRecoveredChannel = createRecoveredChannel(z, createInputGate);
        int[] iArr = {1, 2, 3, 4};
        ResultPartitionTest.FiniteChannelStateReader finiteChannelStateReader = new ResultPartitionTest.FiniteChannelStateReader(15, iArr);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        try {
            createInputGate.setInputChannels(new InputChannel[]{createRecoveredChannel});
            createInputGate.setup();
            RemoteInputChannelTest.submitTasksAndWaitForResults(newFixedThreadPool, new Callable[]{readRecoveredStateTask(createRecoveredChannel, finiteChannelStateReader, false), processRecoveredBufferTask(createRecoveredChannel, 15, iArr, false)});
            RemoteInputChannelTest.cleanup(networkBufferPool, newFixedThreadPool, null, null, createRecoveredChannel);
        } catch (Throwable th) {
            RemoteInputChannelTest.cleanup(networkBufferPool, newFixedThreadPool, null, null, createRecoveredChannel);
            throw th;
        }
    }

    private void testConcurrentReadStateAndRelease(boolean z) throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate createInputGate = createInputGate(networkBufferPool);
        InputChannel createRecoveredChannel = createRecoveredChannel(z, createInputGate);
        ResultPartitionTest.FiniteChannelStateReader finiteChannelStateReader = new ResultPartitionTest.FiniteChannelStateReader(15, new int[]{1, 2, 3, 4});
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        try {
            createInputGate.setInputChannels(new InputChannel[]{createRecoveredChannel});
            createInputGate.setup();
            RemoteInputChannelTest.submitTasksAndWaitForResults(newFixedThreadPool, new Callable[]{readRecoveredStateTask(createRecoveredChannel, finiteChannelStateReader, true), releaseChannelTask(createRecoveredChannel)});
            RemoteInputChannelTest.cleanup(networkBufferPool, newFixedThreadPool, null, null, createRecoveredChannel);
        } catch (Throwable th) {
            RemoteInputChannelTest.cleanup(networkBufferPool, newFixedThreadPool, null, th, createRecoveredChannel);
        }
    }

    private void testConcurrentReadStateAndProcessAndRelease(boolean z) throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate createInputGate = createInputGate(networkBufferPool);
        InputChannel createRecoveredChannel = createRecoveredChannel(z, createInputGate);
        int[] iArr = {1, 2, 3, 4};
        ResultPartitionTest.FiniteChannelStateReader finiteChannelStateReader = new ResultPartitionTest.FiniteChannelStateReader(15, iArr);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        try {
            createInputGate.setInputChannels(new InputChannel[]{createRecoveredChannel});
            createInputGate.setup();
            RemoteInputChannelTest.submitTasksAndWaitForResults(newFixedThreadPool, new Callable[]{readRecoveredStateTask(createRecoveredChannel, finiteChannelStateReader, true), processRecoveredBufferTask(createRecoveredChannel, 15, iArr, true), releaseChannelTask(createRecoveredChannel)});
            RemoteInputChannelTest.cleanup(networkBufferPool, newFixedThreadPool, null, null, createRecoveredChannel);
        } catch (Throwable th) {
            RemoteInputChannelTest.cleanup(networkBufferPool, newFixedThreadPool, null, null, createRecoveredChannel);
            throw th;
        }
    }

    private Callable<Void> readRecoveredStateTask(RecoveredInputChannel recoveredInputChannel, ChannelStateReader channelStateReader, boolean z) {
        return () -> {
            AssertionError assertionError;
            try {
                recoveredInputChannel.readRecoveredState(channelStateReader);
                return null;
            } finally {
            }
        };
    }

    private Callable<Void> processRecoveredBufferTask(RecoveredInputChannel recoveredInputChannel, int i, int[] iArr, boolean z) {
        return () -> {
            AssertionError assertionError;
            int i2 = 0;
            while (i2 < i) {
                if (z && recoveredInputChannel.isReleased()) {
                    return null;
                }
                if (recoveredInputChannel.getNumberOfQueuedBuffers() == 0) {
                    Thread.sleep(1L);
                } else {
                    try {
                        Optional nextBuffer = recoveredInputChannel.getNextBuffer();
                        if (nextBuffer.isPresent()) {
                            Buffer buffer = ((InputChannel.BufferAndAvailability) nextBuffer.get()).buffer();
                            BufferBuilderAndConsumerTest.assertContent(buffer, null, iArr);
                            buffer.recycleBuffer();
                            i2++;
                        }
                    } finally {
                    }
                }
            }
            return null;
        };
    }

    private Callable<Void> releaseChannelTask(RecoveredInputChannel recoveredInputChannel) {
        return () -> {
            recoveredInputChannel.releaseAllResources();
            return null;
        };
    }

    private RecoveredInputChannel createRecoveredChannel(boolean z, SingleInputGate singleInputGate) {
        return z ? new InputChannelBuilder().buildRemoteRecoveredChannel(singleInputGate) : new InputChannelBuilder().buildLocalRecoveredChannel(singleInputGate);
    }

    private SingleInputGate createInputGate(NetworkBufferPool networkBufferPool) throws Exception {
        return new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(8, 8)).setSegmentProvider(networkBufferPool).build();
    }
}
