package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
import junit.framework.TestCase;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.event.RuntimeEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.util.TestCheckpointedInputGateBuilder;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.class */
public class AlternatingCheckpointsTest {
    private final ClockWithDelayedActions clock = new ClockWithDelayedActions();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest$CallableWithTimestamp.class */
    public static class CallableWithTimestamp {
        private final long timestamp;
        private final Callable<?> callable;

        private CallableWithTimestamp(long j, @Nonnull Callable<?> callable) {
            this.timestamp = j;
            this.callable = callable;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public Callable<?> getCallable() {
            return this.callable;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest$ClockWithDelayedActions.class */
    public static class ClockWithDelayedActions extends Clock implements BiFunction<Callable<?>, Duration, CheckpointBarrierHandler.Cancellable> {
        private final ManualClock clock;
        private final PriorityQueue<CallableWithTimestamp> queue;

        private ClockWithDelayedActions() {
            this.clock = new ManualClock(100000000L);
            this.queue = new PriorityQueue<>(Comparator.comparingLong((v0) -> {
                return v0.getTimestamp();
            }));
        }

        @Override // java.util.function.BiFunction
        public CheckpointBarrierHandler.Cancellable apply(Callable<?> callable, Duration duration) {
            CallableWithTimestamp callableWithTimestamp = new CallableWithTimestamp(this.clock.relativeTimeNanos() + duration.toNanos(), callable);
            this.queue.add(callableWithTimestamp);
            return () -> {
                this.queue.remove(callableWithTimestamp);
            };
        }

        public void advanceTime(long j, TimeUnit timeUnit) throws Exception {
            this.clock.advanceTime(j, timeUnit);
            executeCallables();
        }

        public void advanceTime(Duration duration) throws Exception {
            this.clock.advanceTime(duration);
            executeCallables();
        }

        public ManualClock getClock() {
            return this.clock;
        }

        public void advanceTimeWithoutRunningCallables(long j, TimeUnit timeUnit) {
            this.clock.advanceTime(j, timeUnit);
        }

        public void executeCallables() throws Exception {
            long relativeTimeNanos = this.clock.relativeTimeNanos();
            while (!this.queue.isEmpty() && this.queue.peek().getTimestamp() <= relativeTimeNanos) {
                this.queue.poll().getCallable().call();
            }
        }

        public long absoluteTimeMillis() {
            return this.clock.absoluteTimeMillis();
        }

        public long relativeTimeMillis() {
            return this.clock.relativeTimeMillis();
        }

        public long relativeTimeNanos() {
            return this.clock.relativeTimeNanos();
        }
    }

    private TestBarrierHandlerFactory getTestBarrierHandlerFactory(ValidatingCheckpointHandler validatingCheckpointHandler) {
        return TestBarrierHandlerFactory.forTarget(validatingCheckpointHandler).withActionRegistration(this.clock).withClock(this.clock);
    }

    @Test
    public void testChannelResetOnNewBarrier() throws Exception {
        ChannelStateWriter recordingChannelStateWriter = new RecordingChannelStateWriter();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(new ValidatingCheckpointHandler())).withChannelStateWriter(recordingChannelStateWriter).withRemoteChannels().withMailboxExecutor().build();
        Throwable th = null;
        try {
            try {
                sendBarrier(0L, this.clock.relativeTimeMillis(), CheckpointType.SAVEPOINT, build, 0);
                build.getChannel(0).onBuffer(TestBufferFactory.createBuffer(StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE), 1, 0);
                send(EventSerializer.toBuffer(new CheckpointBarrier(1L, this.clock.relativeTimeMillis(), CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())), true), 1, build);
                Assert.assertFalse(recordingChannelStateWriter.getAddedInput().isEmpty());
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSwitchToUnalignedByUpstream() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).build();
        Throwable th = null;
        try {
            CheckpointBarrier checkpointBarrier = new CheckpointBarrier(1L, this.clock.relativeTimeMillis(), CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 2147483647L));
            send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 0), true), 0, build);
            Assert.assertEquals(0L, validatingCheckpointHandler.triggeredCheckpointCounter);
            send(EventSerializer.toBuffer(checkpointBarrier.asUnaligned(), true), 1, build);
            Assert.assertEquals(1L, validatingCheckpointHandler.triggeredCheckpointCounter);
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCheckpointHandling() throws Exception {
        testBarrierHandling(CheckpointType.CHECKPOINT);
    }

