/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class CheckpointBarrierAlignerTestBase {
    protected static final int PAGE_SIZE = 512;
    private static final Random RND = new Random();
    private static int sizeCounter = 1;
    CheckpointedInputGate inputGate;
    static long testStartTimeNanos;
    private MockInputGate mockInputGate;

    @Before
    public void setUp() {
        testStartTimeNanos = System.nanoTime();
    }

    protected CheckpointedInputGate createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence, AbstractInvokable toNotify) throws IOException {
        this.mockInputGate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
        return this.createBarrierBuffer(this.mockInputGate, toNotify);
    }

    protected CheckpointedInputGate createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException {
        return this.createBarrierBuffer(numberOfChannels, sequence, (AbstractInvokable)new DummyCheckpointInvokable());
    }

    abstract CheckpointedInputGate createBarrierBuffer(InputGate var1, AbstractInvokable var2) throws IOException;

    @After
    public void ensureEmpty() throws Exception {
        Assert.assertFalse((boolean)this.inputGate.pollNext().isPresent());
        Assert.assertTrue((boolean)this.inputGate.isFinished());
        this.inputGate.close();
    }

    @Test
    public void testSingleChannelNoBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createEndOfPartition(0)};
        this.inputGate = this.createBarrierBuffer(1, sequence);
        for (BufferOrEvent boe : sequence) {
            Assert.assertEquals((Object)boe, this.inputGate.pollNext().get());
        }
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
    }

    @Test
    public void testMultiChannelNoBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createEndOfPartition(0), CheckpointBarrierAlignerTestBase.createBuffer(3), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createEndOfPartition(3), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createEndOfPartition(1), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createEndOfPartition(2)};
        this.inputGate = this.createBarrierBuffer(4, sequence);
        for (BufferOrEvent boe : sequence) {
            Assert.assertEquals((Object)boe, this.inputGate.pollNext().get());
        }
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
    }

    @Test
    public void testSingleChannelWithBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createBarrier(3L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(4L, 0), CheckpointBarrierAlignerTestBase.createBarrier(5L, 0), CheckpointBarrierAlignerTestBase.createBarrier(6L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createEndOfPartition(0)};
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        this.inputGate = this.createBarrierBuffer(1, sequence, handler);
        handler.setNextExpectedCheckpointId(1L);
        for (BufferOrEvent boe : sequence) {
            Assert.assertEquals((Object)boe, this.inputGate.pollNext().get());
        }
    }

    @Test
    public void testMultiChannelWithBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 1), CheckpointBarrierAlignerTestBase.createBarrier(1L, 2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createBarrier(2L, 1), CheckpointBarrierAlignerTestBase.createBarrier(2L, 2), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(3L, 2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(3L, 0), CheckpointBarrierAlignerTestBase.createBarrier(3L, 1), CheckpointBarrierAlignerTestBase.createBarrier(4L, 1), CheckpointBarrierAlignerTestBase.createBarrier(4L, 2), CheckpointBarrierAlignerTestBase.createBarrier(4L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createEndOfPartition(0), CheckpointBarrierAlignerTestBase.createEndOfPartition(1), CheckpointBarrierAlignerTestBase.createEndOfPartition(2)};
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        this.inputGate = this.createBarrierBuffer(3, sequence, handler);
        handler.setNextExpectedCheckpointId(1L);
        CheckpointBarrierAlignerTestBase.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)1L, (long)handler.getNextExpectedCheckpointId());
        long startTs = System.nanoTime();
        CheckpointBarrierAlignerTestBase.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)2L, (long)handler.getNextExpectedCheckpointId());
        CheckpointBarrierAlignerTestBase.validateAlignmentTime(startTs, this.inputGate);
        Object[] expectedUnblockedChannels1 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels1, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)2L, (long)handler.getNextExpectedCheckpointId());
        startTs = System.nanoTime();
        CheckpointBarrierAlignerTestBase.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)3L, (long)handler.getNextExpectedCheckpointId());
        CheckpointBarrierAlignerTestBase.validateAlignmentTime(startTs, this.inputGate);
        Object[] expectedUnblockedChannels2 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels2, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[15], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[16], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[17], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[18], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[19], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[20], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[21], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)4L, (long)handler.getNextExpectedCheckpointId());
        Object[] expectedUnblockedChannels3 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels3, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[22], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[23], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[24], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)5L, (long)handler.getNextExpectedCheckpointId());
        Object[] expectedUnblockedChannels4 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels4, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[25], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[26], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[27], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[28], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
    }

    @Test
    public void testMultiChannelJumpingOverCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 1), CheckpointBarrierAlignerTestBase.createBarrier(1L, 2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(2L, 1), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(3L, 2), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(3L, 0), CheckpointBarrierAlignerTestBase.createBarrier(4L, 1), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createEndOfPartition(0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createEndOfPartition(2), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createEndOfPartition(1)};
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        this.inputGate = this.createBarrierBuffer(3, sequence, handler);
        handler.setNextExpectedCheckpointId(1L);
        CheckpointBarrierAlignerTestBase.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)1L, (long)this.inputGate.getLatestCheckpointId());
        CheckpointBarrierAlignerTestBase.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Object[] expectedUnblockedChannels1 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels1, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)2L, (long)this.inputGate.getLatestCheckpointId());
        CheckpointBarrierAlignerTestBase.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)3L, (long)this.inputGate.getLatestCheckpointId());
        Object[] expectedUnblockedChannels2 = new Integer[]{0, 1};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels2, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[15], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[16], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[17], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)4L, (long)this.inputGate.getLatestCheckpointId());
        Object[] expectedUnblockedChannels3 = new Integer[]{0, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels3, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[18], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[19], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[20], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Object[] expectedUnblockedChannels4 = new Integer[]{1};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels4, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[21], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[22], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[23], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[24], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)1L, (long)handler.getTriggeredCheckpointCounter());
        Assert.assertEquals((long)3L, (long)handler.getAbortedCheckpointCounter());
    }

    @Test
    public void testMissingCancellationBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBarrier(1L, 0), CheckpointBarrierAlignerTestBase.createCancellationBarrier(3L, 1), CheckpointBarrierAlignerTestBase.createCancellationBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createCancellationBarrier(3L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0)};
        ValidatingCheckpointHandler validator = new ValidatingCheckpointHandler();
        this.inputGate = this.createBarrierBuffer(2, sequence, validator);
        for (BufferOrEvent boe : sequence) {
            if (!boe.isBuffer() && boe.getEvent().getClass() == CancelCheckpointMarker.class) continue;
            Assert.assertEquals((Object)boe, this.inputGate.pollNext().get());
        }
        Object[] expectedUnblockedChannels = new Integer[]{0};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
    }

    @Test
    public void testStartAlignmentWithClosedChannels() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createEndOfPartition(2), CheckpointBarrierAlignerTestBase.createEndOfPartition(1), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(3), CheckpointBarrierAlignerTestBase.createBarrier(2L, 3), CheckpointBarrierAlignerTestBase.createBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createBarrier(3L, 0), CheckpointBarrierAlignerTestBase.createBarrier(3L, 3), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(3), CheckpointBarrierAlignerTestBase.createEndOfPartition(0), CheckpointBarrierAlignerTestBase.createBuffer(3), CheckpointBarrierAlignerTestBase.createBarrier(4L, 3), CheckpointBarrierAlignerTestBase.createBuffer(3), CheckpointBarrierAlignerTestBase.createEndOfPartition(3)};
        this.inputGate = this.createBarrierBuffer(4, sequence);
        CheckpointBarrierAlignerTestBase.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)2L, (long)this.inputGate.getLatestCheckpointId());
        Object[] expectedUnblockedChannels1 = new Integer[]{0, 3};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels1, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)3L, (long)this.inputGate.getLatestCheckpointId());
        Object[] expectedUnblockedChannels2 = new Integer[]{0, 3};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels2, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)4L, (long)this.inputGate.getLatestCheckpointId());
        Object[] expectedUnblockedChannels3 = new Integer[]{3};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels3, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[15], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[16], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
    }

    @Test
    public void testEndOfStreamWhileCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBarrier(1L, 0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 1), CheckpointBarrierAlignerTestBase.createBarrier(1L, 2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(2L, 2), CheckpointBarrierAlignerTestBase.createBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createEndOfPartition(1), CheckpointBarrierAlignerTestBase.createEndOfPartition(2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createEndOfPartition(0)};
        this.inputGate = this.createBarrierBuffer(3, sequence);
        CheckpointBarrierAlignerTestBase.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Object[] expectedUnblockedChannels1 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels1, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)1L, (long)this.inputGate.getLatestCheckpointId());
        CheckpointBarrierAlignerTestBase.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)2L, (long)this.inputGate.getLatestCheckpointId());
        CheckpointBarrierAlignerTestBase.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Object[] expectedUnblockedChannels2 = new Integer[]{0, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels2, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
    }

    @Test
    public void testSingleChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createCancellationBarrier(4L, 0), CheckpointBarrierAlignerTestBase.createBarrier(5L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createCancellationBarrier(6L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0)};
        ValidatingCheckpointHandler toNotify = new ValidatingCheckpointHandler();
        this.inputGate = this.createBarrierBuffer(1, sequence, toNotify);
        toNotify.setNextExpectedCheckpointId(1L);
        CheckpointBarrierAlignerTestBase.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
        Object[] expectedUnblockedChannels1 = new Integer[]{0};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels1, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        toNotify.setNextExpectedCheckpointId(2L);
        CheckpointBarrierAlignerTestBase.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Object[] expectedUnblockedChannels2 = new Integer[]{0};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels2, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        toNotify.setNextExpectedCheckpointId(5L);
        CheckpointBarrierAlignerTestBase.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)5L, (long)this.inputGate.getLatestCheckpointId());
        Assert.assertEquals((long)4L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
        Object[] expectedUnblockedChannels3 = new Integer[]{0};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels3, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)6L, (long)this.inputGate.getLatestCheckpointId());
        Assert.assertEquals((long)6L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
        Assert.assertEquals((long)3L, (long)toNotify.getTriggeredCheckpointCounter());
        Assert.assertEquals((long)2L, (long)toNotify.getAbortedCheckpointCounter());
    }

    @Test
    public void testMultiChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 1), CheckpointBarrierAlignerTestBase.createBarrier(1L, 2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createBarrier(2L, 2), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createCancellationBarrier(2L, 1), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBarrier(3L, 1), CheckpointBarrierAlignerTestBase.createBarrier(3L, 2), CheckpointBarrierAlignerTestBase.createBarrier(3L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createCancellationBarrier(4L, 1), CheckpointBarrierAlignerTestBase.createBarrier(4L, 2), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(4L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(5L, 2), CheckpointBarrierAlignerTestBase.createBarrier(5L, 1), CheckpointBarrierAlignerTestBase.createBarrier(5L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createCancellationBarrier(6L, 1), CheckpointBarrierAlignerTestBase.createCancellationBarrier(6L, 2), CheckpointBarrierAlignerTestBase.createBarrier(6L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0)};
        ValidatingCheckpointHandler toNotify = new ValidatingCheckpointHandler();
        this.inputGate = this.createBarrierBuffer(3, sequence, toNotify);
        CheckpointBarrierAlignerTestBase.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        long startTs = System.nanoTime();
        toNotify.setNextExpectedCheckpointId(1L);
        CheckpointBarrierAlignerTestBase.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.validateAlignmentTime(startTs, this.inputGate);
        Object[] expectedUnblockedChannels1 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels1, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)2L, (long)toNotify.getLastCanceledCheckpointId());
        Object[] expectedUnblockedChannels2 = new Integer[]{0, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels2, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        CheckpointBarrierAlignerTestBase.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        startTs = System.nanoTime();
        toNotify.setNextExpectedCheckpointId(3L);
        CheckpointBarrierAlignerTestBase.check(sequence[15], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[16], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[17], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.validateAlignmentTime(startTs, this.inputGate);
        Object[] expectedUnblockedChannels3 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels3, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[18], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[19], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[21], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)4L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
        Object[] expectedUnblockedChannels4 = new Integer[]{2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels4, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[22], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[23], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Object[] expectedUnblockedChannels5 = new Integer[]{0};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels5, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[24], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[25], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[26], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        startTs = System.nanoTime();
        toNotify.setNextExpectedCheckpointId(5L);
        CheckpointBarrierAlignerTestBase.check(sequence[27], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[28], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[29], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.validateAlignmentTime(startTs, this.inputGate);
        Object[] expectedUnblockedChannels6 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels6, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[30], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[31], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[34], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)6L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        Assert.assertEquals((long)0L, (long)this.inputGate.getAlignmentDurationNanos());
        Object[] expectedUnblockedChannels7 = new Integer[]{0};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels7, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[35], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)3L, (long)toNotify.getTriggeredCheckpointCounter());
        Assert.assertEquals((long)3L, (long)toNotify.getAbortedCheckpointCounter());
    }

    @Test
    public void testAbortOnCanceledBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBarrier(1L, 1), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createCancellationBarrier(1L, 0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBarrier(2L, 1), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBarrier(1L, 2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(2L, 0), CheckpointBarrierAlignerTestBase.createBarrier(2L, 2), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(2)};
        ValidatingCheckpointHandler toNotify = new ValidatingCheckpointHandler();
        this.inputGate = this.createBarrierBuffer(3, sequence, toNotify);
        CheckpointBarrierAlignerTestBase.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)1L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER, (Object)toNotify.getCheckpointFailureReason());
        Object[] expectedUnblockedChannels1 = new Integer[]{1};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels1, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        long startTs = System.nanoTime();
        CheckpointBarrierAlignerTestBase.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[7], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Object[] expectedUnblockedChannels2 = new Integer[]{2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels2, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        toNotify.setNextExpectedCheckpointId(2L);
        CheckpointBarrierAlignerTestBase.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.validateAlignmentTime(startTs, this.inputGate);
        Object[] expectedUnblockedChannels3 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels3, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[15], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[16], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)1L, (long)toNotify.getTriggeredCheckpointCounter());
        Assert.assertEquals((long)1L, (long)toNotify.getAbortedCheckpointCounter());
    }

    @Test
    public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(3L, 1), CheckpointBarrierAlignerTestBase.createBarrier(3L, 0), CheckpointBarrierAlignerTestBase.createBuffer(2), CheckpointBarrierAlignerTestBase.createBarrier(5L, 2), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createCancellationBarrier(3L, 0), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBarrier(5L, 0), CheckpointBarrierAlignerTestBase.createBarrier(5L, 1), CheckpointBarrierAlignerTestBase.createBuffer(0), CheckpointBarrierAlignerTestBase.createBuffer(1), CheckpointBarrierAlignerTestBase.createBuffer(2)};
        ValidatingCheckpointHandler toNotify = new ValidatingCheckpointHandler();
        this.inputGate = this.createBarrierBuffer(3, sequence, toNotify);
        CheckpointBarrierAlignerTestBase.check(sequence[0], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[1], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[2], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[3], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        long startTs = System.nanoTime();
        CheckpointBarrierAlignerTestBase.check(sequence[4], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)3L, (long)toNotify.getLastCanceledCheckpointId());
        Assert.assertEquals((Object)CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED, (Object)toNotify.getCheckpointFailureReason());
        Object[] expectedUnblockedChannels1 = new Integer[]{0, 1};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels1, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[5], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[6], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[8], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[9], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        toNotify.setNextExpectedCheckpointId(5L);
        CheckpointBarrierAlignerTestBase.check(sequence[10], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[11], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.validateAlignmentTime(startTs, this.inputGate);
        Object[] expectedUnblockedChannels2 = new Integer[]{0, 1, 2};
        Assert.assertArrayEquals((Object[])expectedUnblockedChannels2, (Object[])this.mockInputGate.getAndResetLastUnblockedChannels().toArray());
        CheckpointBarrierAlignerTestBase.check(sequence[12], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[13], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        CheckpointBarrierAlignerTestBase.check(sequence[14], (BufferOrEvent)this.inputGate.pollNext().get(), 512);
        Assert.assertEquals((long)1L, (long)toNotify.getTriggeredCheckpointCounter());
        Assert.assertEquals((long)1L, (long)toNotify.getAbortedCheckpointCounter());
    }

    private static BufferOrEvent createBarrier(long checkpointId, int channel) {
        return new BufferOrEvent((AbstractEvent)new CheckpointBarrier(checkpointId, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), new InputChannelInfo(0, channel));
    }

    private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
        return new BufferOrEvent((AbstractEvent)new CancelCheckpointMarker(checkpointId), new InputChannelInfo(0, channel));
    }

    private static BufferOrEvent createBuffer(int channel) {
        int size = sizeCounter++;
        byte[] bytes = new byte[size];
        RND.nextBytes(bytes);
        MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment((int)512);
        memory.put(0, bytes);
        NetworkBuffer buf = new NetworkBuffer(memory, FreeingBufferRecycler.INSTANCE);
        buf.setSize(size);
        buf.retainBuffer();
        return new BufferOrEvent((Buffer)buf, new InputChannelInfo(0, channel));
    }

    private static BufferOrEvent createEndOfPartition(int channel) {
        return new BufferOrEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, new InputChannelInfo(0, channel));
    }

    private static void check(BufferOrEvent expected, BufferOrEvent present, int pageSize) {
        Assert.assertNotNull((Object)expected);
        Assert.assertNotNull((Object)present);
        Assert.assertEquals((Object)expected.isBuffer(), (Object)present.isBuffer());
        if (expected.isBuffer()) {
            Assert.assertEquals((long)expected.getBuffer().getMaxCapacity(), (long)present.getBuffer().getMaxCapacity());
            Assert.assertEquals((long)expected.getBuffer().getSize(), (long)present.getBuffer().getSize());
            MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
            MemorySegment presentMem = present.getBuffer().getMemorySegment();
            Assert.assertTrue((String)"memory contents differs", (expectedMem.compare(presentMem, 0, 0, pageSize) == 0 ? 1 : 0) != 0);
        } else {
            Assert.assertEquals((Object)expected.getEvent(), (Object)present.getEvent());
        }
    }

    private static void validateAlignmentTime(long alignmentStartTimestamp, CheckpointedInputGate inputGate) {
        long elapsedAlignment = System.nanoTime() - alignmentStartTimestamp;
        long elapsedTotalTime = System.nanoTime() - testStartTimeNanos;
        Assert.assertThat((Object)inputGate.getAlignmentDurationNanos(), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(elapsedAlignment)));
        long tolerance = 1000000L;
        Assert.assertThat((Object)inputGate.getCheckpointStartDelayNanos(), (Matcher)Matchers.lessThanOrEqualTo((Comparable)Long.valueOf(elapsedTotalTime + tolerance)));
    }

    public static class CheckpointExceptionMatcher
    extends BaseMatcher<CheckpointException> {
        private final CheckpointFailureReason failureReason;

        public CheckpointExceptionMatcher(CheckpointFailureReason failureReason) {
            this.failureReason = failureReason;
        }

        public boolean matches(Object o) {
            return o != null && o.getClass() == CheckpointException.class && ((CheckpointException)o).getCheckpointFailureReason().equals((Object)this.failureReason);
        }

        public void describeTo(Description description) {
            description.appendText("CheckpointException - reason = " + this.failureReason);
        }
    }

    private static class ValidatingCheckpointHandler
    extends AbstractInvokable {
        private CheckpointFailureReason failureReason;
        private long lastCanceledCheckpointId = -1L;
        private long nextExpectedCheckpointId = -1L;
        private long triggeredCheckpointCounter = 0L;
        private long abortedCheckpointCounter = 0L;

        public ValidatingCheckpointHandler() {
            super((Environment)new DummyEnvironment("test", 1, 0));
        }

        public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) {
            this.nextExpectedCheckpointId = nextExpectedCheckpointId;
        }

        public CheckpointFailureReason getCheckpointFailureReason() {
            return this.failureReason;
        }

        public long getLastCanceledCheckpointId() {
            return this.lastCanceledCheckpointId;
        }

        public long getTriggeredCheckpointCounter() {
            return this.triggeredCheckpointCounter;
        }

        public long getAbortedCheckpointCounter() {
            return this.abortedCheckpointCounter;
        }

        public long getNextExpectedCheckpointId() {
            return this.nextExpectedCheckpointId;
        }

        public void invoke() {
            throw new UnsupportedOperationException();
        }

        public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
            throw new UnsupportedOperationException("should never be called");
        }

        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
            Assert.assertTrue((String)"wrong checkpoint id", (this.nextExpectedCheckpointId == -1L || this.nextExpectedCheckpointId == checkpointMetaData.getCheckpointId() ? 1 : 0) != 0);
            Assert.assertTrue((checkpointMetaData.getTimestamp() > 0L ? 1 : 0) != 0);
            Assert.assertTrue((checkpointMetrics.getAlignmentDurationNanos() >= 0L ? 1 : 0) != 0);
            ++this.nextExpectedCheckpointId;
            ++this.triggeredCheckpointCounter;
        }

        public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
            this.lastCanceledCheckpointId = checkpointId;
            this.failureReason = ((CheckpointException)cause).getCheckpointFailureReason();
            ++this.abortedCheckpointCounter;
        }

        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            throw new UnsupportedOperationException("should never be called");
        }
    }
}

