/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.util.Arrays;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.runtime.io.BarrierBuffer;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class BarrierBufferTest {
    // Size constant (512). The tests construct every MockInputGate with this value
    // as its first argument.
    // NOTE(review): reading it as a memory page size in bytes is inferred from the name only - confirm.
    private static final int PAGE_SIZE = 512;
    // Counter that setup() resets to 1 before the tests run.
    // NOTE(review): its consumers are outside the visible part of the file - presumably the buffer factory; confirm.
    private static int SIZE_COUNTER = 0;
    // I/O manager shared by all tests: created in setup(), passed to every
    // BarrierBuffer under test, and shut down in shutdownIOManager().
    private static IOManager IO_MANAGER;

    /**
     * One-time fixture: creates the asynchronous I/O manager shared by all tests
     * of this class and resets the size counter to 1.
     */
    @BeforeClass
    public static void setup() {
        BarrierBufferTest.IO_MANAGER = new IOManagerAsync();
        BarrierBufferTest.SIZE_COUNTER = 1;
    }

    /**
     * Shuts down the shared I/O manager once all tests of this class have run.
     *
     * <p>JUnit runs {@code @AfterClass} methods even when a {@code @BeforeClass}
     * method threw, so the manager may never have been created. The null guard
     * keeps that case from piling a {@link NullPointerException} on top of the
     * original setup failure.
     */
    @AfterClass
    public static void shutdownIOManager() {
        if (IO_MANAGER != null) {
            IO_MANAGER.shutdown();
        }
    }

    /**
     * Feeds a single channel with three buffers followed by an end-of-partition
     * event and expects the barrier buffer to hand out exactly that sequence, in
     * order, and {@code null} afterwards.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testSingleChannelNoBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(0),            // 1
            createBuffer(0),            // 2
            createEndOfPartition(0)     // 3
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        for (BufferOrEvent expected : sequence) {
            Assert.assertEquals(expected, buffer.getNextNonBlocked());
        }

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Feeds four channels with interleaved buffers and end-of-partition events
     * (no barriers) and expects the barrier buffer to hand out exactly the input
     * sequence, in order, and {@code null} afterwards.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testMultiChannelNoBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(2),            // 0
            createBuffer(2),            // 1
            createBuffer(0),            // 2
            createBuffer(1),            // 3
            createBuffer(0),            // 4
            createEndOfPartition(0),    // 5
            createBuffer(3),            // 6
            createBuffer(1),            // 7
            createEndOfPartition(3),    // 8
            createBuffer(1),            // 9
            createEndOfPartition(1),    // 10
            createBuffer(2),            // 11
            createEndOfPartition(2)     // 12
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        for (BufferOrEvent expected : sequence) {
            Assert.assertEquals(expected, buffer.getNextNonBlocked());
        }

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Feeds a single channel with buffers interleaved with barriers for
     * checkpoints 1 to 6 and expects every non-barrier element back in input
     * order; the checkpoint barriers themselves are not expected from
     * {@code getNextNonBlocked()}.
     *
     * <p>A {@code ValidatingCheckpointHandler} is registered and primed with
     * checkpoint id 1. NOTE(review): its validation logic is outside the visible
     * part of the file - presumably it checks the ids it is notified with.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testSingleChannelWithBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(0),            // 1
            createBuffer(0),            // 2
            createBarrier(1L, 0),       // 3
            createBuffer(0),            // 4
            createBuffer(0),            // 5
            createBuffer(0),            // 6
            createBuffer(0),            // 7
            createBarrier(2L, 0),       // 8
            createBarrier(3L, 0),       // 9
            createBuffer(0),            // 10
            createBuffer(0),            // 11
            createBarrier(4L, 0),       // 12
            createBarrier(5L, 0),       // 13
            createBarrier(6L, 0),       // 14
            createBuffer(0),            // 15
            createEndOfPartition(0)     // 16
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        buffer.registerCheckpointEventHandler(handler);
        handler.setNextExpectedCheckpointId(1L);

        for (BufferOrEvent expected : sequence) {
            // everything except checkpoint barriers must come back, in order
            if (expected.isBuffer() || expected.getEvent().getClass() != CheckpointBarrier.class) {
                Assert.assertEquals(expected, buffer.getNextNonBlocked());
            }
        }

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Feeds three channels with buffers interleaved with barriers for
     * checkpoints 1 to 5 and verifies the order in which the barrier buffer
     * releases the elements. Data from a channel that has already delivered the
     * barrier of the pending checkpoint is expected only after the remaining
     * channels have delivered theirs. In between, the test checks which
     * checkpoint id the registered handler expects next.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testMultiChannelWithBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(2),            // 1
            createBuffer(0),            // 2
            createBarrier(1L, 1),       // 3
            createBarrier(1L, 2),       // 4
            createBuffer(2),            // 5
            createBuffer(1),            // 6
            createBuffer(0),            // 7
            createBarrier(1L, 0),       // 8
            createBuffer(0),            // 9
            createBuffer(0),            // 10
            createBuffer(1),            // 11
            createBuffer(1),            // 12
            createBuffer(2),            // 13
            createBarrier(2L, 0),       // 14
            createBarrier(2L, 1),       // 15
            createBarrier(2L, 2),       // 16
            createBuffer(2),            // 17
            createBuffer(2),            // 18
            createBarrier(3L, 2),       // 19
            createBuffer(2),            // 20
            createBuffer(2),            // 21
            createBarrier(3L, 0),       // 22
            createBarrier(3L, 1),       // 23
            createBarrier(4L, 1),       // 24
            createBarrier(4L, 2),       // 25
            createBarrier(4L, 0),       // 26
            createBuffer(0),            // 27
            createBuffer(2),            // 28
            createBuffer(0),            // 29
            createBarrier(5L, 1),       // 30
            createBuffer(2),            // 31
            createBuffer(0),            // 32
            createBuffer(2),            // 33
            createBuffer(1),            // 34
            createBarrier(5L, 2),       // 35
            createBuffer(1),            // 36
            createBuffer(0),            // 37
            createBuffer(2),            // 38
            createBuffer(1),            // 39
            createBarrier(5L, 0),       // 40
            createBuffer(0),            // 41
            createEndOfPartition(0),    // 42
            createEndOfPartition(1),    // 43
            createEndOfPartition(2)     // 44
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        buffer.registerCheckpointEventHandler(handler);
        handler.setNextExpectedCheckpointId(1L);

        // before any barrier: plain pass-through
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());
        check(sequence[2], buffer.getNextNonBlocked());
        Assert.assertEquals(1L, handler.getNextExpectedCheckpointId());

        // channels 1 and 2 delivered barrier 1; only channel-0 data is expected
        check(sequence[7], buffer.getNextNonBlocked());
        Assert.assertEquals(1L, handler.getNextExpectedCheckpointId());

        // barrier 1 arrived on all channels: held-back elements come out
        check(sequence[5], buffer.getNextNonBlocked());
        Assert.assertEquals(2L, handler.getNextExpectedCheckpointId());
        check(sequence[6], buffer.getNextNonBlocked());

        // between checkpoints 1 and 2
        check(sequence[9], buffer.getNextNonBlocked());
        check(sequence[10], buffer.getNextNonBlocked());
        check(sequence[11], buffer.getNextNonBlocked());
        check(sequence[12], buffer.getNextNonBlocked());
        check(sequence[13], buffer.getNextNonBlocked());
        Assert.assertEquals(2L, handler.getNextExpectedCheckpointId());

        // checkpoint 2: all three barriers arrive back to back
        check(sequence[17], buffer.getNextNonBlocked());
        Assert.assertEquals(3L, handler.getNextExpectedCheckpointId());
        check(sequence[18], buffer.getNextNonBlocked());

        // checkpoint 3: channel-2 data after its barrier is returned once
        // channels 0 and 1 delivered barrier 3
        check(sequence[20], buffer.getNextNonBlocked());
        Assert.assertEquals(4L, handler.getNextExpectedCheckpointId());
        check(sequence[21], buffer.getNextNonBlocked());

        // checkpoint 4: all three barriers arrive back to back
        check(sequence[27], buffer.getNextNonBlocked());
        Assert.assertEquals(5L, handler.getNextExpectedCheckpointId());
        check(sequence[28], buffer.getNextNonBlocked());
        check(sequence[29], buffer.getNextNonBlocked());

        // checkpoint 5 pending: data of channels that have not delivered barrier 5 yet
        check(sequence[31], buffer.getNextNonBlocked());
        check(sequence[32], buffer.getNextNonBlocked());
        check(sequence[33], buffer.getNextNonBlocked());
        check(sequence[37], buffer.getNextNonBlocked());

        // barrier 5 arrived on all channels: held-back elements come out
        check(sequence[34], buffer.getNextNonBlocked());
        check(sequence[36], buffer.getNextNonBlocked());
        check(sequence[38], buffer.getNextNonBlocked());
        check(sequence[39], buffer.getNextNonBlocked());

        // remaining data and end-of-partition events
        check(sequence[41], buffer.getNextNonBlocked());
        check(sequence[42], buffer.getNextNonBlocked());
        check(sequence[43], buffer.getNextNonBlocked());
        check(sequence[44], buffer.getNextNonBlocked());

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Feeds three channels where checkpoint 2 never completes: channels 1 and 2
     * deliver barrier 2 and then more data and their end-of-partition events,
     * while channel 0 ends without delivering barrier 2. The test expects the
     * elements that followed barrier 2 on channels 1 and 2 to be returned after
     * channel 0's end-of-partition event.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testMultiChannelTrailingBlockedData() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(1),            // 1
            createBuffer(2),            // 2
            createBarrier(1L, 1),       // 3
            createBarrier(1L, 2),       // 4
            createBarrier(1L, 0),       // 5
            createBuffer(2),            // 6
            createBuffer(1),            // 7
            createBuffer(0),            // 8
            createBarrier(2L, 1),       // 9
            createBuffer(1),            // 10
            createBuffer(1),            // 11
            createEndOfPartition(1),    // 12
            createBuffer(0),            // 13
            createBuffer(2),            // 14
            createBarrier(2L, 2),       // 15
            createBuffer(2),            // 16
            createEndOfPartition(2),    // 17
            createBuffer(0),            // 18
            createEndOfPartition(0)     // 19
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        buffer.registerCheckpointEventHandler(handler);
        handler.setNextExpectedCheckpointId(1L);

        // before any barrier: plain pass-through
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());
        check(sequence[2], buffer.getNextNonBlocked());
        Assert.assertEquals(1L, handler.getNextExpectedCheckpointId());

        // checkpoint 1: all three barriers arrive back to back
        check(sequence[6], buffer.getNextNonBlocked());
        Assert.assertEquals(2L, handler.getNextExpectedCheckpointId());
        check(sequence[7], buffer.getNextNonBlocked());
        check(sequence[8], buffer.getNextNonBlocked());

        // barrier 2 pending: only data of channels that have not delivered it yet
        check(sequence[13], buffer.getNextNonBlocked());
        check(sequence[14], buffer.getNextNonBlocked());
        check(sequence[18], buffer.getNextNonBlocked());
        check(sequence[19], buffer.getNextNonBlocked());

        // channel 0 ended without barrier 2: the held-back trailing elements come out
        check(sequence[10], buffer.getNextNonBlocked());
        check(sequence[11], buffer.getNextNonBlocked());
        check(sequence[12], buffer.getNextNonBlocked());
        check(sequence[16], buffer.getNextNonBlocked());
        check(sequence[17], buffer.getNextNonBlocked());

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Feeds three channels where some channels run ahead: they deliver barriers
     * for later checkpoints (up to 6) while channel 2 still owes the barrier of
     * an earlier one. The barriers that arrive early therefore sit among the
     * held-back elements. The test verifies the exact order in which the barrier
     * buffer releases all non-barrier elements.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testMultiChannelWithQueuedFutureBarriers() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(2),            // 1
            createBuffer(0),            // 2
            createBarrier(1L, 1),       // 3
            createBarrier(1L, 2),       // 4
            createBuffer(2),            // 5
            createBuffer(1),            // 6
            createBuffer(0),            // 7
            createBarrier(1L, 0),       // 8
            createBuffer(1),            // 9
            createBuffer(0),            // 10
            createBarrier(2L, 1),       // 11
            createBuffer(1),            // 12
            createBuffer(2),            // 13
            createBarrier(2L, 0),       // 14
            createBarrier(3L, 0),       // 15
            createBuffer(0),            // 16
            createBarrier(3L, 1),       // 17
            createBuffer(0),            // 18
            createBuffer(1),            // 19
            createBuffer(2),            // 20
            createBarrier(4L, 1),       // 21
            createBuffer(1),            // 22
            createBuffer(2),            // 23
            createBarrier(2L, 2),       // 24
            createBuffer(2),            // 25
            createBuffer(1),            // 26
            createBuffer(2),            // 27
            createBuffer(0),            // 28
            createBarrier(4L, 0),       // 29
            createBuffer(2),            // 30
            createBuffer(1),            // 31
            createBuffer(2),            // 32
            createBuffer(0),            // 33
            createBarrier(5L, 1),       // 34
            createBarrier(3L, 2),       // 35
            createBuffer(2),            // 36
            createBuffer(1),            // 37
            createBuffer(2),            // 38
            createBuffer(0),            // 39
            createBarrier(6L, 1),       // 40
            createBarrier(4L, 2),       // 41
            createBuffer(2),            // 42
            createBuffer(1),            // 43
            createEndOfPartition(1),    // 44
            createBuffer(2),            // 45
            createEndOfPartition(2),    // 46
            createBuffer(0),            // 47
            createEndOfPartition(0)     // 48
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        buffer.registerCheckpointEventHandler(handler);
        handler.setNextExpectedCheckpointId(1L);

        // before any barrier: plain pass-through
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());
        check(sequence[2], buffer.getNextNonBlocked());

        // barrier 1 pending on channel 0: only channel-0 data is expected
        check(sequence[7], buffer.getNextNonBlocked());

        // barrier 1 arrived on all channels: held-back elements come out
        check(sequence[5], buffer.getNextNonBlocked());
        Assert.assertEquals(2L, handler.getNextExpectedCheckpointId());
        check(sequence[6], buffer.getNextNonBlocked());
        check(sequence[9], buffer.getNextNonBlocked());
        check(sequence[10], buffer.getNextNonBlocked());

        // barrier 2 pending on channel 2: only channel-2 data is expected
        check(sequence[13], buffer.getNextNonBlocked());
        check(sequence[20], buffer.getNextNonBlocked());
        check(sequence[23], buffer.getNextNonBlocked());

        // barrier 2 arrived on channel 2 (index 24); barrier 3 is already queued
        // for channels 0 and 1, so again mostly channel-2 data passes
        check(sequence[12], buffer.getNextNonBlocked());
        check(sequence[25], buffer.getNextNonBlocked());
        check(sequence[27], buffer.getNextNonBlocked());
        check(sequence[30], buffer.getNextNonBlocked());
        check(sequence[32], buffer.getNextNonBlocked());

        // barrier 3 arrived on channel 2 (index 35); barrier 4 is already queued
        // for channels 0 and 1
        check(sequence[16], buffer.getNextNonBlocked());
        check(sequence[18], buffer.getNextNonBlocked());
        check(sequence[19], buffer.getNextNonBlocked());
        check(sequence[28], buffer.getNextNonBlocked());
        check(sequence[36], buffer.getNextNonBlocked());
        check(sequence[38], buffer.getNextNonBlocked());

        // barrier 4 arrived on channel 2 (index 41); barrier 5 is queued for channel 1
        check(sequence[22], buffer.getNextNonBlocked());
        check(sequence[26], buffer.getNextNonBlocked());
        check(sequence[31], buffer.getNextNonBlocked());
        check(sequence[33], buffer.getNextNonBlocked());
        check(sequence[39], buffer.getNextNonBlocked());
        check(sequence[42], buffer.getNextNonBlocked());

        // channels end while checkpoints 5 and 6 are incomplete; the elements
        // still held back are expected after the respective end-of-partition events
        check(sequence[45], buffer.getNextNonBlocked());
        check(sequence[46], buffer.getNextNonBlocked());
        check(sequence[37], buffer.getNextNonBlocked());
        check(sequence[47], buffer.getNextNonBlocked());
        check(sequence[48], buffer.getNextNonBlocked());
        check(sequence[43], buffer.getNextNonBlocked());
        check(sequence[44], buffer.getNextNonBlocked());

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Feeds three channels where checkpoint 2 is never completed: channels 1 and
     * 0 deliver barrier 2, but channel 2 delivers barrier 3 instead. The test
     * expects the barrier buffer's current checkpoint id to move from 2 to 3 and
     * verifies the order in which the elements are released.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testMultiChannelSkippingCheckpoints() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(2),            // 1
            createBuffer(0),            // 2
            createBarrier(1L, 1),       // 3
            createBarrier(1L, 2),       // 4
            createBuffer(2),            // 5
            createBuffer(1),            // 6
            createBuffer(0),            // 7
            createBarrier(1L, 0),       // 8
            createBuffer(1),            // 9
            createBuffer(0),            // 10
            createBarrier(2L, 1),       // 11
            createBuffer(1),            // 12
            createBuffer(2),            // 13
            createBarrier(2L, 0),       // 14
            createBuffer(2),            // 15
            createBuffer(0),            // 16
            createBarrier(3L, 2),       // 17
            createBuffer(2),            // 18
            createBuffer(1),            // 19
            createEndOfPartition(1),    // 20
            createBuffer(2),            // 21
            createEndOfPartition(2),    // 22
            createBuffer(0),            // 23
            createEndOfPartition(0)     // 24
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        buffer.registerCheckpointEventHandler(handler);
        handler.setNextExpectedCheckpointId(1L);

        // before any barrier: plain pass-through
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());
        check(sequence[2], buffer.getNextNonBlocked());

        // barrier 1 pending on channel 0: only channel-0 data is expected
        check(sequence[7], buffer.getNextNonBlocked());
        Assert.assertEquals(1L, buffer.getCurrentCheckpointId());

        // barrier 1 arrived on all channels: held-back elements come out
        check(sequence[5], buffer.getNextNonBlocked());
        check(sequence[6], buffer.getNextNonBlocked());
        check(sequence[9], buffer.getNextNonBlocked());
        check(sequence[10], buffer.getNextNonBlocked());

        // barrier 2 arrived on channel 1
        check(sequence[13], buffer.getNextNonBlocked());
        Assert.assertEquals(2L, buffer.getCurrentCheckpointId());
        check(sequence[15], buffer.getNextNonBlocked());

        // channel 2 delivers barrier 3 instead of barrier 2: checkpoint 2 is
        // given up and the data held back for it comes out
        check(sequence[12], buffer.getNextNonBlocked());
        Assert.assertEquals(3L, buffer.getCurrentCheckpointId());
        check(sequence[16], buffer.getNextNonBlocked());

        // barrier 3 pending on channels 0 and 1; channel 1 ends without it
        check(sequence[19], buffer.getNextNonBlocked());
        check(sequence[20], buffer.getNextNonBlocked());

        // remaining data and end-of-partition events
        check(sequence[18], buffer.getNextNonBlocked());
        check(sequence[21], buffer.getNextNonBlocked());
        check(sequence[22], buffer.getNextNonBlocked());
        check(sequence[23], buffer.getNextNonBlocked());
        check(sequence[24], buffer.getNextNonBlocked());

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Feeds three channels where channel 2 never delivers barriers 2 and 3 but
     * delivers barrier 4 while channels 0 and 1 have already sent barriers 2 and
     * 3. The test expects the barrier buffer's current checkpoint id to jump
     * from 2 directly to 4 and verifies the order in which the elements are
     * released.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testMultiChannelJumpingOverCheckpoint() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(2),            // 1
            createBuffer(0),            // 2
            createBarrier(1L, 1),       // 3
            createBarrier(1L, 2),       // 4
            createBuffer(2),            // 5
            createBuffer(1),            // 6
            createBuffer(0),            // 7
            createBarrier(1L, 0),       // 8
            createBuffer(1),            // 9
            createBuffer(0),            // 10
            createBarrier(2L, 1),       // 11
            createBuffer(1),            // 12
            createBuffer(2),            // 13
            createBarrier(2L, 0),       // 14
            createBuffer(2),            // 15
            createBuffer(0),            // 16
            createBarrier(3L, 1),       // 17
            createBuffer(1),            // 18
            createBuffer(2),            // 19
            createBarrier(3L, 0),       // 20
            createBuffer(2),            // 21
            createBuffer(0),            // 22
            createBarrier(4L, 2),       // 23
            createBuffer(2),            // 24
            createBuffer(1),            // 25
            createEndOfPartition(1),    // 26
            createBuffer(2),            // 27
            createEndOfPartition(2),    // 28
            createBuffer(0),            // 29
            createEndOfPartition(0)     // 30
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        buffer.registerCheckpointEventHandler(handler);
        handler.setNextExpectedCheckpointId(1L);

        // before any barrier: plain pass-through
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());
        check(sequence[2], buffer.getNextNonBlocked());

        // barrier 1 pending on channel 0: only channel-0 data is expected
        check(sequence[7], buffer.getNextNonBlocked());
        Assert.assertEquals(1L, buffer.getCurrentCheckpointId());

        // barrier 1 arrived on all channels: held-back elements come out
        check(sequence[5], buffer.getNextNonBlocked());
        check(sequence[6], buffer.getNextNonBlocked());
        check(sequence[9], buffer.getNextNonBlocked());
        check(sequence[10], buffer.getNextNonBlocked());

        // barrier 2 arrived on channel 1
        check(sequence[13], buffer.getNextNonBlocked());
        Assert.assertEquals(2L, buffer.getCurrentCheckpointId());

        // barrier 2 pending on channel 2: only channel-2 data is expected
        check(sequence[15], buffer.getNextNonBlocked());
        check(sequence[19], buffer.getNextNonBlocked());
        check(sequence[21], buffer.getNextNonBlocked());

        // channel 2 delivers barrier 4: checkpoints 2 and 3 are given up and the
        // data held back for them comes out
        check(sequence[12], buffer.getNextNonBlocked());
        Assert.assertEquals(4L, buffer.getCurrentCheckpointId());
        check(sequence[16], buffer.getNextNonBlocked());
        check(sequence[18], buffer.getNextNonBlocked());
        check(sequence[22], buffer.getNextNonBlocked());

        // barrier 4 pending on channels 0 and 1; channel 1 ends without it
        check(sequence[25], buffer.getNextNonBlocked());
        check(sequence[26], buffer.getNextNonBlocked());

        // remaining data and end-of-partition events
        check(sequence[24], buffer.getNextNonBlocked());
        check(sequence[27], buffer.getNextNonBlocked());
        check(sequence[28], buffer.getNextNonBlocked());
        check(sequence[29], buffer.getNextNonBlocked());
        check(sequence[30], buffer.getNextNonBlocked());

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Feeds three channels where barriers for checkpoints 3 and 4 arrive on
     * channels 0 and 1 while channel 2 still owes barrier 2, so these later
     * barriers sit among the held-back elements. The test expects the current
     * checkpoint id to read 2 and later 4, with no assertion made at 3, and
     * verifies the order in which the elements are released. No checkpoint
     * handler is registered in this test.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testMultiChannelSkippingCheckpointsViaBlockedInputs() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(2),            // 1
            createBuffer(0),            // 2
            createBarrier(1L, 1),       // 3
            createBarrier(1L, 2),       // 4
            createBuffer(2),            // 5
            createBuffer(1),            // 6
            createBuffer(0),            // 7
            createBarrier(1L, 0),       // 8
            createBuffer(1),            // 9
            createBuffer(0),            // 10
            createBarrier(2L, 1),       // 11
            createBuffer(1),            // 12
            createBuffer(2),            // 13
            createBarrier(2L, 0),       // 14
            createBuffer(1),            // 15
            createBuffer(0),            // 16
            createBarrier(3L, 0),       // 17
            createBuffer(0),            // 18
            createBarrier(4L, 1),       // 19
            createBuffer(1),            // 20
            createBuffer(0),            // 21
            createBuffer(2),            // 22
            createBarrier(2L, 2),       // 23
            createBuffer(0),            // 24
            createBarrier(3L, 2),       // 25
            createBuffer(2),            // 26
            createBarrier(4L, 0),       // 27
            createBuffer(0),            // 28
            createBuffer(1),            // 29
            createBuffer(2),            // 30
            createBarrier(4L, 2),       // 31
            createBuffer(1),            // 32
            createEndOfPartition(1),    // 33
            createBuffer(2),            // 34
            createEndOfPartition(2),    // 35
            createBuffer(0),            // 36
            createEndOfPartition(0)     // 37
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        // before any barrier: plain pass-through
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());
        check(sequence[2], buffer.getNextNonBlocked());

        // barrier 1 pending on channel 0: only channel-0 data is expected
        check(sequence[7], buffer.getNextNonBlocked());
        Assert.assertEquals(1L, buffer.getCurrentCheckpointId());

        // barrier 1 arrived on all channels: held-back elements come out
        check(sequence[5], buffer.getNextNonBlocked());
        check(sequence[6], buffer.getNextNonBlocked());
        check(sequence[9], buffer.getNextNonBlocked());
        check(sequence[10], buffer.getNextNonBlocked());

        // barrier 2 pending on channel 2: only channel-2 data is expected
        check(sequence[13], buffer.getNextNonBlocked());
        check(sequence[22], buffer.getNextNonBlocked());
        Assert.assertEquals(2L, buffer.getCurrentCheckpointId());

        // barrier 2 arrived on all channels: held-back elements come out; the
        // barriers for checkpoints 3 and 4 are among them
        check(sequence[12], buffer.getNextNonBlocked());
        check(sequence[15], buffer.getNextNonBlocked());
        check(sequence[16], buffer.getNextNonBlocked());
        check(sequence[18], buffer.getNextNonBlocked());
        Assert.assertEquals(4L, buffer.getCurrentCheckpointId());

        // barrier 4 pending on channels 0 and 2
        check(sequence[21], buffer.getNextNonBlocked());
        check(sequence[24], buffer.getNextNonBlocked());
        check(sequence[26], buffer.getNextNonBlocked());
        check(sequence[30], buffer.getNextNonBlocked());

        // barrier 4 arrived on all channels: held-back elements come out
        check(sequence[20], buffer.getNextNonBlocked());
        check(sequence[28], buffer.getNextNonBlocked());
        check(sequence[29], buffer.getNextNonBlocked());

        // remaining data and end-of-partition events
        check(sequence[32], buffer.getNextNonBlocked());
        check(sequence[33], buffer.getNextNonBlocked());
        check(sequence[34], buffer.getNextNonBlocked());
        check(sequence[35], buffer.getNextNonBlocked());
        check(sequence[36], buffer.getNextNonBlocked());
        check(sequence[37], buffer.getNextNonBlocked());

        // after the input is exhausted, repeated reads are expected to yield null
        Assert.assertNull(buffer.getNextNonBlocked());
        Assert.assertNull(buffer.getNextNonBlocked());

        buffer.cleanup();
        checkNoTempFilesRemain();
    }

    /**
     * Uses the same input as the trailing-blocked-data scenario but stops
     * reading early: after channel 0's end-of-partition event has been returned
     * it performs one more unchecked read and then calls {@code cleanup()},
     * although elements of the sequence (from indices 10-12, 16 and 17) have not
     * all been returned yet. The test then checks that no temp files remain.
     *
     * @throws Exception any unexpected failure; it is propagated (rather than caught
     *         and turned into {@code Assert.fail(message)}) so that JUnit reports
     *         the original exception type and stack trace
     */
    @Test
    public void testEarlyCleanup() throws Exception {
        BufferOrEvent[] sequence = {
            createBuffer(0),            // 0
            createBuffer(1),            // 1
            createBuffer(2),            // 2
            createBarrier(1L, 1),       // 3
            createBarrier(1L, 2),       // 4
            createBarrier(1L, 0),       // 5
            createBuffer(2),            // 6
            createBuffer(1),            // 7
            createBuffer(0),            // 8
            createBarrier(2L, 1),       // 9
            createBuffer(1),            // 10
            createBuffer(1),            // 11
            createEndOfPartition(1),    // 12
            createBuffer(0),            // 13
            createBuffer(2),            // 14
            createBarrier(2L, 2),       // 15
            createBuffer(2),            // 16
            createEndOfPartition(2),    // 17
            createBuffer(0),            // 18
            createEndOfPartition(0)     // 19
        };

        MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);

        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        buffer.registerCheckpointEventHandler(handler);
        handler.setNextExpectedCheckpointId(1L);

        // before any barrier: plain pass-through
        check(sequence[0], buffer.getNextNonBlocked());
        check(sequence[1], buffer.getNextNonBlocked());
        check(sequence[2], buffer.getNextNonBlocked());
        Assert.assertEquals(1L, handler.getNextExpectedCheckpointId());

        // checkpoint 1: all three barriers arrive back to back
        check(sequence[6], buffer.getNextNonBlocked());
        Assert.assertEquals(2L, handler.getNextExpectedCheckpointId());
        check(sequence[7], buffer.getNextNonBlocked());
        check(sequence[8], buffer.getNextNonBlocked());

        // barrier 2 pending: only data of channels that have not delivered it yet
        check(sequence[13], buffer.getNextNonBlocked());
        check(sequence[14], buffer.getNextNonBlocked());
        check(sequence[18], buffer.getNextNonBlocked());
        check(sequence[19], buffer.getNextNonBlocked());

        // one more read whose result is deliberately not checked, then clean up
        // before the sequence has been fully drained
        buffer.getNextNonBlocked();
        buffer.cleanup();

        checkNoTempFilesRemain();
    }

    /**
     * Drives a {@link BarrierBuffer} over a four-channel {@link MockInputGate} in which channels
     * 2 and 1 deliver end-of-partition before anything else, so every barrier in the sequence
     * arrives only on channels 0 and 3.
     *
     * <p>Layout of {@code sequence} (index: element):
     * 0-1 end-of-partition on channels 2/1; 2-4 buffers on channels 0/0/3;
     * 5-6 barriers for checkpoint 2 on channels 3/0; 7-8 buffers on channels 3/0;
     * 9 barrier for checkpoint 3 on channel 3; 10-11 buffers on channels 3/0;
     * 12 barrier for checkpoint 3 on channel 0; 13-14 barriers for checkpoint 4 on channels 0/3;
     * 15-17 buffers on channels 0/0/3; 18 end-of-partition on channel 0; 19 buffer on channel 3;
     * 20 barrier for checkpoint 5 on channel 3; 21 buffer on channel 3;
     * 22 end-of-partition on channel 3.
     */
    @Test
    public void testStartAlignmentWithClosedChannels() {
        try {
            BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierBufferTest.createEndOfPartition(2), BarrierBufferTest.createEndOfPartition(1), BarrierBufferTest.createBuffer(0), BarrierBufferTest.createBuffer(0), BarrierBufferTest.createBuffer(3), BarrierBufferTest.createBarrier(2L, 3), BarrierBufferTest.createBarrier(2L, 0), BarrierBufferTest.createBuffer(3), BarrierBufferTest.createBuffer(0), BarrierBufferTest.createBarrier(3L, 3), BarrierBufferTest.createBuffer(3), BarrierBufferTest.createBuffer(0), BarrierBufferTest.createBarrier(3L, 0), BarrierBufferTest.createBarrier(4L, 0), BarrierBufferTest.createBarrier(4L, 3), BarrierBufferTest.createBuffer(0), BarrierBufferTest.createBuffer(0), BarrierBufferTest.createBuffer(3), BarrierBufferTest.createEndOfPartition(0), BarrierBufferTest.createBuffer(3), BarrierBufferTest.createBarrier(5L, 3), BarrierBufferTest.createBuffer(3), BarrierBufferTest.createEndOfPartition(3)};
            MockInputGate gate = new MockInputGate(512, 4, Arrays.asList(sequence));
            BarrierBuffer buffer = new BarrierBuffer((InputGate)gate, IO_MANAGER);
            // The two end-of-partition events and the pre-barrier buffers come through in order.
            BarrierBufferTest.check(sequence[0], buffer.getNextNonBlocked());
            BarrierBufferTest.check(sequence[1], buffer.getNextNonBlocked());
            BarrierBufferTest.check(sequence[2], buffer.getNextNonBlocked());
            BarrierBufferTest.check(sequence[3], buffer.getNextNonBlocked());
            BarrierBufferTest.check(sequence[4], buffer.getNextNonBlocked());
            // Indices 5-6 (checkpoint-2 barriers on channels 3 and 0) are not returned as elements;
            // once the following buffer has been returned the current checkpoint ID must be 2.
            BarrierBufferTest.check(sequence[7], buffer.getNextNonBlocked());
            Assert.assertEquals((long)2L, (long)buffer.getCurrentCheckpointId());
            BarrierBufferTest.check(sequence[8], buffer.getNextNonBlocked());
            // Expected out of arrival order: the channel-0 buffer (index 11) is expected before the
            // channel-3 buffer (index 10), which arrived after channel 3's checkpoint-3 barrier.
            BarrierBufferTest.check(sequence[11], buffer.getNextNonBlocked());
            BarrierBufferTest.check(sequence[10], buffer.getNextNonBlocked());
            Assert.assertEquals((long)3L, (long)buffer.getCurrentCheckpointId());
            // Indices 13-14 (checkpoint-4 barriers) arrive back to back with no data in between.
            BarrierBufferTest.check(sequence[15], buffer.getNextNonBlocked());
            Assert.assertEquals((long)4L, (long)buffer.getCurrentCheckpointId());
            BarrierBufferTest.check(sequence[16], buffer.getNextNonBlocked());
            BarrierBufferTest.check(sequence[17], buffer.getNextNonBlocked());
            // Index 18 closes channel 0; from here on only channel 3 delivers elements.
            BarrierBufferTest.check(sequence[18], buffer.getNextNonBlocked());
            BarrierBufferTest.check(sequence[19], buffer.getNextNonBlocked());
            // Index 20 (checkpoint-5 barrier on channel 3) is not returned as an element.
            BarrierBufferTest.check(sequence[21], buffer.getNextNonBlocked());
            Assert.assertEquals((long)5L, (long)buffer.getCurrentCheckpointId());
            BarrierBufferTest.check(sequence[22], buffer.getNextNonBlocked());
            // After the last end-of-partition the buffer must keep returning null.
            Assert.assertNull((Object)buffer.getNextNonBlocked());
            Assert.assertNull((Object)buffer.getNextNonBlocked());
            buffer.cleanup();
            BarrierBufferTest.checkNoTempFilesRemain();
        }
        catch (Exception e) {
            // Any checked or runtime exception is reported as a test failure.
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    /**
     * Empty test case: the method has no body, performs no assertions and therefore always
     * passes. TODO(review): implement the end-of-stream-during-checkpoint scenario or remove it.
     */
    @Test
    public void testEndOfStreamWhileCheckpoint() {
    }

    /**
     * Wraps a new {@link CheckpointBarrier} in a {@link BufferOrEvent} for the given channel.
     *
     * @param id checkpoint ID passed to the barrier
     * @param channel channel index the event is attributed to
     * @return the event wrapper; the barrier's timestamp is the wall-clock time at creation
     */
    private static BufferOrEvent createBarrier(long id, int channel) {
        long createdAt = System.currentTimeMillis();
        AbstractEvent barrierEvent = new CheckpointBarrier(id, createdAt);
        return new BufferOrEvent(barrierEvent, channel);
    }

    /**
     * Creates a data buffer for the given channel, backed by a freshly allocated unpooled
     * segment of size 512 (the same value as {@code PAGE_SIZE}).
     *
     * <p>Each buffer gets the current value of {@code SIZE_COUNTER} as its size and the counter
     * is then advanced, so every buffer created has a distinct size. {@link #check} relies on
     * that size to tell buffers apart.
     *
     * @param channel channel index the buffer is attributed to
     * @return the buffer wrapped in a {@link BufferOrEvent}
     */
    private static BufferOrEvent createBuffer(int channel) {
        Buffer payload = new Buffer(
                MemorySegmentFactory.allocateUnpooledSegment(512),
                FreeingBufferRecycler.INSTANCE);
        // Take the current counter value as this buffer's size, then advance the counter.
        int distinctSize = SIZE_COUNTER++;
        payload.setSize(distinctSize);
        return new BufferOrEvent(payload, channel);
    }

    /**
     * Wraps the shared {@link EndOfPartitionEvent#INSTANCE} in a {@link BufferOrEvent}.
     *
     * @param channel channel index the event is attributed to
     * @return the event wrapper
     */
    private static BufferOrEvent createEndOfPartition(int channel) {
        AbstractEvent endMarker = EndOfPartitionEvent.INSTANCE;
        return new BufferOrEvent(endMarker, channel);
    }

    /**
     * Asserts that {@code present} matches {@code expected}.
     *
     * <p>Both must be non-null and of the same kind (buffer vs. event). Buffers are compared by
     * their size only; events are compared with {@code equals}.
     *
     * @param expected the element the test expects
     * @param present the element actually returned
     */
    private static void check(BufferOrEvent expected, BufferOrEvent present) {
        Assert.assertNotNull(expected);
        Assert.assertNotNull(present);
        Assert.assertEquals(Boolean.valueOf(expected.isBuffer()), Boolean.valueOf(present.isBuffer()));

        if (!expected.isBuffer()) {
            // Event case: compare the event objects themselves.
            Assert.assertEquals(expected.getEvent(), present.getEvent());
            return;
        }

        // Buffer case: the size is the only property compared.
        long expectedSize = expected.getBuffer().getSize();
        long presentSize = present.getBuffer().getSize();
        Assert.assertEquals(expectedSize, presentSize);
    }

    /**
     * Fails the current test if any of the I/O manager's spilling directories still contains
     * an entry.
     *
     * <p>{@link File#list()} returns {@code null} when the path is not a directory or an I/O
     * error occurs. That case is reported as an explicit failure naming the directory, instead
     * of surfacing as a {@link NullPointerException} with no message from the loop below.
     */
    private static void checkNoTempFilesRemain() {
        for (File dir : IO_MANAGER.getSpillingDirectories()) {
            String[] entries = dir.list();
            if (entries == null) {
                // Without a listing the directory cannot be verified, so fail with a clear reason.
                Assert.fail("could not list spilling directory: " + dir);
            }
            for (String file : entries) {
                if (file == null || file.equals(".") || file.equals("..")) {
                    continue;
                }
                Assert.fail("barrier buffer did not clean up temp files. remaining file: " + file);
            }
        }
    }

    /**
     * Checkpoint listener for tests: asserts properties of every {@link CheckpointBarrier} it is
     * notified about and tracks which checkpoint ID is expected next.
     */
    private static class ValidatingCheckpointHandler
    implements EventListener<CheckpointBarrier> {
        /**
         * ID the next barrier must carry. While the value is {@code -1} any ID is accepted.
         * NOTE(review): {@link #onEvent} increments unconditionally, so a handler left at
         * {@code -1} expects ID 0 after its first event -- confirm this is intended.
         */
        private long nextExpectedCheckpointId = -1L;

        private ValidatingCheckpointHandler() {
        }

        /** Sets the checkpoint ID that the next notified barrier must carry. */
        public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) {
            this.nextExpectedCheckpointId = nextExpectedCheckpointId;
        }

        /** Returns the checkpoint ID expected for the next barrier. */
        public long getNextExpectedCheckpointId() {
            return this.nextExpectedCheckpointId;
        }

        /**
         * Asserts that the barrier is non-null, carries the expected ID (or any ID while the
         * expectation is {@code -1}) and has a positive timestamp, then advances the expected
         * ID by one.
         */
        public void onEvent(CheckpointBarrier barrier) {
            Assert.assertNotNull(barrier);

            boolean idAccepted = nextExpectedCheckpointId == -1L
                    || nextExpectedCheckpointId == barrier.getId();
            Assert.assertTrue("wrong checkpoint id", idAccepted);

            boolean timestampPositive = barrier.getTimestamp() > 0L;
            Assert.assertTrue(timestampPositive);

            nextExpectedCheckpointId++;
        }
    }
}

