package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.class */
public class BoundedBlockingSubpartitionWriteReadTest {
    private static FileChannelManager fileChannelManager;
    private static final int BUFFER_SIZE = 1048576;

    @Parameterized.Parameter
    public BoundedBlockingSubpartitionType type;
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest$LongReader.class */
    public static final class LongReader extends CheckedThread {
        private final ResultSubpartitionView reader;
        private final long numLongs;
        private final int numBuffers;

        LongReader(ResultSubpartitionView resultSubpartitionView, long j, int i) {
            this.reader = resultSubpartitionView;
            this.numLongs = j;
            this.numBuffers = i;
        }

        public void go() throws Exception {
            BoundedBlockingSubpartitionWriteReadTest.readLongs(this.reader, this.numLongs, this.numBuffers);
        }
    }

    @Parameterized.Parameters(name = "type = {0}")
    public static Collection<Object[]> modes() {
        return (Collection) Arrays.stream(BoundedBlockingSubpartitionType.values()).map(boundedBlockingSubpartitionType -> {
            return new Object[]{boundedBlockingSubpartitionType};
        }).collect(Collectors.toList());
    }

    @BeforeClass
    public static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[]{tempDir}, "testing");
    }

    @AfterClass
    public static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    @Test
    public void testWriteAndReadData() throws Exception {
        BoundedBlockingSubpartition createAndFillPartition = createAndFillPartition(15000000L);
        ResultSubpartitionView createReadView = createAndFillPartition.createReadView(() -> {
        });
        readLongs(createReadView, 15000000L, createAndFillPartition.getBuffersInBacklog());
        createReadView.releaseAllResources();
        createAndFillPartition.release();
    }

    @Test
    public void testRead10ConsumersSequential() throws Exception {
        BoundedBlockingSubpartition createAndFillPartition = createAndFillPartition(10000000L);
        for (int i = 0; i < 10; i++) {
            ResultSubpartitionView createReadView = createAndFillPartition.createReadView(() -> {
            });
            readLongs(createReadView, 10000000L, createAndFillPartition.getBuffersInBacklog());
            createReadView.releaseAllResources();
        }
        createAndFillPartition.release();
    }

    @Test
    public void testRead10ConsumersConcurrent() throws Exception {
        BoundedBlockingSubpartition createAndFillPartition = createAndFillPartition(15000000L);
        LongReader[] createSubpartitionLongReaders = createSubpartitionLongReaders(createAndFillPartition, 10, 15000000, createAndFillPartition.getBuffersInBacklog());
        for (LongReader longReader : createSubpartitionLongReaders) {
            longReader.start();
        }
        for (LongReader longReader2 : createSubpartitionLongReaders) {
            longReader2.sync();
        }
        createAndFillPartition.release();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void readLongs(ResultSubpartitionView resultSubpartitionView, long j, int i) throws Exception {
        long j2 = 0;
        int i2 = i - 1;
        while (true) {
            ResultSubpartition.BufferAndBacklog nextBuffer = resultSubpartitionView.getNextBuffer();
            if (nextBuffer == null || !nextBuffer.buffer().isBuffer()) {
                break;
            }
            Assert.assertTrue(nextBuffer.isMoreAvailable());
            Assert.assertEquals(i2, nextBuffer.buffersInBacklog());
            ByteBuffer nioBufferReadable = nextBuffer.buffer().getNioBufferReadable();
            while (nioBufferReadable.hasRemaining()) {
                long j3 = j2;
                j2 = j3 + 1;
                Assert.assertEquals(j3, nioBufferReadable.getLong());
            }
            nextBuffer.buffer().recycleBuffer();
            i2--;
        }
        Assert.assertEquals(j, j2);
        Assert.assertEquals(-1L, i2);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.flink.core.memory.MemorySegment, long] */
    private static void writeLongs(BoundedBlockingSubpartition boundedBlockingSubpartition, long j) throws IOException {
        ?? allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
        long j2 = 0;
        while (j > 0) {
            int i = 0;
            while (j > 0 && i <= allocateUnpooledSegment.size() - 8) {
                j2++;
                allocateUnpooledSegment.putLongBigEndian(i, (long) allocateUnpooledSegment);
                j--;
                i += 8;
            }
            boundedBlockingSubpartition.add(new BufferConsumer((MemorySegment) allocateUnpooledSegment, memorySegment -> {
            }, i, true));
            boundedBlockingSubpartition.flush();
        }
    }

    private BoundedBlockingSubpartition createAndFillPartition(long j) throws IOException {
        BoundedBlockingSubpartition createSubpartition = createSubpartition();
        writeLongs(createSubpartition, j);
        createSubpartition.finish();
        return createSubpartition;
    }

    private BoundedBlockingSubpartition createSubpartition() throws IOException {
        return this.type.create(0, PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING, fileChannelManager), new File(TMP_FOLDER.newFolder(), "partitiondata"), BUFFER_SIZE);
    }

    private static LongReader[] createSubpartitionLongReaders(BoundedBlockingSubpartition boundedBlockingSubpartition, int i, int i2, int i3) throws IOException {
        LongReader[] longReaderArr = new LongReader[i];
        for (int i4 = 0; i4 < i; i4++) {
            longReaderArr[i4] = new LongReader(boundedBlockingSubpartition.createReadView(() -> {
            }), i2, i3);
        }
        return longReaderArr;
    }
}
