package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.util.Arrays;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.util.event.EventListener;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/BarrierBufferTest.class */
public class BarrierBufferTest {
    private static final int PAGE_SIZE = 512;
    private static int SIZE_COUNTER = 0;
    private static IOManager IO_MANAGER;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/BarrierBufferTest$ValidatingCheckpointHandler.class */
    private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
        private long nextExpectedCheckpointId;

        private ValidatingCheckpointHandler() {
            this.nextExpectedCheckpointId = -1L;
        }

        public void setNextExpectedCheckpointId(long j) {
            this.nextExpectedCheckpointId = j;
        }

        public long getNextExpectedCheckpointId() {
            return this.nextExpectedCheckpointId;
        }

        public void onEvent(CheckpointBarrier checkpointBarrier) {
            Assert.assertNotNull(checkpointBarrier);
            Assert.assertTrue("wrong checkpoint id", this.nextExpectedCheckpointId == -1 || this.nextExpectedCheckpointId == checkpointBarrier.getId());
            Assert.assertTrue(checkpointBarrier.getTimestamp() > 0);
            this.nextExpectedCheckpointId++;
        }
    }

    @BeforeClass
    public static void setup() {
        IO_MANAGER = new IOManagerAsync();
        SIZE_COUNTER = 1;
    }

    @AfterClass
    public static void shutdownIOManager() {
        IO_MANAGER.shutdown();
    }

    @Test
    public void testSingleChannelNoBarriers() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(0), createBuffer(0), createEndOfPartition(0)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 1, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
                Assert.assertEquals(bufferOrEvent, barrierBuffer.getNextNonBlocked());
            }
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultiChannelNoBarriers() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(2), createBuffer(2), createBuffer(0), createBuffer(1), createBuffer(0), createEndOfPartition(0), createBuffer(3), createBuffer(1), createEndOfPartition(3), createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 4, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
                Assert.assertEquals(bufferOrEvent, barrierBuffer.getNextNonBlocked());
            }
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSingleChannelWithBarriers() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(0), createBuffer(0), createBarrier(1L, 0), createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0), createBarrier(2L, 0), createBarrier(3L, 0), createBuffer(0), createBuffer(0), createBarrier(4L, 0), createBarrier(5L, 0), createBarrier(6L, 0), createBuffer(0), createEndOfPartition(0)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 1, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
            barrierBuffer.registerCheckpointEventHandler(validatingCheckpointHandler);
            validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
            for (BufferOrEvent bufferOrEvent : bufferOrEventArr) {
                if (bufferOrEvent.isBuffer() || bufferOrEvent.getEvent().getClass() != CheckpointBarrier.class) {
                    Assert.assertEquals(bufferOrEvent, barrierBuffer.getNextNonBlocked());
                }
            }
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultiChannelWithBarriers() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1L, 0), createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2), createBarrier(2L, 0), createBarrier(2L, 1), createBarrier(2L, 2), createBuffer(2), createBuffer(2), createBarrier(3L, 2), createBuffer(2), createBuffer(2), createBarrier(3L, 0), createBarrier(3L, 1), createBarrier(4L, 1), createBarrier(4L, 2), createBarrier(4L, 0), createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(5L, 1), createBuffer(2), createBuffer(0), createBuffer(2), createBuffer(1), createBarrier(5L, 2), createBuffer(1), createBuffer(0), createBuffer(2), createBuffer(1), createBarrier(5L, 0), createBuffer(0), createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
            barrierBuffer.registerCheckpointEventHandler(validatingCheckpointHandler);
            validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
            check(bufferOrEventArr[0], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[1], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[2], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(1L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[7], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(1L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[5], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[6], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[9], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[10], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[11], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[12], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[13], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[17], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(3L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[18], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[20], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(4L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[21], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[27], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(5L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[28], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[29], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[31], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[32], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[33], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[37], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[34], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[36], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[38], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[39], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[41], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[42], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[43], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[44], barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultiChannelTrailingBlockedData() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(1L, 1), createBarrier(1L, 2), createBarrier(1L, 0), createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(2L, 1), createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2), createBarrier(2L, 2), createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
            barrierBuffer.registerCheckpointEventHandler(validatingCheckpointHandler);
            validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
            check(bufferOrEventArr[0], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[1], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[2], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(1L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[6], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[7], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[8], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[13], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[14], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[18], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[19], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[10], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[11], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[12], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[16], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[17], barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultiChannelWithQueuedFutureBarriers() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1L, 0), createBuffer(1), createBuffer(0), createBarrier(2L, 1), createBuffer(1), createBuffer(2), createBarrier(2L, 0), createBarrier(3L, 0), createBuffer(0), createBarrier(3L, 1), createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(4L, 1), createBuffer(1), createBuffer(2), createBarrier(2L, 2), createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0), createBarrier(4L, 0), createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0), createBarrier(5L, 1), createBarrier(3L, 2), createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0), createBarrier(6L, 1), createBarrier(4L, 2), createBuffer(2), createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
            barrierBuffer.registerCheckpointEventHandler(validatingCheckpointHandler);
            validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
            check(bufferOrEventArr[0], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[1], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[2], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[7], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[5], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[6], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[9], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[10], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[13], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[20], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[23], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[12], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[25], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[27], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[30], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[32], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[16], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[18], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[19], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[28], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[36], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[38], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[22], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[26], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[31], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[33], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[39], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[42], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[45], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[46], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[37], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[47], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[48], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[43], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[44], barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultiChannelSkippingCheckpoints() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1L, 0), createBuffer(1), createBuffer(0), createBarrier(2L, 1), createBuffer(1), createBuffer(2), createBarrier(2L, 0), createBuffer(2), createBuffer(0), createBarrier(3L, 2), createBuffer(2), createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
            barrierBuffer.registerCheckpointEventHandler(validatingCheckpointHandler);
            validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
            check(bufferOrEventArr[0], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[1], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[2], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[7], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(1L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[5], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[6], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[9], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[10], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[13], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[15], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[12], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(3L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[16], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[19], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[20], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[18], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[21], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[22], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[23], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[24], barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultiChannelJumpingOverCheckpoint() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1L, 0), createBuffer(1), createBuffer(0), createBarrier(2L, 1), createBuffer(1), createBuffer(2), createBarrier(2L, 0), createBuffer(2), createBuffer(0), createBarrier(3L, 1), createBuffer(1), createBuffer(2), createBarrier(3L, 0), createBuffer(2), createBuffer(0), createBarrier(4L, 2), createBuffer(2), createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
            barrierBuffer.registerCheckpointEventHandler(validatingCheckpointHandler);
            validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
            check(bufferOrEventArr[0], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[1], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[2], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[7], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(1L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[5], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[6], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[9], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[10], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[13], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[15], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[19], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[21], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[12], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(4L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[16], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[18], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[22], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[25], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[26], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[24], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[27], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[28], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[29], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[30], barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultiChannelSkippingCheckpointsViaBlockedInputs() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1L, 1), createBarrier(1L, 2), createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1L, 0), createBuffer(1), createBuffer(0), createBarrier(2L, 1), createBuffer(1), createBuffer(2), createBarrier(2L, 0), createBuffer(1), createBuffer(0), createBarrier(3L, 0), createBuffer(0), createBarrier(4L, 1), createBuffer(1), createBuffer(0), createBuffer(2), createBarrier(2L, 2), createBuffer(0), createBarrier(3L, 2), createBuffer(2), createBarrier(4L, 0), createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(4L, 2), createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            check(bufferOrEventArr[0], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[1], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[2], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[7], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(1L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[5], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[6], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[9], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[10], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[13], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[22], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[12], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[15], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[16], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[18], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(4L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[21], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[24], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[26], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[30], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[20], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[28], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[29], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[32], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[33], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[34], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[35], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[36], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[37], barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testEarlyCleanup() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(1L, 1), createBarrier(1L, 2), createBarrier(1L, 0), createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(2L, 1), createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2), createBarrier(2L, 2), createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
            barrierBuffer.registerCheckpointEventHandler(validatingCheckpointHandler);
            validatingCheckpointHandler.setNextExpectedCheckpointId(1L);
            check(bufferOrEventArr[0], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[1], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[2], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(1L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[6], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, validatingCheckpointHandler.getNextExpectedCheckpointId());
            check(bufferOrEventArr[7], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[8], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[13], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[14], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[18], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[19], barrierBuffer.getNextNonBlocked());
            barrierBuffer.getNextNonBlocked();
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testStartAlignmentWithClosedChannels() {
        try {
            BufferOrEvent[] bufferOrEventArr = {createEndOfPartition(2), createEndOfPartition(1), createBuffer(0), createBuffer(0), createBuffer(3), createBarrier(2L, 3), createBarrier(2L, 0), createBuffer(3), createBuffer(0), createBarrier(3L, 3), createBuffer(3), createBuffer(0), createBarrier(3L, 0), createBarrier(4L, 0), createBarrier(4L, 3), createBuffer(0), createBuffer(0), createBuffer(3), createEndOfPartition(0), createBuffer(3), createBarrier(5L, 3), createBuffer(3), createEndOfPartition(3)};
            BarrierBuffer barrierBuffer = new BarrierBuffer(new MockInputGate(PAGE_SIZE, 4, Arrays.asList(bufferOrEventArr)), IO_MANAGER);
            check(bufferOrEventArr[0], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[1], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[2], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[3], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[4], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[7], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(2L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[8], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[11], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[10], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(3L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[15], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(4L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[16], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[17], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[18], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[19], barrierBuffer.getNextNonBlocked());
            check(bufferOrEventArr[21], barrierBuffer.getNextNonBlocked());
            Assert.assertEquals(5L, barrierBuffer.getCurrentCheckpointId());
            check(bufferOrEventArr[22], barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            Assert.assertNull(barrierBuffer.getNextNonBlocked());
            barrierBuffer.cleanup();
            checkNoTempFilesRemain();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testEndOfStreamWhileCheckpoint() {
    }

    private static BufferOrEvent createBarrier(long j, int i) {
        return new BufferOrEvent(new CheckpointBarrier(j, System.currentTimeMillis()), i);
    }

    private static BufferOrEvent createBuffer(int i) {
        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE), FreeingBufferRecycler.INSTANCE);
        int i2 = SIZE_COUNTER;
        SIZE_COUNTER = i2 + 1;
        buffer.setSize(i2);
        return new BufferOrEvent(buffer, i);
    }

    private static BufferOrEvent createEndOfPartition(int i) {
        return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, i);
    }

    private static void check(BufferOrEvent bufferOrEvent, BufferOrEvent bufferOrEvent2) {
        Assert.assertNotNull(bufferOrEvent);
        Assert.assertNotNull(bufferOrEvent2);
        Assert.assertEquals(Boolean.valueOf(bufferOrEvent.isBuffer()), Boolean.valueOf(bufferOrEvent2.isBuffer()));
        if (bufferOrEvent.isBuffer()) {
            Assert.assertEquals(bufferOrEvent.getBuffer().getSize(), bufferOrEvent2.getBuffer().getSize());
        } else {
            Assert.assertEquals(bufferOrEvent.getEvent(), bufferOrEvent2.getEvent());
        }
    }

    private static void checkNoTempFilesRemain() {
        for (File file : IO_MANAGER.getSpillingDirectories()) {
            for (String str : file.list()) {
                if (str != null && !str.equals(".") && !str.equals("..")) {
                    Assert.fail("barrier buffer did not clean up temp files. remaining file: " + str);
                }
            }
        }
    }
}
