package org.apache.flink.runtime.io.network.partition;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.class */
public class PipelinedSubpartitionTest extends SubpartitionTestBase {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    @AfterClass
    public static void shutdownExecutorService() throws Exception {
        executorService.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.SubpartitionTestBase
    /* renamed from: createSubpartition, reason: merged with bridge method [inline-methods] */
    public PipelinedSubpartition mo98createSubpartition() {
        return new PipelinedSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class));
    }

    @Test(expected = IllegalStateException.class)
    public void testAddTwoNonFinishedBuffer() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        ResultSubpartitionView createReadView = mo98createSubpartition.createReadView(awaitableBufferAvailablityListener);
        awaitableBufferAvailablityListener.resetNotificationCounters();
        try {
            mo98createSubpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
            mo98createSubpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
            Assert.assertNull(createReadView.getNextBuffer());
            mo98createSubpartition.release();
        } catch (Throwable th) {
            mo98createSubpartition.release();
            throw th;
        }
    }

    @Test
    public void testAddEmptyNonFinishedBuffer() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        ResultSubpartitionView createReadView = mo98createSubpartition.createReadView(awaitableBufferAvailablityListener);
        awaitableBufferAvailablityListener.resetNotificationCounters();
        try {
            Assert.assertEquals(0L, awaitableBufferAvailablityListener.getNumNotifications());
            BufferBuilder createBufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
            mo98createSubpartition.add(createBufferBuilder.createBufferConsumer());
            Assert.assertEquals(0L, awaitableBufferAvailablityListener.getNumNotifications());
            Assert.assertNull(createReadView.getNextBuffer());
            createBufferBuilder.finish();
            mo98createSubpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
            Assert.assertEquals(1L, awaitableBufferAvailablityListener.getNumNotifications());
            Assert.assertNull(createReadView.getNextBuffer());
            Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
            createReadView.releaseAllResources();
            mo98createSubpartition.release();
        } catch (Throwable th) {
            createReadView.releaseAllResources();
            mo98createSubpartition.release();
            throw th;
        }
    }

    @Test
    public void testAddNonEmptyNotFinishedBuffer() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        ResultSubpartitionView createReadView = mo98createSubpartition.createReadView(awaitableBufferAvailablityListener);
        awaitableBufferAvailablityListener.resetNotificationCounters();
        try {
            Assert.assertEquals(0L, awaitableBufferAvailablityListener.getNumNotifications());
            BufferBuilder createBufferBuilder = BufferBuilderTestUtils.createBufferBuilder();
            createBufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
            mo98createSubpartition.add(createBufferBuilder.createBufferConsumer());
            assertNextBuffer(createReadView, 1024, false, 1, false, false);
            Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
            createReadView.releaseAllResources();
            mo98createSubpartition.release();
        } catch (Throwable th) {
            createReadView.releaseAllResources();
            mo98createSubpartition.release();
            throw th;
        }
    }

    @Test
    public void testUnfinishedBufferBehindFinished() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        ResultSubpartitionView createReadView = mo98createSubpartition.createReadView(new AwaitableBufferAvailablityListener());
        try {
            mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1025));
            mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferBuilder(1024).createBufferConsumer());
            assertNextBuffer(createReadView, 1025, false, 1, false, true);
            mo98createSubpartition.release();
        } catch (Throwable th) {
            mo98createSubpartition.release();
            throw th;
        }
    }

    @Test
    public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        ResultSubpartitionView createReadView = mo98createSubpartition.createReadView(new AwaitableBufferAvailablityListener());
        try {
            mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1025));
            mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferBuilder(1024).createBufferConsumer());
            mo98createSubpartition.flush();
            assertNextBuffer(createReadView, 1025, true, 1, false, true);
            assertNextBuffer(createReadView, 1024, false, 1, false, false);
            mo98createSubpartition.release();
        } catch (Throwable th) {
            mo98createSubpartition.release();
            throw th;
        }
    }

    @Test
    public void testMultipleEmptyBuffers() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        ResultSubpartitionView createReadView = mo98createSubpartition.createReadView(awaitableBufferAvailablityListener);
        awaitableBufferAvailablityListener.resetNotificationCounters();
        try {
            Assert.assertEquals(0L, awaitableBufferAvailablityListener.getNumNotifications());
            mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
            Assert.assertEquals(1L, awaitableBufferAvailablityListener.getNumNotifications());
            mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
            Assert.assertEquals(2L, awaitableBufferAvailablityListener.getNumNotifications());
            mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(0));
            Assert.assertEquals(2L, awaitableBufferAvailablityListener.getNumNotifications());
            Assert.assertEquals(3L, mo98createSubpartition.getBuffersInBacklog());
            mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(1024));
            Assert.assertEquals(2L, awaitableBufferAvailablityListener.getNumNotifications());
            assertNextBuffer(createReadView, 1024, false, 0, false, true);
            createReadView.releaseAllResources();
            mo98createSubpartition.release();
        } catch (Throwable th) {
            createReadView.releaseAllResources();
            mo98createSubpartition.release();
            throw th;
        }
    }

    @Test
    public void testIllegalReadViewRequest() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        Assert.assertNotNull(mo98createSubpartition.createReadView(new NoOpBufferAvailablityListener()));
        try {
            mo98createSubpartition.createReadView(new NoOpBufferAvailablityListener());
            Assert.fail("Did not throw expected exception after duplicate notifyNonEmpty view request.");
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void testEmptyFlush() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        mo98createSubpartition.createReadView(awaitableBufferAvailablityListener);
        mo98createSubpartition.flush();
        Assert.assertEquals(0L, awaitableBufferAvailablityListener.getNumNotifications());
    }

    @Test
    public void testBasicPipelinedProduceConsumeLogic() throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        BufferAvailabilityListener bufferAvailabilityListener = (BufferAvailabilityListener) Mockito.mock(BufferAvailabilityListener.class);
        PipelinedSubpartitionView createReadView = mo98createSubpartition.createReadView(bufferAvailabilityListener);
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        Assert.assertNull(createReadView.getNextBuffer());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(0))).notifyDataAvailable();
        mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        Assert.assertEquals(1L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(0L, mo98createSubpartition.getTotalNumberOfBytes());
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(1))).notifyDataAvailable();
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        ResultSubpartition.BufferAndBacklog nextBuffer = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer);
        Assert.assertTrue(nextBuffer.buffer().isBuffer());
        Assert.assertEquals(32768L, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(mo98createSubpartition.getBuffersInBacklog(), nextBuffer.buffersInBacklog());
        Assert.assertFalse(nextBuffer.nextBufferIsEvent());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        Assert.assertNull(createReadView.getNextBuffer());
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        Assert.assertEquals(2L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(32768L, mo98createSubpartition.getTotalNumberOfBytes());
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(2))).notifyDataAvailable();
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        ResultSubpartition.BufferAndBacklog nextBuffer2 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer2);
        Assert.assertTrue(nextBuffer2.buffer().isBuffer());
        Assert.assertEquals(65536L, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(mo98createSubpartition.getBuffersInBacklog(), nextBuffer2.buffersInBacklog());
        Assert.assertFalse(nextBuffer2.nextBufferIsEvent());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        Assert.assertNull(createReadView.getNextBuffer());
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        mo98createSubpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(32768));
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        mo98createSubpartition.add(BufferBuilderTestUtils.createFilledBufferConsumer(32768));
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        Assert.assertEquals(5L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(2L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(65536L, mo98createSubpartition.getTotalNumberOfBytes());
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(4))).notifyDataAvailable();
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        ResultSubpartition.BufferAndBacklog nextBuffer3 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer3);
        Assert.assertTrue(nextBuffer3.buffer().isBuffer());
        Assert.assertEquals(98304L, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(mo98createSubpartition.getBuffersInBacklog(), nextBuffer3.buffersInBacklog());
        Assert.assertTrue(nextBuffer3.nextBufferIsEvent());
        Assert.assertTrue(createReadView.nextBufferIsEvent());
        ResultSubpartition.BufferAndBacklog nextBuffer4 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer4);
        Assert.assertFalse(nextBuffer4.buffer().isBuffer());
        Assert.assertEquals(131072L, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(mo98createSubpartition.getBuffersInBacklog(), nextBuffer4.buffersInBacklog());
        Assert.assertFalse(nextBuffer4.nextBufferIsEvent());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        ResultSubpartition.BufferAndBacklog nextBuffer5 = createReadView.getNextBuffer();
        Assert.assertNotNull(nextBuffer5);
        Assert.assertTrue(nextBuffer5.buffer().isBuffer());
        Assert.assertEquals(163840L, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(mo98createSubpartition.getBuffersInBacklog(), nextBuffer5.buffersInBacklog());
        Assert.assertFalse(nextBuffer5.nextBufferIsEvent());
        Assert.assertEquals(5L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(163840L, mo98createSubpartition.getTotalNumberOfBytes());
        ((BufferAvailabilityListener) Mockito.verify(bufferAvailabilityListener, Mockito.times(4))).notifyDataAvailable();
    }

    @Test
    public void testConcurrentFastProduceAndFastConsume() throws Exception {
        testProduceConsume(false, false);
    }

    @Test
    public void testConcurrentFastProduceAndSlowConsume() throws Exception {
        testProduceConsume(false, true);
    }

    @Test
    public void testConcurrentSlowProduceAndFastConsume() throws Exception {
        testProduceConsume(true, false);
    }

    @Test
    public void testConcurrentSlowProduceAndSlowConsume() throws Exception {
        testProduceConsume(true, true);
    }

    @Test
    public void testIsReleasedChecksParent() throws Exception {
        PipelinedSubpartition pipelinedSubpartition = (PipelinedSubpartition) Mockito.mock(PipelinedSubpartition.class);
        PipelinedSubpartitionView pipelinedSubpartitionView = new PipelinedSubpartitionView(pipelinedSubpartition, (BufferAvailabilityListener) Mockito.mock(BufferAvailabilityListener.class));
        Assert.assertFalse(pipelinedSubpartitionView.isReleased());
        ((PipelinedSubpartition) Mockito.verify(pipelinedSubpartition, Mockito.times(1))).isReleased();
        Mockito.when(Boolean.valueOf(pipelinedSubpartition.isReleased())).thenReturn(true);
        Assert.assertTrue(pipelinedSubpartitionView.isReleased());
        ((PipelinedSubpartition) Mockito.verify(pipelinedSubpartition, Mockito.times(2))).isReleased();
    }

    private void testProduceConsume(boolean z, boolean z2) throws Exception {
        TestProducerSource testProducerSource = new TestProducerSource() { // from class: org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest.1
            private BufferProvider bufferProvider = new TestPooledBufferProvider(8);
            private int numberOfBuffers;

            @Override // org.apache.flink.runtime.io.network.util.TestProducerSource
            public TestProducerSource.BufferConsumerAndChannel getNextBufferConsumer() throws Exception {
                if (this.numberOfBuffers == 128) {
                    return null;
                }
                BufferBuilder requestBufferBuilderBlocking = this.bufferProvider.requestBufferBuilderBlocking();
                int maxCapacity = requestBufferBuilderBlocking.getMaxCapacity();
                MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(maxCapacity);
                int i = this.numberOfBuffers * (maxCapacity / 4);
                for (int i2 = 0; i2 < maxCapacity; i2 += 4) {
                    allocateUnpooledSegment.putInt(i2, i);
                    i++;
                }
                Preconditions.checkState(requestBufferBuilderBlocking.appendAndCommit(ByteBuffer.wrap(allocateUnpooledSegment.getArray())) == maxCapacity);
                requestBufferBuilderBlocking.finish();
                this.numberOfBuffers++;
                return new TestProducerSource.BufferConsumerAndChannel(requestBufferBuilderBlocking.createBufferConsumer(), 0);
            }
        };
        TestConsumerCallback testConsumerCallback = new TestConsumerCallback() { // from class: org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest.2
            private int numberOfBuffers;

            @Override // org.apache.flink.runtime.io.network.util.TestConsumerCallback
            public void onBuffer(Buffer buffer) {
                MemorySegment memorySegment = buffer.getMemorySegment();
                Assert.assertEquals(memorySegment.size(), buffer.getSize());
                int size = this.numberOfBuffers * (memorySegment.size() / 4);
                for (int i = 0; i < memorySegment.size(); i += 4) {
                    Assert.assertEquals(size, memorySegment.getInt(i));
                    size++;
                }
                this.numberOfBuffers++;
                buffer.recycleBuffer();
            }

            @Override // org.apache.flink.runtime.io.network.util.TestConsumerCallback
            public void onEvent(AbstractEvent abstractEvent) {
            }
        };
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        TestSubpartitionConsumer testSubpartitionConsumer = new TestSubpartitionConsumer(z2, testConsumerCallback);
        testSubpartitionConsumer.setSubpartitionView(mo98createSubpartition.createReadView(testSubpartitionConsumer));
        FutureUtil.waitForAll(60000L, new Future[]{executorService.submit(new TestSubpartitionProducer(mo98createSubpartition, z, testProducerSource)), executorService.submit(testSubpartitionConsumer)});
    }

    @Test
    public void testCleanupReleasedPartitionNoView() throws Exception {
        testCleanupReleasedPartition(false);
    }

    @Test
    public void testCleanupReleasedPartitionWithView() throws Exception {
        testCleanupReleasedPartition(true);
    }

    private void testCleanupReleasedPartition(boolean z) throws Exception {
        PipelinedSubpartition mo98createSubpartition = mo98createSubpartition();
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(4096);
        BufferConsumer createFilledBufferConsumer2 = BufferBuilderTestUtils.createFilledBufferConsumer(4096);
        try {
            mo98createSubpartition.add(createFilledBufferConsumer);
            mo98createSubpartition.add(createFilledBufferConsumer2);
            PipelinedSubpartitionView pipelinedSubpartitionView = null;
            if (z) {
                pipelinedSubpartitionView = mo98createSubpartition.createReadView(new NoOpBufferAvailablityListener());
            }
            mo98createSubpartition.release();
            Assert.assertTrue(mo98createSubpartition.isReleased());
            if (z) {
                Assert.assertTrue(pipelinedSubpartitionView.isReleased());
            }
            Assert.assertTrue(createFilledBufferConsumer.isRecycled());
            boolean isRecycled = createFilledBufferConsumer.isRecycled();
            if (!isRecycled) {
                createFilledBufferConsumer.close();
            }
            boolean isRecycled2 = createFilledBufferConsumer2.isRecycled();
            if (!isRecycled2) {
                createFilledBufferConsumer2.close();
            }
            if (!isRecycled) {
                Assert.fail("buffer 1 not recycled");
            }
            if (!isRecycled2) {
                Assert.fail("buffer 2 not recycled");
            }
            Assert.assertEquals(2L, mo98createSubpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(0L, mo98createSubpartition.getTotalNumberOfBytes());
        } catch (Throwable th) {
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
            }
            if (!createFilledBufferConsumer2.isRecycled()) {
                createFilledBufferConsumer2.close();
            }
            throw th;
        }
    }
}
