/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class ResultPartitionTest {
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    private static FileChannelManager fileChannelManager;
    private final int bufferSize = 1024;

    @BeforeClass
    public static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[]{tempDir}, "testing");
    }

    @AfterClass
    public static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    @Test
    public void testResultSubpartitionInfo() {
        int numPartitions = 2;
        int numSubpartitions = 3;
        for (int i = 0; i < 2; ++i) {
            PipelinedResultPartition partition = (PipelinedResultPartition)new ResultPartitionBuilder().setResultPartitionIndex(i).setNumberOfSubpartitions(3).build();
            ResultSubpartition[] subpartitions = partition.getAllPartitions();
            for (int j = 0; j < subpartitions.length; ++j) {
                ResultSubpartitionInfo subpartitionInfo = subpartitions[j].getSubpartitionInfo();
                Assert.assertEquals((long)i, (long)subpartitionInfo.getPartitionIdx());
                Assert.assertEquals((long)j, (long)subpartitionInfo.getSubPartitionIdx());
            }
        }
    }

    @Test
    public void testAddOnFinishedPipelinedPartition() throws Exception {
        this.testAddOnFinishedPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnFinishedBlockingPartition() throws Exception {
        this.testAddOnFinishedPartition(ResultPartitionType.BLOCKING);
    }

    @Test
    public void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsumption() throws IOException {
        ResultPartitionManager manager = new ResultPartitionManager();
        ResultPartition partition = new ResultPartitionBuilder().setResultPartitionManager(manager).setResultPartitionType(ResultPartitionType.BLOCKING).setFileChannelManager(fileChannelManager).build();
        manager.registerResultPartition(partition);
        partition.finish();
        MatcherAssert.assertThat((Object)manager.getUnreleasedPartitions(), (Matcher)Matchers.contains((Object[])new ResultPartitionID[]{partition.getPartitionId()}));
        for (int x = 0; x < 2; ++x) {
            ResultSubpartitionView subpartitionView1 = partition.createSubpartitionView(0, () -> {});
            subpartitionView1.releaseAllResources();
            MatcherAssert.assertThat((Object)manager.getUnreleasedPartitions(), (Matcher)Matchers.contains((Object[])new ResultPartitionID[]{partition.getPartitionId()}));
            Assert.assertFalse((boolean)partition.isReleased());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testAddOnFinishedPartition(ResultPartitionType partitionType) throws Exception {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(partitionType);
        try {
            bufferWritingResultPartition.finish();
            bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
        }
        catch (IllegalStateException illegalStateException) {
        }
        finally {
            Assert.assertEquals((long)0L, (long)bufferWritingResultPartition.numBuffersOut.getCount());
            Assert.assertEquals((long)0L, (long)bufferWritingResultPartition.numBytesOut.getCount());
            Assert.assertEquals((long)0L, (long)bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        }
    }

    @Test
    public void testAddOnReleasedPipelinedPartition() throws Exception {
        this.testAddOnReleasedPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnReleasedBlockingPartition() throws Exception {
        this.testAddOnReleasedPartition(ResultPartitionType.BLOCKING);
    }

    private void testAddOnReleasedPartition(ResultPartitionType partitionType) throws Exception {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(partitionType);
        try {
            bufferWritingResultPartition.release(null);
            bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
        }
        finally {
            Assert.assertEquals((long)1L, (long)bufferWritingResultPartition.numBuffersOut.getCount());
            Assert.assertEquals((long)1024L, (long)bufferWritingResultPartition.numBytesOut.getCount());
            Assert.assertEquals((long)0L, (long)bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        }
    }

    @Test
    public void testAddOnPipelinedPartition() throws Exception {
        this.testAddOnPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    public void testAddOnBlockingPartition() throws Exception {
        this.testAddOnPartition(ResultPartitionType.BLOCKING);
    }

    @Test
    public void testCreateSubpartitionOnFailingPartition() throws Exception {
        ResultPartitionManager manager = new ResultPartitionManager();
        ResultPartition partition = new ResultPartitionBuilder().setResultPartitionManager(manager).build();
        manager.registerResultPartition(partition);
        partition.fail(null);
        PartitionTestUtils.verifyCreateSubpartitionViewThrowsException((ResultPartitionProvider)manager, partition.getPartitionId());
    }

    private void testAddOnPartition(ResultPartitionType partitionType) throws Exception {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(partitionType);
        try {
            bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
        }
        finally {
            Assert.assertEquals((long)1L, (long)bufferWritingResultPartition.numBuffersOut.getCount());
            Assert.assertEquals((long)1024L, (long)bufferWritingResultPartition.numBytesOut.getCount());
            Assert.assertEquals((long)1L, (long)bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReleaseMemoryOnPipelinedPartition() throws Exception {
        int numAllBuffers = 10;
        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(1024).build();
        ResultPartition resultPartition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED, 1);
        try {
            resultPartition.setup();
            for (int i = 0; i < 10; ++i) {
                resultPartition.emitRecord(ByteBuffer.allocate(1023), 0);
            }
            Assert.assertEquals((long)0L, (long)resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
            resultPartition.close();
            Assert.assertTrue((boolean)resultPartition.getBufferPool().isDestroyed());
            Assert.assertEquals((long)10L, (long)network.getNetworkBufferPool().getNumberOfUsedMemorySegments());
            resultPartition.release();
            Assert.assertEquals((long)0L, (long)network.getNetworkBufferPool().getNumberOfUsedMemorySegments());
        }
        finally {
            network.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testIsAvailableOrNot() throws IOException {
        int numAllBuffers = 10;
        int bufferSize = 1024;
        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(1024).build();
        ResultPartition resultPartition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED, 1);
        try {
            resultPartition.setup();
            resultPartition.getBufferPool().setNumBuffers(2);
            Assert.assertTrue((boolean)resultPartition.getAvailableFuture().isDone());
            resultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            resultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            Assert.assertFalse((boolean)resultPartition.getAvailableFuture().isDone());
        }
        finally {
            resultPartition.release();
            network.close();
        }
    }

    @Test
    public void testPipelinedPartitionBufferPool() throws Exception {
        this.testPartitionBufferPool(ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    public void testBlockingPartitionBufferPool() throws Exception {
        this.testPartitionBufferPool(ResultPartitionType.BLOCKING);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testPartitionBufferPool(ResultPartitionType type) throws Exception {
        int networkBuffersPerChannel = 2;
        int floatingNetworkBuffersPerGate = 8;
        NetworkBufferPool globalPool = new NetworkBufferPool(20, 1);
        ResultPartition partition = new ResultPartitionBuilder().setResultPartitionType(type).setFileChannelManager(fileChannelManager).setNetworkBuffersPerChannel(2).setFloatingNetworkBuffersPerGate(8).setNetworkBufferPool(globalPool).build();
        try {
            partition.setup();
            BufferPool bufferPool = partition.getBufferPool();
            Assert.assertEquals((long)(partition.getNumberOfSubpartitions() + 1), (long)bufferPool.getNumberOfRequiredMemorySegments());
            if (type.isBounded()) {
                int maxNumBuffers = 2 * partition.getNumberOfSubpartitions() + 8;
                Assert.assertEquals((long)maxNumBuffers, (long)bufferPool.getMaxNumberOfMemorySegments());
            } else {
                Assert.assertEquals((long)Integer.MAX_VALUE, (long)bufferPool.getMaxNumberOfMemorySegments());
            }
        }
        finally {
            globalPool.destroyAllBufferPools();
            globalPool.destroy();
        }
    }

    private BufferWritingResultPartition createResultPartition(ResultPartitionType partitionType) throws IOException {
        NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(1024).build();
        ResultPartition resultPartition = PartitionTestUtils.createPartition(network, fileChannelManager, partitionType, 2);
        resultPartition.setup();
        return (BufferWritingResultPartition)resultPartition;
    }

    @Test
    public void testIdleAndBackPressuredTime() throws IOException, InterruptedException {
        int bufferSize = 1024;
        NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
        BufferWritingResultPartition resultPartition = (BufferWritingResultPartition)new ResultPartitionBuilder().setBufferPoolFactory((SupplierWithException<BufferPool, IOException>)((SupplierWithException)() -> localPool)).build();
        resultPartition.setup();
        resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
        ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        Buffer buffer = readView.getNextBuffer().buffer();
        Assert.assertNotNull((Object)buffer);
        MatcherAssert.assertThat((Object)resultPartition.getHardBackPressuredTimeMsPerSecond().getValue(), (Matcher)Matchers.equalTo((Object)0L));
        CountDownLatch syncLock = new CountDownLatch(1);
        Thread requestThread = new Thread(() -> {
            try {
                syncLock.countDown();
                resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            }
            catch (Exception exception) {
                // empty catch block
            }
        });
        requestThread.start();
        syncLock.await();
        while (!LocalBufferPoolDestroyTest.isInBlockingBufferRequest(requestThread.getStackTrace())) {
            Thread.sleep(50L);
        }
        Thread.sleep(5L);
        buffer.recycleBuffer();
        requestThread.join();
        Assert.assertThat((Object)resultPartition.getHardBackPressuredTimeMsPerSecond().getCount(), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        Assert.assertNotNull((Object)readView.getNextBuffer().buffer());
    }

    @Test
    public void testFlushBoundedBlockingResultPartition() throws IOException {
        int value = 1024;
        BufferWritingResultPartition partition = this.createResultPartition(ResultPartitionType.BLOCKING);
        ByteBuffer record = ByteBuffer.allocate(4);
        record.putInt(value);
        record.rewind();
        partition.emitRecord(record, 0);
        partition.flush(0);
        record.rewind();
        partition.emitRecord(record, 0);
        record.rewind();
        partition.broadcastRecord(record);
        partition.flushAll();
        record.rewind();
        partition.broadcastRecord(record);
        partition.finish();
        record.rewind();
        ResultSubpartitionView readView1 = partition.createSubpartitionView(0, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        for (int i = 0; i < 4; ++i) {
            Assert.assertEquals((Object)record, (Object)readView1.getNextBuffer().buffer().getNioBufferReadable());
        }
        Assert.assertFalse((boolean)readView1.getNextBuffer().buffer().isBuffer());
        Assert.assertNull((Object)readView1.getNextBuffer());
        ResultSubpartitionView readView2 = partition.createSubpartitionView(1, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        for (int i = 0; i < 2; ++i) {
            Assert.assertEquals((Object)record, (Object)readView2.getNextBuffer().buffer().getNioBufferReadable());
        }
        Assert.assertFalse((boolean)readView2.getNextBuffer().buffer().isBuffer());
        Assert.assertNull((Object)readView2.getNextBuffer());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEmitRecordWithRecordSpanningMultipleBuffers() throws Exception {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED);
        PipelinedSubpartition pipelinedSubpartition = (PipelinedSubpartition)bufferWritingResultPartition.subpartitions[0];
        int partialLength = 341;
        try {
            bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(partialLength), 0);
            bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
        }
        finally {
            Assert.assertEquals((long)2L, (long)pipelinedSubpartition.getNumberOfQueuedBuffers());
            Assert.assertEquals((long)0L, (long)pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
            Assert.assertEquals((long)partialLength, (long)pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBroadcastRecordWithRecordSpanningMultipleBuffers() throws Exception {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED);
        int partialLength = 341;
        try {
            bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(partialLength));
            bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(1024));
        }
        finally {
            for (ResultSubpartition resultSubpartition : bufferWritingResultPartition.subpartitions) {
                PipelinedSubpartition pipelinedSubpartition = (PipelinedSubpartition)resultSubpartition;
                Assert.assertEquals((long)2L, (long)pipelinedSubpartition.getNumberOfQueuedBuffers());
                Assert.assertEquals((long)0L, (long)pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
                Assert.assertEquals((long)partialLength, (long)pipelinedSubpartition.getNextBuffer().getPartialRecordLength());
            }
        }
    }

    @Test
    public void testWaitForAllRecordProcessed() throws IOException {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        bufferWritingResultPartition.notifyEndOfData(StopMode.DRAIN);
        CompletableFuture allRecordsProcessedFuture = bufferWritingResultPartition.getAllDataProcessedFuture();
        Assert.assertFalse((boolean)allRecordsProcessedFuture.isDone());
        for (ResultSubpartition resultSubpartition : bufferWritingResultPartition.subpartitions) {
            Assert.assertEquals((long)1L, (long)resultSubpartition.getTotalNumberOfBuffersUnsafe());
            Buffer nextBuffer = ((PipelinedSubpartition)resultSubpartition).pollBuffer().buffer();
            Assert.assertFalse((boolean)nextBuffer.isBuffer());
            Assert.assertEquals((Object)new EndOfData(StopMode.DRAIN), (Object)EventSerializer.fromBuffer((Buffer)nextBuffer, (ClassLoader)this.getClass().getClassLoader()));
        }
        for (int i = 0; i < bufferWritingResultPartition.subpartitions.length; ++i) {
            ((PipelinedSubpartition)bufferWritingResultPartition.subpartitions[i]).acknowledgeAllDataProcessed();
            if (i < bufferWritingResultPartition.subpartitions.length - 1) {
                Assert.assertFalse((boolean)allRecordsProcessedFuture.isDone());
                continue;
            }
            Assert.assertTrue((boolean)allRecordsProcessedFuture.isDone());
            Assert.assertFalse((boolean)allRecordsProcessedFuture.isCompletedExceptionally());
        }
    }

    @Test
    public void testDifferentBufferSizeForSubpartitions() throws IOException {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] subpartitions = bufferWritingResultPartition.subpartitions;
        Assert.assertEquals((long)2L, (long)subpartitions.length);
        PipelinedSubpartition subpartition0 = (PipelinedSubpartition)subpartitions[0];
        PipelinedSubpartition subpartition1 = (PipelinedSubpartition)subpartitions[1];
        subpartition0.bufferSize(10);
        subpartition1.bufferSize(6);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(2), 0);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(10), 0);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(2), 1);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(10), 1);
        Assert.assertEquals((long)10L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)2L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)6L, (long)subpartition1.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)6L, (long)subpartition1.pollBuffer().buffer().getSize());
        subpartition0.bufferSize(13);
        subpartition1.bufferSize(5);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(12), 0);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(8), 0);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(2), 1);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(7), 1);
        Assert.assertEquals((long)8L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)12L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)5L, (long)subpartition1.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)4L, (long)subpartition1.pollBuffer().buffer().getSize());
    }

    @Test
    public void testBufferSizeGreaterOrEqualToFirstRecord() throws IOException {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] subpartitions = bufferWritingResultPartition.subpartitions;
        Assert.assertEquals((long)2L, (long)subpartitions.length);
        PipelinedSubpartition subpartition0 = (PipelinedSubpartition)subpartitions[0];
        PipelinedSubpartition subpartition1 = (PipelinedSubpartition)subpartitions[1];
        subpartition0.bufferSize(10);
        subpartition1.bufferSize(7);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(12), 0);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(111), 1);
        Assert.assertEquals((long)12L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)111L, (long)subpartition1.pollBuffer().buffer().getSize());
    }

    @Test
    public void testDynamicBufferSizeForBroadcast() throws IOException {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] subpartitions = bufferWritingResultPartition.subpartitions;
        Assert.assertEquals((long)2L, (long)subpartitions.length);
        PipelinedSubpartition subpartition0 = (PipelinedSubpartition)subpartitions[0];
        PipelinedSubpartition subpartition1 = (PipelinedSubpartition)subpartitions[1];
        subpartition0.bufferSize(6);
        subpartition1.bufferSize(10);
        bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(6));
        Assert.assertEquals((long)6L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)6L, (long)subpartition1.pollBuffer().buffer().getSize());
        subpartition0.bufferSize(4);
        subpartition1.bufferSize(12);
        bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(3));
        bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(7));
        Assert.assertEquals((long)4L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)6L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)4L, (long)subpartition1.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)6L, (long)subpartition1.pollBuffer().buffer().getSize());
        subpartition0.bufferSize(8);
        subpartition1.bufferSize(5);
        bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(3));
        Assert.assertEquals((long)3L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)3L, (long)subpartition1.pollBuffer().buffer().getSize());
    }

    @Test
    public void testBufferSizeGreaterOrEqualToFirstBroadcastRecord() throws IOException {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] subpartitions = bufferWritingResultPartition.subpartitions;
        PipelinedSubpartition subpartition0 = (PipelinedSubpartition)subpartitions[0];
        PipelinedSubpartition subpartition1 = (PipelinedSubpartition)subpartitions[1];
        subpartition0.bufferSize(6);
        subpartition1.bufferSize(10);
        bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(31));
        Assert.assertEquals((long)31L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)31L, (long)subpartition1.pollBuffer().buffer().getSize());
    }

    @Test
    public void testBufferSizeNotChanged() throws IOException {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] subpartitions = bufferWritingResultPartition.subpartitions;
        Assert.assertEquals((long)2L, (long)subpartitions.length);
        PipelinedSubpartition subpartition0 = (PipelinedSubpartition)subpartitions[0];
        PipelinedSubpartition subpartition1 = (PipelinedSubpartition)subpartitions[1];
        subpartition0.bufferSize(1025);
        subpartition1.bufferSize(Integer.MAX_VALUE);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(1024), 1);
        Assert.assertEquals((long)1024L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)1024L, (long)subpartition1.pollBuffer().buffer().getSize());
    }

    @Test
    public void testNumBytesProducedCounterForUnicast() throws IOException {
        this.testNumBytesProducedCounter(false);
    }

    @Test
    public void testNumBytesProducedCounterForBroadcast() throws IOException {
        this.testNumBytesProducedCounter(true);
    }

    private void testNumBytesProducedCounter(boolean isBroadcast) throws IOException {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.BLOCKING);
        if (isBroadcast) {
            bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(1024));
            Assert.assertEquals((long)1024L, (long)bufferWritingResultPartition.numBytesProduced.getCount());
            Assert.assertEquals((long)2048L, (long)bufferWritingResultPartition.numBytesOut.getCount());
        } else {
            bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(1024), 0);
            Assert.assertEquals((long)1024L, (long)bufferWritingResultPartition.numBytesProduced.getCount());
            Assert.assertEquals((long)1024L, (long)bufferWritingResultPartition.numBytesOut.getCount());
        }
    }

    @Test
    public void testSizeOfQueuedBuffers() throws IOException {
        BufferWritingResultPartition bufferWritingResultPartition = this.createResultPartition(ResultPartitionType.PIPELINED);
        ResultSubpartition[] subpartitions = bufferWritingResultPartition.subpartitions;
        Assert.assertEquals((long)2L, (long)subpartitions.length);
        PipelinedSubpartition subpartition0 = (PipelinedSubpartition)subpartitions[0];
        PipelinedSubpartition subpartition1 = (PipelinedSubpartition)subpartitions[1];
        subpartition0.bufferSize(10);
        subpartition1.bufferSize(10);
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(3), 0);
        Assert.assertEquals((long)3L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(3), 1);
        Assert.assertEquals((long)6L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(10), 0);
        Assert.assertEquals((long)16L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(10), 1);
        Assert.assertEquals((long)26L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        bufferWritingResultPartition.broadcastEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, false);
        Assert.assertEquals((long)34L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        bufferWritingResultPartition.emitRecord(ByteBuffer.allocate(5), 0);
        Assert.assertEquals((long)39L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(7));
        Assert.assertEquals((long)53L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)10L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)43L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)10L, (long)subpartition1.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)33L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)3L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)30L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)3L, (long)subpartition1.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)27L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)4L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)23L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)4L, (long)subpartition1.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)19L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)5L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)14L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)7L, (long)subpartition0.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)7L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
        Assert.assertEquals((long)7L, (long)subpartition1.pollBuffer().buffer().getSize());
        Assert.assertEquals((long)0L, (long)bufferWritingResultPartition.getSizeOfQueuedBuffersUnsafe());
    }
}

