package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.class */
public class SpillableSubpartitionTest extends SubpartitionTestBase {
    private static final int BUFFER_DATA_SIZE = 4096;

    @Rule
    public ExpectedException exception = ExpectedException.none();
    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    private static IOManager ioManager;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest$IOManagerAsyncWithClosedBufferFileWriter.class */
    private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
        private IOManagerAsyncWithClosedBufferFileWriter() {
        }

        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID id) throws IOException {
            BufferFileWriter createBufferFileWriter = super.createBufferFileWriter(id);
            createBufferFileWriter.close();
            return createBufferFileWriter;
        }
    }

    @BeforeClass
    public static void setup() {
        ioManager = new IOManagerAsync();
    }

    @AfterClass
    public static void shutdown() {
        executorService.shutdownNow();
        ioManager.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.SubpartitionTestBase
    /* renamed from: createSubpartition, reason: merged with bridge method [inline-methods] */
    public SpillableSubpartition mo98createSubpartition() {
        return createSubpartition(ioManager);
    }

    private static SpillableSubpartition createSubpartition(IOManager iOManager) {
        ResultPartition resultPartition = (ResultPartition) Mockito.mock(ResultPartition.class);
        BufferProvider bufferProvider = (BufferProvider) Mockito.mock(BufferProvider.class);
        Mockito.when(resultPartition.getBufferProvider()).thenReturn(bufferProvider);
        Mockito.when(Integer.valueOf(bufferProvider.getMemorySegmentSize())).thenReturn(32768);
        return new SpillableSubpartition(0, resultPartition, iOManager);
    }

    @Test
    public void testConcurrentFinishAndReleaseMemory() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AsynchronousBufferFileWriter asynchronousBufferFileWriter = (AsynchronousBufferFileWriter) Mockito.mock(AsynchronousBufferFileWriter.class);
        ((AsynchronousBufferFileWriter) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.io.network.partition.SpillableSubpartitionTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m101answer(InvocationOnMock invocationOnMock) throws Throwable {
                countDownLatch2.countDown();
                countDownLatch.await();
                return null;
            }
        }).when(asynchronousBufferFileWriter)).close();
        IOManager iOManager = (IOManager) Mockito.mock(IOManager.class);
        Mockito.when(iOManager.createBufferFileWriter((FileIOChannel.ID) Matchers.any(FileIOChannel.ID.class))).thenReturn(asynchronousBufferFileWriter);
        final SpillableSubpartition spillableSubpartition = new SpillableSubpartition(0, (ResultPartition) Mockito.mock(ResultPartition.class), iOManager);
        Assert.assertEquals(0L, spillableSubpartition.releaseMemory());
        Future submit = Executors.newSingleThreadExecutor().submit(new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.SpillableSubpartitionTest.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                spillableSubpartition.finish();
                return null;
            }
        });
        countDownLatch2.await();
        spillableSubpartition.releaseMemory();
        countDownLatch.countDown();
        submit.get();
    }

    @Test
    public void testReleasePartitionAndGetNext() throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        mo98createSubpartition.finish();
        ResultSubpartitionView resultSubpartitionView = (ResultSubpartitionView) Mockito.spy(mo98createSubpartition.createReadView(new NoOpBufferAvailablityListener()));
        ((ResultSubpartitionView) Mockito.doNothing().when(resultSubpartitionView)).releaseAllResources();
        mo98createSubpartition.release();
        Assert.assertNull(resultSubpartitionView.getNextBuffer());
    }

    @Test
    public void testConsumeSpilledPartition() throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        BufferConsumer bufferConsumer = EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1L));
        int writtenBytes = bufferConsumer.getWrittenBytes();
        mo98createSubpartition.add(createFilledBufferConsumer.copy());
        mo98createSubpartition.add(createFilledBufferConsumer.copy());
        mo98createSubpartition.add(bufferConsumer);
        mo98createSubpartition.add(createFilledBufferConsumer);
        Assert.assertEquals(4L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(3L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(0L, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertFalse(createFilledBufferConsumer.isRecycled());
        Assert.assertEquals(4L, mo98createSubpartition.releaseMemory());
        Assert.assertEquals(4L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(3L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(12288 + writtenBytes, mo98createSubpartition.getTotalNumberOfBytes());
        mo98createSubpartition.finish();
        Assert.assertEquals(5L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(3L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(12288 + writtenBytes + 4, mo98createSubpartition.getTotalNumberOfBytes());
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        SpilledSubpartitionView createReadView = mo98createSubpartition.createReadView(awaitableBufferAvailablityListener);
        Assert.assertEquals(1L, awaitableBufferAvailablityListener.getNumNotifications());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, BUFFER_DATA_SIZE, true, 2, false, true);
        Assert.assertEquals(2L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, BUFFER_DATA_SIZE, true, 1, true, true);
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertTrue(createReadView.nextBufferIsEvent());
        assertNextEvent(createReadView, writtenBytes, CancelCheckpointMarker.class, true, 1, false, true);
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, BUFFER_DATA_SIZE, true, 0, true, true);
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertTrue(createReadView.nextBufferIsEvent());
        assertNextEvent(createReadView, 4, EndOfPartitionEvent.class, false, 0, false, true);
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        long currentTimeMillis = System.currentTimeMillis() + 30000;
        while (!createFilledBufferConsumer.isRecycled() && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(1L);
        }
        Assert.assertTrue(createFilledBufferConsumer.isRecycled());
    }

    @Test
    public void testConsumeSpilledPartitionSpilledBeforeAdd() throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        Assert.assertEquals(0L, mo98createSubpartition.releaseMemory());
        BufferBuilder[] bufferBuilderArr = {BufferBuilderTestUtils.createBufferBuilder(BUFFER_DATA_SIZE), BufferBuilderTestUtils.createBufferBuilder(BUFFER_DATA_SIZE), BufferBuilderTestUtils.createBufferBuilder(BUFFER_DATA_SIZE), BufferBuilderTestUtils.createBufferBuilder(BUFFER_DATA_SIZE)};
        BufferConsumer[] bufferConsumerArr = (BufferConsumer[]) Arrays.stream(bufferBuilderArr).map((v0) -> {
            return v0.createBufferConsumer();
        }).toArray(i -> {
            return new BufferConsumer[i];
        });
        BufferConsumer bufferConsumer = EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1L));
        int writtenBytes = bufferConsumer.getWrittenBytes();
        mo98createSubpartition.add(bufferConsumerArr[0]);
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilderArr[0], BUFFER_DATA_SIZE).finish();
        mo98createSubpartition.add(bufferConsumerArr[1]);
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilderArr[1], BUFFER_DATA_SIZE).finish();
        mo98createSubpartition.add(bufferConsumer);
        mo98createSubpartition.add(bufferConsumerArr[2]);
        bufferBuilderArr[2].finish();
        mo98createSubpartition.add(bufferConsumerArr[3]);
        BufferBuilderTestUtils.fillBufferBuilder(bufferBuilderArr[3], 2048);
        Assert.assertEquals(5L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(3L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(8192 + writtenBytes, mo98createSubpartition.getTotalNumberOfBytes());
        mo98createSubpartition.finish();
        Assert.assertEquals(6L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(3L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(r0 + 2048 + 4, mo98createSubpartition.getTotalNumberOfBytes());
        Arrays.stream(bufferConsumerArr).forEach(bufferConsumer2 -> {
            Assert.assertTrue(bufferConsumer2.isRecycled());
        });
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        SpilledSubpartitionView createReadView = mo98createSubpartition.createReadView(awaitableBufferAvailablityListener);
        Assert.assertEquals(1L, awaitableBufferAvailablityListener.getNumNotifications());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, BUFFER_DATA_SIZE, true, 2, false, true);
        Assert.assertEquals(2L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, BUFFER_DATA_SIZE, true, 1, true, true);
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertTrue(createReadView.nextBufferIsEvent());
        assertNextEvent(createReadView, writtenBytes, CancelCheckpointMarker.class, true, 1, false, true);
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, 2048, true, 0, true, true);
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertTrue(createReadView.nextBufferIsEvent());
        assertNextEvent(createReadView, 4, EndOfPartitionEvent.class, false, 0, false, true);
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        Arrays.stream(bufferConsumerArr).forEach(bufferConsumer3 -> {
            bufferConsumer3.close();
        });
    }

    @Test
    public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        BufferConsumer bufferConsumer = EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1L));
        int writtenBytes = bufferConsumer.getWrittenBytes();
        mo98createSubpartition.add(createFilledBufferConsumer.copy());
        mo98createSubpartition.add(createFilledBufferConsumer.copy());
        mo98createSubpartition.add(bufferConsumer);
        mo98createSubpartition.add(createFilledBufferConsumer);
        mo98createSubpartition.finish();
        Assert.assertEquals(5L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(3L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(0L, mo98createSubpartition.getTotalNumberOfBytes());
        AwaitableBufferAvailablityListener awaitableBufferAvailablityListener = new AwaitableBufferAvailablityListener();
        SpillableSubpartitionView createReadView = mo98createSubpartition.createReadView(awaitableBufferAvailablityListener);
        Assert.assertEquals(1L, awaitableBufferAvailablityListener.getNumNotifications());
        Assert.assertFalse(createFilledBufferConsumer.isRecycled());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, BUFFER_DATA_SIZE, true, 2, false, false);
        Assert.assertEquals(4096L, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(2L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(1L, awaitableBufferAvailablityListener.getNumNotifications());
        Assert.assertFalse(createFilledBufferConsumer.isRecycled());
        Assert.assertEquals(3L, mo98createSubpartition.releaseMemory());
        Assert.assertFalse(createFilledBufferConsumer.isRecycled());
        Assert.assertEquals(5L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(2L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertEquals(8192 + writtenBytes + 4, mo98createSubpartition.getTotalNumberOfBytes());
        awaitableBufferAvailablityListener.awaitNotifications(2L, 30000L);
        Assert.assertEquals(2L, awaitableBufferAvailablityListener.getNumNotifications());
        Buffer build = createFilledBufferConsumer.build();
        build.retainBuffer();
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, BUFFER_DATA_SIZE, true, 1, true, false);
        Assert.assertEquals(12288 + writtenBytes + 4, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        createFilledBufferConsumer.close();
        Assert.assertTrue(createReadView.nextBufferIsEvent());
        assertNextEvent(createReadView, writtenBytes, CancelCheckpointMarker.class, true, 1, false, true);
        Assert.assertEquals(12288 + writtenBytes + 4, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        Assert.assertFalse(createReadView.nextBufferIsEvent());
        assertNextBuffer(createReadView, BUFFER_DATA_SIZE, true, 0, true, true);
        Assert.assertEquals(12288 + writtenBytes + 4, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        build.recycleBuffer();
        Assert.assertTrue(build.isRecycled());
        Assert.assertTrue(createReadView.nextBufferIsEvent());
        assertNextEvent(createReadView, 4, EndOfPartitionEvent.class, false, 0, false, true);
        Assert.assertEquals(12288 + writtenBytes + 4, mo98createSubpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, mo98createSubpartition.getBuffersInBacklog());
        long currentTimeMillis = System.currentTimeMillis() + 30000;
        while (!createFilledBufferConsumer.isRecycled() && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(1L);
        }
        Assert.assertTrue(createFilledBufferConsumer.isRecycled());
    }

    @Test
    public void testAddOnFinishedSpillablePartition() throws Exception {
        testAddOnFinishedPartition(false);
    }

    @Test
    public void testAddOnFinishedSpilledPartition() throws Exception {
        testAddOnFinishedPartition(true);
    }

    private void testAddOnFinishedPartition(boolean z) throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        if (z) {
            Assert.assertEquals(0L, mo98createSubpartition.releaseMemory());
        }
        mo98createSubpartition.finish();
        Assert.assertEquals(1L, mo98createSubpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(z ? 4L : 0L, mo98createSubpartition.getTotalNumberOfBytes());
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        try {
            mo98createSubpartition.add(createFilledBufferConsumer);
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
                Assert.fail("buffer not recycled");
            }
            Assert.assertEquals(1L, mo98createSubpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(z ? 4L : 0L, mo98createSubpartition.getTotalNumberOfBytes());
        } catch (Throwable th) {
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
                Assert.fail("buffer not recycled");
            }
            throw th;
        }
    }

    @Test
    public void testAddOnReleasedSpillablePartition() throws Exception {
        testAddOnReleasedPartition(false);
    }

    @Test
    public void testAddOnReleasedSpilledPartition() throws Exception {
        testAddOnReleasedPartition(true);
    }

    private void testAddOnReleasedPartition(boolean z) throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        mo98createSubpartition.release();
        if (z) {
            Assert.assertEquals(0L, mo98createSubpartition.releaseMemory());
        }
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        try {
            mo98createSubpartition.add(createFilledBufferConsumer);
            boolean isRecycled = createFilledBufferConsumer.isRecycled();
            if (!isRecycled) {
                createFilledBufferConsumer.close();
            }
            if (!isRecycled) {
                Assert.fail("buffer not recycled");
            }
            Assert.assertEquals(0L, mo98createSubpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(0L, mo98createSubpartition.getTotalNumberOfBytes());
        } catch (Throwable th) {
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
            }
            throw th;
        }
    }

    @Test
    public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
        IOManagerAsyncWithNoOpBufferFileWriter iOManagerAsyncWithNoOpBufferFileWriter = new IOManagerAsyncWithNoOpBufferFileWriter();
        SpillableSubpartition createSubpartition = createSubpartition(iOManagerAsyncWithNoOpBufferFileWriter);
        Assert.assertEquals(0L, createSubpartition.releaseMemory());
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        try {
            createSubpartition.add(createFilledBufferConsumer);
            iOManagerAsyncWithNoOpBufferFileWriter.shutdown();
            boolean isRecycled = createFilledBufferConsumer.isRecycled();
            if (!isRecycled) {
                createFilledBufferConsumer.close();
            }
            if (isRecycled) {
                Assert.fail("buffer recycled before the write operation completed");
            }
            Assert.assertEquals(1L, createSubpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(4096L, createSubpartition.getTotalNumberOfBytes());
        } catch (Throwable th) {
            iOManagerAsyncWithNoOpBufferFileWriter.shutdown();
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
            }
            throw th;
        }
    }

    @Test
    public void testReleaseOnSpillablePartitionWithoutViewWithSlowWriter() throws Exception {
        testReleaseOnSpillablePartitionWithSlowWriter(false);
    }

    @Test
    public void testReleaseOnSpillablePartitionWithViewWithSlowWriter() throws Exception {
        testReleaseOnSpillablePartitionWithSlowWriter(true);
    }

    private void testReleaseOnSpillablePartitionWithSlowWriter(boolean z) throws Exception {
        IOManagerAsyncWithNoOpBufferFileWriter iOManagerAsyncWithNoOpBufferFileWriter = new IOManagerAsyncWithNoOpBufferFileWriter();
        SpillableSubpartition createSubpartition = createSubpartition(iOManagerAsyncWithNoOpBufferFileWriter);
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        BufferConsumer createFilledBufferConsumer2 = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        try {
            createSubpartition.add(createFilledBufferConsumer);
            createSubpartition.add(createFilledBufferConsumer2);
            Assert.assertFalse("buffer1 should not be recycled (still in the queue)", createFilledBufferConsumer.isRecycled());
            Assert.assertFalse("buffer2 should not be recycled (still in the queue)", createFilledBufferConsumer2.isRecycled());
            Assert.assertEquals(2L, createSubpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(0L, createSubpartition.getTotalNumberOfBytes());
            if (z) {
                createSubpartition.finish();
                createSubpartition.createReadView(new NoOpBufferAvailablityListener());
                Assert.assertEquals(0L, createSubpartition.getTotalNumberOfBytes());
            }
            Assert.assertEquals(2L, createSubpartition.releaseMemory());
            Assert.assertFalse("buffer1 should not be recycled (advertised as nextBuffer)", createFilledBufferConsumer.isRecycled());
            Assert.assertFalse("buffer2 should not be recycled (not written yet)", createFilledBufferConsumer2.isRecycled());
            iOManagerAsyncWithNoOpBufferFileWriter.shutdown();
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
            }
            if (!createFilledBufferConsumer2.isRecycled()) {
                createFilledBufferConsumer2.close();
            }
            Assert.assertEquals(2 + (z ? 1 : 0), createSubpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(BUFFER_DATA_SIZE + (z ? 4 : BUFFER_DATA_SIZE), createSubpartition.getTotalNumberOfBytes());
        } catch (Throwable th) {
            iOManagerAsyncWithNoOpBufferFileWriter.shutdown();
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
            }
            if (!createFilledBufferConsumer2.isRecycled()) {
                createFilledBufferConsumer2.close();
            }
            throw th;
        }
    }

    @Test
    public void testAddOnSpilledPartitionWithFailingWriter() throws Exception {
        IOManagerAsyncWithClosedBufferFileWriter iOManagerAsyncWithClosedBufferFileWriter = new IOManagerAsyncWithClosedBufferFileWriter();
        SpillableSubpartition createSubpartition = createSubpartition(iOManagerAsyncWithClosedBufferFileWriter);
        Assert.assertEquals(0L, createSubpartition.releaseMemory());
        this.exception.expect(IOException.class);
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        try {
            createSubpartition.add(createFilledBufferConsumer);
            iOManagerAsyncWithClosedBufferFileWriter.shutdown();
            boolean isRecycled = createFilledBufferConsumer.isRecycled();
            if (!isRecycled) {
                createFilledBufferConsumer.close();
            }
            if (!isRecycled) {
                Assert.fail("buffer not recycled");
            }
            Assert.assertEquals(0L, createSubpartition.getTotalNumberOfBuffers());
            Assert.assertEquals(0L, createSubpartition.getTotalNumberOfBytes());
        } catch (Throwable th) {
            iOManagerAsyncWithClosedBufferFileWriter.shutdown();
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
            }
            throw th;
        }
    }

    @Test
    public void testCleanupReleasedSpillablePartitionNoView() throws Exception {
        testCleanupReleasedPartition(false, false);
    }

    @Test
    public void testCleanupReleasedSpillablePartitionWithView() throws Exception {
        testCleanupReleasedPartition(false, true);
    }

    @Test
    public void testCleanupReleasedSpilledPartitionNoView() throws Exception {
        testCleanupReleasedPartition(true, false);
    }

    @Test
    public void testCleanupReleasedSpilledPartitionWithView() throws Exception {
        testCleanupReleasedPartition(true, true);
    }

    private void testCleanupReleasedPartition(boolean z, boolean z2) throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        BufferConsumer createFilledBufferConsumer = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        BufferConsumer createFilledBufferConsumer2 = BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
        try {
            mo98createSubpartition.add(createFilledBufferConsumer);
            mo98createSubpartition.add(createFilledBufferConsumer2);
            ResultSubpartitionView resultSubpartitionView = null;
            if (z2) {
                mo98createSubpartition.finish();
                resultSubpartitionView = mo98createSubpartition.createReadView(new NoOpBufferAvailablityListener());
            }
            if (z) {
                Assert.assertEquals(2L, mo98createSubpartition.releaseMemory());
            }
            mo98createSubpartition.release();
            Assert.assertTrue(mo98createSubpartition.isReleased());
            if (z2) {
                Assert.assertTrue(resultSubpartitionView.isReleased());
            }
            Assert.assertTrue(createFilledBufferConsumer.isRecycled());
            boolean isRecycled = createFilledBufferConsumer.isRecycled();
            if (!isRecycled) {
                createFilledBufferConsumer.close();
            }
            boolean isRecycled2 = createFilledBufferConsumer2.isRecycled();
            if (!isRecycled2) {
                createFilledBufferConsumer2.close();
            }
            if (!isRecycled) {
                Assert.fail("buffer 1 not recycled");
            }
            if (!isRecycled2) {
                Assert.fail("buffer 2 not recycled");
            }
            Assert.assertEquals(z2 ? 3L : 2L, mo98createSubpartition.getTotalNumberOfBuffers());
            if (z) {
                Assert.assertEquals(BUFFER_DATA_SIZE + (z2 ? 4 : BUFFER_DATA_SIZE), mo98createSubpartition.getTotalNumberOfBytes());
            } else {
                Assert.assertEquals(0L, mo98createSubpartition.getTotalNumberOfBytes());
            }
        } catch (Throwable th) {
            if (!createFilledBufferConsumer.isRecycled()) {
                createFilledBufferConsumer.close();
            }
            if (!createFilledBufferConsumer2.isRecycled()) {
                createFilledBufferConsumer2.close();
            }
            throw th;
        }
    }

    @Test
    public void testSpillFinishedBufferConsumersFull() throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        BufferBuilder createBufferBuilder = BufferBuilderTestUtils.createBufferBuilder(BUFFER_DATA_SIZE);
        mo98createSubpartition.add(createBufferBuilder.createBufferConsumer());
        Assert.assertEquals(0L, mo98createSubpartition.releaseMemory());
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
        BufferBuilderTestUtils.fillBufferBuilder(createBufferBuilder, BUFFER_DATA_SIZE).finish();
        Assert.assertEquals(4096L, mo98createSubpartition.spillFinishedBufferConsumers(false));
        Assert.assertEquals(1L, mo98createSubpartition.getBuffersInBacklog());
    }

    @Test
    public void testSpillFinishedBufferConsumersPartial() throws Exception {
        SpillableSubpartition mo98createSubpartition = mo98createSubpartition();
        BufferBuilder createBufferBuilder = BufferBuilderTestUtils.createBufferBuilder(8192);
        mo98createSubpartition.add(createBufferBuilder.createBufferConsumer());
        BufferBuilderTestUtils.fillBufferBuilder(createBufferBuilder, BUFFER_DATA_SIZE);
        Assert.assertEquals(0L, mo98createSubpartition.releaseMemory());
        Assert.assertEquals(2L, mo98createSubpartition.getBuffersInBacklog());
        BufferBuilderTestUtils.fillBufferBuilder(createBufferBuilder, BUFFER_DATA_SIZE).finish();
        Assert.assertEquals(4096L, mo98createSubpartition.spillFinishedBufferConsumers(false));
        Assert.assertEquals(2L, mo98createSubpartition.getBuffersInBacklog());
    }
}
