package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.class */
public class SpilledSubpartitionViewTest {
    private static final IOManager ioManager = new IOManagerAsync();
    private static final ExecutorService executor = Executors.newCachedThreadPool();
    private static final TestInfiniteBufferProvider writerBufferPool = new TestInfiniteBufferProvider();
    private IOManager.IOMode ioMode;

    public SpilledSubpartitionViewTest(IOManager.IOMode iOMode) {
        this.ioMode = iOMode;
    }

    @AfterClass
    public static void shutdown() {
        ioManager.shutdown();
        executor.shutdown();
    }

    @Parameterized.Parameters
    public static Collection<Object[]> ioMode() {
        return Arrays.asList(new Object[]{IOManager.IOMode.SYNC}, new Object[]{IOManager.IOMode.ASYNC});
    }

    @Test
    public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
        BufferFileWriter[] bufferFileWriterArr = {createWriterAndWriteBuffers(ioManager, writerBufferPool, 512), createWriterAndWriteBuffers(ioManager, writerBufferPool, 512)};
        ResultSubpartitionView[] resultSubpartitionViewArr = new ResultSubpartitionView[bufferFileWriterArr.length];
        TestPooledBufferProvider testPooledBufferProvider = new TestPooledBufferProvider(2);
        ResultSubpartition resultSubpartition = (ResultSubpartition) Mockito.mock(ResultSubpartition.class);
        try {
            for (BufferFileWriter bufferFileWriter : bufferFileWriterArr) {
                bufferFileWriter.close();
            }
            for (int i = 0; i < resultSubpartitionViewArr.length; i++) {
                if (this.ioMode.isSynchronous()) {
                    resultSubpartitionViewArr[i] = new SpilledSubpartitionViewSyncIO(resultSubpartition, testPooledBufferProvider.getMemorySegmentSize(), bufferFileWriterArr[i].getChannelID(), 0L);
                } else {
                    resultSubpartitionViewArr[i] = new SpilledSubpartitionViewAsyncIO(resultSubpartition, testPooledBufferProvider, ioManager, bufferFileWriterArr[i].getChannelID(), 0L);
                }
            }
            ArrayList newArrayList = Lists.newArrayList();
            for (ResultSubpartitionView resultSubpartitionView : resultSubpartitionViewArr) {
                newArrayList.add(executor.submit(new TestSubpartitionConsumer(resultSubpartitionView, false, new TestConsumerCallback.RecyclingCallback())));
            }
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                try {
                    ((Future) it.next()).get(2L, TimeUnit.MINUTES);
                } catch (TimeoutException e) {
                    throw new TimeoutException("There has been a timeout in the test. This indicates that there is a bug/deadlock in the tested subpartition view. The timed out test was in " + this.ioMode + " mode.");
                }
            }
        } finally {
            for (BufferFileWriter bufferFileWriter2 : bufferFileWriterArr) {
                if (bufferFileWriter2 != null) {
                    bufferFileWriter2.deleteChannel();
                }
            }
            for (ResultSubpartitionView resultSubpartitionView2 : resultSubpartitionViewArr) {
                if (resultSubpartitionView2 != null) {
                    resultSubpartitionView2.releaseAllResources();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static BufferFileWriter createWriterAndWriteBuffers(IOManager iOManager, BufferProvider bufferProvider, int i) throws IOException {
        BufferFileWriter createBufferFileWriter = iOManager.createBufferFileWriter(iOManager.createChannel());
        for (int i2 = 0; i2 < i; i2++) {
            createBufferFileWriter.writeBlock(bufferProvider.requestBuffer());
        }
        createBufferFileWriter.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
        return createBufferFileWriter;
    }
}