    @Test
    public void testSavepointHandling() throws Exception {
        testBarrierHandling(CheckpointType.SAVEPOINT);
    }

    @Test
    public void testAlternation() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(123, getTestBarrierHandlerFactory(validatingCheckpointHandler)).build();
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                for (long j = 0; j < 123; j++) {
                    arrayList.add(Long.valueOf(j));
                    CheckpointType checkpointType = j % 2 == 0 ? CheckpointType.CHECKPOINT : CheckpointType.SAVEPOINT;
                    for (int i = 0; i < 123; i++) {
                        send(barrier(j, this.clock.relativeTimeMillis(), CheckpointOptions.alignedNoTimeout(checkpointType, CheckpointStorageLocationReference.getDefault())).retainBuffer(), i, build);
                    }
                }
                Assert.assertEquals(arrayList, validatingCheckpointHandler.triggeredCheckpoints);
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testAlignedAfterTimedOut() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(1, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        Throwable th = null;
        try {
            build.getChannel(0).onBuffer(barrier(1L, this.clock.relativeTimeMillis(), CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 100L)).retainBuffer(), 0, 0);
            assertAnnouncement(build);
            this.clock.advanceTime(100 + 1, TimeUnit.MILLISECONDS);
            assertBarrier(build);
            Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
            build.getChannel(0).onBuffer(barrier(2L, this.clock.relativeTimeMillis(), CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 100L)).retainBuffer(), 1, 0);
            assertAnnouncement(build);
            assertBarrier(build);
            Assert.assertEquals(2L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
            MatcherAssert.assertThat(validatingCheckpointHandler.getTriggeredCheckpointOptions(), Matchers.contains(new CheckpointOptions[]{CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault()), CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 100L)}));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAlignedNeverTimeoutableCheckpoint() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).build();
        Throwable th = null;
        try {
            try {
                Buffer withTimeout = withTimeout(2147483647L);
                send(withTimeout, 0, build);
                sendData(1000, 1, build);
                Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                send(withTimeout, 1, build);
                Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTimeoutAlignment() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        Throwable th = null;
        try {
            try {
                testTimeoutBarrierOnTwoChannels(validatingCheckpointHandler, build, 10L);
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTimeoutAlignmentAfterProcessingBarrier() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(3, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        Throwable th = null;
        try {
            try {
                send(barrier(1L, this.clock.relativeTimeMillis(), CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 2147483647L)), 2, build);
                Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                testTimeoutBarrierOnTwoChannels(validatingCheckpointHandler, build, 2147483647L);
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    private void testTimeoutBarrierOnTwoChannels(ValidatingCheckpointHandler validatingCheckpointHandler, CheckpointedInputGate checkpointedInputGate, long j) throws Exception {
        Buffer withTimeout = withTimeout(j);
        getChannel(checkpointedInputGate, 0).onBuffer(dataBuffer(), 0, 0);
        getChannel(checkpointedInputGate, 0).onBuffer(dataBuffer(), 1, 0);
        getChannel(checkpointedInputGate, 0).onBuffer(withTimeout.retainBuffer(), 2, 0);
        getChannel(checkpointedInputGate, 1).onBuffer(dataBuffer(), 0, 0);
        getChannel(checkpointedInputGate, 1).onBuffer(withTimeout.retainBuffer(), 1, 0);
        Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        assertAnnouncement(checkpointedInputGate);
        this.clock.advanceTime(j * 2, TimeUnit.MILLISECONDS);
        assertAnnouncement(checkpointedInputGate);
        assertBarrier(checkpointedInputGate);
        assertBarrier(checkpointedInputGate);
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        MatcherAssert.assertThat(validatingCheckpointHandler.getTriggeredCheckpointOptions(), Matchers.contains(new CheckpointOptions[]{CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())}));
        assertData(checkpointedInputGate);
        assertData(checkpointedInputGate);
        assertData(checkpointedInputGate);
    }

    private Buffer dataBuffer() {
        return TestBufferFactory.createBuffer(100).retainBuffer();
    }

    @Test
    public void testTimeoutAlignmentOnFirstBarrier() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        Buffer withTimeout = withTimeout(100L);
        for (int i = 0; i < 2; i++) {
            getChannel(build, i).onBuffer(withTimeout.retainBuffer(), 0, 0);
        }
        Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        for (int i2 = 0; i2 < 2; i2++) {
            assertAnnouncement(build);
        }
        Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        this.clock.advanceTime(100 * 4, TimeUnit.MILLISECONDS);
        assertBarrier(build);
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
    }

    @Test
    public void testTimeoutAlignmentOnAnnouncementForSecondCheckpoint() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        performFirstCheckpoint(2, validatingCheckpointHandler, build, 100L);
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        Buffer withTimeout = withTimeout(2, 100L);
        for (int i = 0; i < 2; i++) {
            getChannel(build, i).onBuffer(dataBuffer(), 1, 0);
            getChannel(build, i).onBuffer(withTimeout.retainBuffer(), 2, 0);
        }
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        for (int i2 = 0; i2 < 2; i2++) {
            assertAnnouncement(build);
        }
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        this.clock.advanceTime(100 * 4, TimeUnit.MILLISECONDS);
        assertBarrier(build);
        Assert.assertEquals(2L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
    }

    private void performFirstCheckpoint(int i, ValidatingCheckpointHandler validatingCheckpointHandler, CheckpointedInputGate checkpointedInputGate, long j) throws IOException, InterruptedException {
        Buffer withTimeout = withTimeout(1, j);
        for (int i2 = 0; i2 < i; i2++) {
            getChannel(checkpointedInputGate, i2).onBuffer(withTimeout.retainBuffer(), 0, 0);
        }
        Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        for (int i3 = 0; i3 < i; i3++) {
            assertAnnouncement(checkpointedInputGate);
        }
        for (int i4 = 0; i4 < i; i4++) {
            assertBarrier(checkpointedInputGate);
        }
    }

    @Test
    public void testPassiveTimeoutAlignmentOnAnnouncement() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        Buffer withTimeout = withTimeout(100L);
        getChannel(build, 0).onBuffer(withTimeout.retainBuffer(), 0, 0);
        Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
        assertAnnouncement(build);
        assertBarrier(build);
        this.clock.advanceTimeWithoutRunningCallables(100 * 4, TimeUnit.MILLISECONDS);
        getChannel(build, 1).onBuffer(withTimeout.retainBuffer(), 0, 0);
        assertAnnouncement(build);
        assertBarrier(build);
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
    }

    @Test
    public void testActiveTimeoutAlignmentOnFirstBarrier() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        send(withTimeout(100L), 0, new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withSyncExecutor().build());
        this.clock.advanceTime(100 + 1, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(validatingCheckpointHandler.getTriggeredCheckpointOptions(), Matchers.contains(new CheckpointOptions[]{CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())}));
    }

    @Test
    public void testAllChannelsUnblockedAfterAlignmentTimeout() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withTestChannels().withSyncExecutor().build();
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(1L, this.clock.relativeTimeMillis(), CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 100L));
        Buffer buffer = EventSerializer.toBuffer(checkpointBarrier, false);
        send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 0), true), 0, build);
        build.getChannel(0).setBlocked(true);
        send(buffer, 0, build);
        this.clock.advanceTime(100 + 1, TimeUnit.MILLISECONDS);
        send(EventSerializer.toBuffer(new EventAnnouncement(checkpointBarrier, 0), true), 1, build);
        build.getChannel(1).setBlocked(true);
        send(buffer, 1, build);
        MatcherAssert.assertThat(Integer.valueOf(validatingCheckpointHandler.getTriggeredCheckpointOptions().size()), Matchers.equalTo(1));
        MatcherAssert.assertThat(validatingCheckpointHandler.getTriggeredCheckpointOptions(), Matchers.contains(new CheckpointOptions[]{CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())}));
        Assert.assertFalse(build.getChannel(0).isBlocked());
        Assert.assertFalse(build.getChannel(1).isBlocked());
    }

    @Test
    public void testNoActiveTimeoutAlignmentAfterLastBarrier() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withTestChannels().withSyncExecutor().build();
        Buffer withTimeout = withTimeout(100L);
        send(withTimeout, 0, build);
        send(withTimeout, 1, build);
        this.clock.advanceTime(100 + 1, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(validatingCheckpointHandler.getTriggeredCheckpointOptions(), CoreMatchers.not(Matchers.contains(new CheckpointOptions[]{CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())})));
    }

    @Test
    public void testNoActiveTimeoutAlignmentAfterAbort() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withTestChannels().withSyncExecutor().build();
        send(withTimeout(100L), 0, build);
        send(EventSerializer.toBuffer(new CancelCheckpointMarker(1L), true), 0, build);
        send(EventSerializer.toBuffer(new CancelCheckpointMarker(1L), true), 1, build);
        this.clock.advanceTime(100 + 1, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(Integer.valueOf(validatingCheckpointHandler.getTriggeredCheckpointOptions().size()), Matchers.equalTo(0));
    }

    @Test
    public void testNoActiveTimeoutAlignmentAfterClose() throws Exception {
        ClockWithDelayedActions clockWithDelayedActions = new ClockWithDelayedActions() { // from class: org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCheckpointsTest.1
            @Override // org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCheckpointsTest.ClockWithDelayedActions, java.util.function.BiFunction
            public CheckpointBarrierHandler.Cancellable apply(Callable<?> callable, Duration duration) {
                super.apply(callable, duration);
                return () -> {
                };
            }
        };
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, TestBarrierHandlerFactory.forTarget(validatingCheckpointHandler).withActionRegistration(clockWithDelayedActions).withClock(clockWithDelayedActions)).withRemoteChannels().withSyncExecutor().build();
        send(withTimeout(100L), 0, build);
        build.close();
        clockWithDelayedActions.advanceTime(100 + 1, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat(Integer.valueOf(validatingCheckpointHandler.getTriggeredCheckpointOptions().size()), Matchers.equalTo(0));
    }

    @Test
    public void testActiveTimeoutAlignmentOnAnnouncement() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        Throwable th = null;
        try {
            try {
                Buffer withTimeout = withTimeout(10L);
                getChannel(build, 0).onBuffer(dataBuffer(), 0, 0);
                getChannel(build, 0).onBuffer(dataBuffer(), 1, 0);
                getChannel(build, 0).onBuffer(withTimeout.retainBuffer(), 2, 0);
                getChannel(build, 1).onBuffer(dataBuffer(), 0, 0);
                getChannel(build, 1).onBuffer(withTimeout.retainBuffer(), 1, 0);
                Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                assertAnnouncement(build);
                assertAnnouncement(build);
                this.clock.advanceTime(10 + 1, TimeUnit.MILLISECONDS);
                assertBarrier(build);
                assertBarrier(build);
                Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                MatcherAssert.assertThat(validatingCheckpointHandler.getTriggeredCheckpointOptions(), Matchers.contains(new CheckpointOptions[]{CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())}));
                assertData(build);
                assertData(build);
                assertData(build);
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testActiveTimeoutAfterAnnouncementPassiveTimeout() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        Throwable th = null;
        try {
            try {
                Buffer withTimeout = withTimeout(10L);
                getChannel(build, 0).onBuffer(dataBuffer(), 0, 0);
                getChannel(build, 0).onBuffer(dataBuffer(), 1, 0);
                getChannel(build, 0).onBuffer(withTimeout.retainBuffer(), 2, 0);
                getChannel(build, 1).onBuffer(dataBuffer(), 0, 0);
                getChannel(build, 1).onBuffer(withTimeout.retainBuffer(), 1, 0);
                Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                assertAnnouncement(build);
                this.clock.advanceTimeWithoutRunningCallables(10 + 1, TimeUnit.MILLISECONDS);
                assertAnnouncement(build);
                this.clock.executeCallables();
                assertBarrier(build);
                assertBarrier(build);
                Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                MatcherAssert.assertThat(validatingCheckpointHandler.getTriggeredCheckpointOptions(), Matchers.contains(new CheckpointOptions[]{CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())}));
                assertData(build);
                assertData(build);
                assertData(build);
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testActiveTimeoutAfterBarrierPassiveTimeout() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withRemoteChannels().withMailboxExecutor().build();
        Throwable th = null;
        try {
            try {
                Buffer withTimeout = withTimeout(10L);
                getChannel(build, 0).onBuffer(dataBuffer(), 0, 0);
                getChannel(build, 0).onBuffer(dataBuffer(), 1, 0);
                getChannel(build, 0).onBuffer(withTimeout.retainBuffer(), 2, 0);
                Assert.assertEquals(0L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                assertAnnouncement(build);
                assertData(build);
                assertData(build);
                this.clock.advanceTimeWithoutRunningCallables(10 + 1, TimeUnit.MILLISECONDS);
                assertBarrier(build);
                this.clock.executeCallables();
                getChannel(build, 1).onBuffer(dataBuffer(), 0, 0);
                getChannel(build, 1).onBuffer(withTimeout.retainBuffer(), 1, 0);
                assertAnnouncement(build);
                assertBarrier(build);
                Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
                MatcherAssert.assertThat(validatingCheckpointHandler.getTriggeredCheckpointOptions(), Matchers.contains(new CheckpointOptions[]{CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())}));
                assertData(build);
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTimeoutAlignmentOnUnalignedCheckpoint() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(3, getTestBarrierHandlerFactory(validatingCheckpointHandler)).withChannelStateWriter(new RecordingChannelStateWriter()).withRemoteChannels().withMailboxExecutor().build();
        getChannel(build, 0).onBuffer(withTimeout(2147483647L).retainBuffer(), 0, 0);
        assertAnnouncement(build);
        assertBarrier(build);
        getChannel(build, 1).onBuffer(dataBuffer(), 0, 0);
        getChannel(build, 1).onBuffer(dataBuffer(), 1, 0);
        getChannel(build, 1).onBuffer(EventSerializer.toBuffer(new CheckpointBarrier(1L, this.clock.relativeTimeMillis(), CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())), true).retainBuffer(), 2, 0);
        assertBarrier(build);
        Assert.assertEquals(2L, r0.getAddedInput().get(getChannel(build, 1).getChannelInfo()).size());
        Assert.assertEquals(1L, validatingCheckpointHandler.getTriggeredCheckpointCounter());
    }

    private RemoteInputChannel getChannel(CheckpointedInputGate checkpointedInputGate, int i) {
        return checkpointedInputGate.getChannel(i);
    }

    @Test
    public void testMetricsAlternation() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(2, getTestBarrierHandlerFactory(validatingCheckpointHandler)).build();
        long relativeTimeNanos = this.clock.relativeTimeNanos();
        long relativeTimeMillis = this.clock.relativeTimeMillis() - 10;
        sendBarrier(1L, relativeTimeMillis, CheckpointType.CHECKPOINT, build, 0);
        sendData(1000, 0, build);
        sendData(1000, 1, build);
        this.clock.advanceTime(6L, TimeUnit.MILLISECONDS);
        sendBarrier(1L, relativeTimeMillis, CheckpointType.CHECKPOINT, build, 1);
        sendData(1000, 0, build);
        assertMetrics(validatingCheckpointHandler, build.getCheckpointBarrierHandler(), 1L, relativeTimeNanos, 6000000L, 10000000L, 1000 * 2);
        long relativeTimeNanos2 = this.clock.relativeTimeNanos();
        long relativeTimeMillis2 = this.clock.relativeTimeMillis() - 5;
        sendBarrier(2L, relativeTimeMillis2, CheckpointType.SAVEPOINT, build, 0);
        sendData(1000, 1, build);
        assertMetrics(validatingCheckpointHandler, build.getCheckpointBarrierHandler(), 2L, relativeTimeNanos2, 0L, 5000000L, 1000 * 2);
        this.clock.advanceTime(5L, TimeUnit.MILLISECONDS);
        sendBarrier(2L, relativeTimeMillis2, CheckpointType.SAVEPOINT, build, 1);
        sendData(1000, 0, build);
        assertMetrics(validatingCheckpointHandler, build.getCheckpointBarrierHandler(), 2L, relativeTimeNanos2, 5000000L, 5000000L, 1000);
        long relativeTimeNanos3 = this.clock.relativeTimeNanos();
        send(barrier(3L, this.clock.relativeTimeMillis() - 7, CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())), 0, build);
        sendData(1000, 0, build);
        sendData(1000, 1, build);
        assertMetrics(validatingCheckpointHandler, build.getCheckpointBarrierHandler(), 3L, relativeTimeNanos3, 0L, 7000000L, -1L);
        this.clock.advanceTime(10L, TimeUnit.MILLISECONDS);
        send(barrier(3L, relativeTimeMillis2, CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())), 1, build);
        assertMetrics(validatingCheckpointHandler, build.getCheckpointBarrierHandler(), 3L, relativeTimeNanos3, 10000000L, 7000000L, 1000 * 2);
    }

    @Test
    public void testMetricsSingleChannel() throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        CheckpointedInputGate build = new TestCheckpointedInputGateBuilder(1, getTestBarrierHandlerFactory(validatingCheckpointHandler)).build();
        long relativeTimeMillis = this.clock.relativeTimeMillis() - 10;
        long relativeTimeNanos = this.clock.relativeTimeNanos();
        sendData(1000, 0, build);
        sendBarrier(1L, relativeTimeMillis, CheckpointType.CHECKPOINT, build, 0);
        sendData(1000, 0, build);
        this.clock.advanceTime(6L, TimeUnit.MILLISECONDS);
        assertMetrics(validatingCheckpointHandler, build.getCheckpointBarrierHandler(), 1L, relativeTimeNanos, 0L, 10000000L, 0L);
        long relativeTimeMillis2 = this.clock.relativeTimeMillis() - 5;
        long relativeTimeNanos2 = this.clock.relativeTimeNanos();
        sendData(1000, 0, build);
        sendBarrier(2L, relativeTimeMillis2, CheckpointType.SAVEPOINT, build, 0);
        sendData(1000, 0, build);
        this.clock.advanceTime(5L, TimeUnit.MILLISECONDS);
        assertMetrics(validatingCheckpointHandler, build.getCheckpointBarrierHandler(), 2L, relativeTimeNanos2, 0L, 5000000L, 0L);
    }

    private void assertMetrics(ValidatingCheckpointHandler validatingCheckpointHandler, CheckpointBarrierHandler checkpointBarrierHandler, long j, long j2, long j3, long j4, long j5) {
        MatcherAssert.assertThat(Long.valueOf(checkpointBarrierHandler.getLatestCheckpointId()), Matchers.equalTo(Long.valueOf(j)));
        long alignmentDurationNanos = checkpointBarrierHandler.getAlignmentDurationNanos();
        long relativeTimeNanos = this.clock.relativeTimeNanos() - j2;
        MatcherAssert.assertThat(Long.valueOf(alignmentDurationNanos), Matchers.greaterThanOrEqualTo(Long.valueOf(j3)));
        MatcherAssert.assertThat(Long.valueOf(alignmentDurationNanos), Matchers.lessThanOrEqualTo(Long.valueOf(relativeTimeNanos)));
        MatcherAssert.assertThat(Long.valueOf(checkpointBarrierHandler.getCheckpointStartDelayNanos()), Matchers.greaterThanOrEqualTo(Long.valueOf(j4)));
        MatcherAssert.assertThat(FutureUtils.getOrDefault(validatingCheckpointHandler.getLastBytesProcessedDuringAlignment(), -1L), Matchers.equalTo(Long.valueOf(j5)));
    }

    @Test
    public void testPreviousHandlerReset() throws Exception {
        SingleInputGate build = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        TestInputChannel[] testInputChannelArr = {new TestInputChannel(build, 0), new TestInputChannel(build, 1)};
        build.setInputChannels(testInputChannelArr);
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        SingleCheckpointBarrierHandler create = getTestBarrierHandlerFactory(validatingCheckpointHandler).create(build);
        for (int i = 0; i < 4; i++) {
            int i2 = i % 2;
            CheckpointType checkpointType = i2 == 0 ? CheckpointType.SAVEPOINT : CheckpointType.CHECKPOINT;
            validatingCheckpointHandler.setNextExpectedCheckpointId(-1L);
            if (checkpointType.isSavepoint()) {
                testInputChannelArr[i2].setBlocked(true);
            }
            create.processBarrier(new CheckpointBarrier(i, this.clock.relativeTimeMillis(), new CheckpointOptions(checkpointType, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, i2));
            if (checkpointType.isSavepoint()) {
                TestCase.assertTrue(testInputChannelArr[i2].isBlocked());
                Assert.assertFalse(testInputChannelArr[(i2 + 1) % 2].isBlocked());
            } else {
                Assert.assertFalse(testInputChannelArr[0].isBlocked());
                Assert.assertFalse(testInputChannelArr[1].isBlocked());
            }
            TestCase.assertTrue(create.isCheckpointPending());
            Assert.assertFalse(create.getAllBarriersReceivedFuture(i).isDone());
            testInputChannelArr[0].setBlocked(false);
            testInputChannelArr[1].setBlocked(false);
        }
    }

    @Test
    public void testHasInflightDataBeforeProcessBarrier() throws Exception {
        SingleInputGate build = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        build.setInputChannels(new InputChannel[]{new TestInputChannel(build, 0), new TestInputChannel(build, 1)});
        SingleCheckpointBarrierHandler create = getTestBarrierHandlerFactory(new ValidatingCheckpointHandler()).create(build);
        create.processBarrier(new CheckpointBarrier(1L, this.clock.relativeTimeMillis(), new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 0));
        Assert.assertFalse(create.getAllBarriersReceivedFuture(1L).isDone());
    }

    @Test
    public void testOutOfOrderBarrier() throws Exception {
        SingleInputGate build = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        InputChannel testInputChannel = new TestInputChannel(build, 0);
        InputChannel testInputChannel2 = new TestInputChannel(build, 1);
        build.setInputChannels(new InputChannel[]{testInputChannel, testInputChannel2});
        SingleCheckpointBarrierHandler create = getTestBarrierHandlerFactory(new ValidatingCheckpointHandler()).create(build);
        create.processBarrier(new CheckpointBarrier(10L, this.clock.relativeTimeMillis(), new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 0));
        testInputChannel2.setBlocked(true);
        create.processBarrier(new CheckpointBarrier(5L, this.clock.relativeTimeMillis(), new CheckpointOptions(CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault())), new InputChannelInfo(0, 1));
        Assert.assertEquals(10L, create.getLatestCheckpointId());
        Assert.assertFalse(testInputChannel2.isBlocked());
    }

    private void testBarrierHandling(CheckpointType checkpointType) throws Exception {
        ValidatingCheckpointHandler validatingCheckpointHandler = new ValidatingCheckpointHandler();
        SingleInputGate build = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        InputChannel testInputChannel = new TestInputChannel(build, 0, false, true);
        InputChannel testInputChannel2 = new TestInputChannel(build, 1, false, true);
        build.setInputChannels(new InputChannel[]{testInputChannel, testInputChannel2});
        CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(build, getTestBarrierHandlerFactory(validatingCheckpointHandler).create(build), new SyncMailboxExecutor());
        if (checkpointType.isSavepoint()) {
            testInputChannel.setBlocked(true);
            testInputChannel2.setBlocked(true);
        }
        Buffer barrier = barrier(123L, 1L, checkpointType.isSavepoint() ? CheckpointOptions.alignedNoTimeout(checkpointType, CheckpointStorageLocationReference.getDefault()) : CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault()));
        send(barrier.retainBuffer(), testInputChannel, checkpointedInputGate);
        Assert.assertEquals(Boolean.valueOf(checkpointType.isSavepoint()), Boolean.valueOf(validatingCheckpointHandler.triggeredCheckpoints.isEmpty()));
        send(barrier.retainBuffer(), testInputChannel2, checkpointedInputGate);
        Assert.assertEquals(Collections.singletonList(123L), validatingCheckpointHandler.triggeredCheckpoints);
        if (checkpointType.isSavepoint()) {
            for (TestInputChannel testInputChannel3 : build.getInputChannels().values()) {
                Assert.assertFalse(String.format("channel %d should be resumed", Integer.valueOf(testInputChannel3.getChannelIndex())), testInputChannel3.isBlocked());
            }
        }
    }

    private void sendBarrier(long j, long j2, CheckpointType checkpointType, CheckpointedInputGate checkpointedInputGate, int i) throws Exception {
        send(barrier(j, j2, CheckpointOptions.alignedNoTimeout(checkpointType, CheckpointStorageLocationReference.getDefault())), i, checkpointedInputGate);
    }

    private void sendData(int i, int i2, CheckpointedInputGate checkpointedInputGate) throws Exception {
        send(TestBufferFactory.createBuffer(i), i2, checkpointedInputGate);
    }

    private void send(Buffer buffer, int i, CheckpointedInputGate checkpointedInputGate) throws Exception {
        send(buffer.retainBuffer(), checkpointedInputGate.getChannel(i), checkpointedInputGate);
    }

    private void send(Buffer buffer, InputChannel inputChannel, CheckpointedInputGate checkpointedInputGate) throws IOException, InterruptedException {
        if (inputChannel instanceof TestInputChannel) {
            ((TestInputChannel) inputChannel).read(buffer, buffer.getDataType());
        } else {
            if (!(inputChannel instanceof RemoteInputChannel)) {
                throw new IllegalArgumentException("Unknown channel type: " + inputChannel);
            }
            ((RemoteInputChannel) inputChannel).onBuffer(buffer, 0, 0);
        }
        do {
        } while (checkpointedInputGate.pollNext().isPresent());
    }

    private Buffer withTimeout(long j) throws IOException {
        return withTimeout(1, j);
    }

    private Buffer withTimeout(int i, long j) throws IOException {
        return barrier(i, this.clock.relativeTimeMillis(), CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), j));
    }

    private Buffer barrier(long j, long j2, CheckpointOptions checkpointOptions) throws IOException {
        CheckpointBarrier checkpointBarrier = new CheckpointBarrier(j, j2, checkpointOptions);
        return EventSerializer.toBuffer(checkpointBarrier, checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint());
    }

    private static void assertAnnouncement(CheckpointedInputGate checkpointedInputGate) throws IOException, InterruptedException {
        assertEvent(checkpointedInputGate, EventAnnouncement.class);
    }

    private static void assertBarrier(CheckpointedInputGate checkpointedInputGate) throws IOException, InterruptedException {
        assertEvent(checkpointedInputGate, CheckpointBarrier.class);
    }

    private static <T extends RuntimeEvent> void assertEvent(CheckpointedInputGate checkpointedInputGate, Class<T> cls) throws IOException, InterruptedException {
        Optional<BufferOrEvent> assertPoll = assertPoll(checkpointedInputGate);
        TestCase.assertTrue("expected event, got data buffer on " + assertPoll.get().getChannelInfo(), assertPoll.get().isEvent());
        Assert.assertEquals(cls, assertPoll.get().getEvent().getClass());
    }

    private static <T extends RuntimeEvent> void assertData(CheckpointedInputGate checkpointedInputGate) throws IOException, InterruptedException {
        Optional<BufferOrEvent> assertPoll = assertPoll(checkpointedInputGate);
        TestCase.assertTrue("expected data, got " + assertPoll.get().getEvent() + "  on " + assertPoll.get().getChannelInfo(), assertPoll.get().isBuffer());
    }

    private static Optional<BufferOrEvent> assertPoll(CheckpointedInputGate checkpointedInputGate) throws IOException, InterruptedException {
        Optional<BufferOrEvent> pollNext = checkpointedInputGate.pollNext();
        TestCase.assertTrue("empty gate", pollNext.isPresent());
        return pollNext;
    }
}
