package org.apache.flink.streaming.runtime.io;

import java.util.Arrays;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/BarrierTrackerTest.class */
public class BarrierTrackerTest {
    private static final int PAGE_SIZE = 512;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/BarrierTrackerTest$CheckpointSequenceValidator.class */
    /**
     * Checkpoint event handler that checks callbacks against a fixed, ordered list of expectations.
     *
     * <p>Each expected entry is a checkpoint ID. A non-negative entry expects the next callback to be
     * {@code triggerCheckpointOnBarrier} for that ID; a negative entry {@code -id} expects the next
     * callback to be {@code abortCheckpointOnBarrier} for checkpoint {@code id}.
     *
     * <p>The validator only fails on unexpected or surplus callbacks; it does not itself check that
     * every expected entry was eventually consumed.
     */
    private static class CheckpointSequenceValidator implements StatefulTask {
        /** Expected callbacks in order; negative values denote expected aborts. */
        private final long[] checkpointIDs;

        /** Index into {@link #checkpointIDs} of the next expected callback. */
        private int nextIndex;

        private CheckpointSequenceValidator(long... checkpointIDs) {
            this.nextIndex = 0;
            this.checkpointIDs = checkpointIDs;
        }

        /** Not part of the scenario under test; always throws. */
        public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {
            throw new UnsupportedOperationException("should never be called");
        }

        /** Not part of the scenario under test; always throws. */
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
            throw new UnsupportedOperationException("should never be called");
        }

        /**
         * Asserts that the next expected entry is a trigger for the given checkpoint and that the
         * checkpoint carries a positive timestamp.
         */
        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
            final long expected = nextExpected();
            if (expected < 0) {
                Assert.fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'");
            } else {
                Assert.assertEquals("wrong checkpoint id", expected, checkpointMetaData.getCheckpointId());
                Assert.assertTrue("checkpoint timestamp must be positive", checkpointMetaData.getTimestamp() > 0);
            }
        }

        /**
         * Asserts that the next expected entry is an abort (negative value) for the given checkpoint.
         *
         * @param checkpointId ID of the checkpoint being aborted
         * @param cause reason for the abort; not inspected by this validator
         */
        public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
            final long expected = nextExpected();
            if (expected < 0) {
                // Expected aborts are stored negated, so flip the sign before comparing.
                Assert.assertEquals("wrong checkpoint id for checkpoint abort", -expected, checkpointId);
            } else {
                Assert.fail("got 'abortCheckpointOnBarrier()' when expecting a 'triggerCheckpointOnBarrier()'");
            }
        }

        /** Not part of the scenario under test; always throws. */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            throw new UnsupportedOperationException("should never be called");
        }

        /** Consumes and returns the next expected entry, failing if the expectations are exhausted. */
        private long nextExpected() {
            Assert.assertTrue("More checkpoints than expected", this.nextIndex < this.checkpointIDs.length);
            return this.checkpointIDs[this.nextIndex++];
        }
    }

    /**
     * Feeds three buffers on a single channel, with no events, through the tracker.
     *
     * <p>The test expects every buffer to be returned unchanged and in order, followed by
     * {@code null} on each further call once the input is exhausted.
     *
     * @throws Exception any unexpected failure; propagated so the test report keeps the full stack trace
     */
    @Test
    public void testSingleChannelNoBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(0),
            createBuffer(0),
            createBuffer(0)
        };

        final BarrierTracker tracker =
                new BarrierTracker(new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)));

        for (BufferOrEvent expected : sequence) {
            Assert.assertEquals(expected, tracker.getNextNonBlocked());
        }

        // Asked twice to check that the exhausted state is stable.
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
    }

    /**
     * Feeds nine buffers spread over four channels, with no events, through the tracker.
     *
     * <p>The test expects every buffer to be returned unchanged and in the order it was enqueued,
     * followed by {@code null} on each further call once the input is exhausted.
     *
     * @throws Exception any unexpected failure; propagated so the test report keeps the full stack trace
     */
    @Test
    public void testMultiChannelNoBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(2),
            createBuffer(2),
            createBuffer(0),
            createBuffer(1),
            createBuffer(0),
            createBuffer(3),
            createBuffer(1),
            createBuffer(1),
            createBuffer(2)
        };

        final BarrierTracker tracker =
                new BarrierTracker(new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence)));

        for (BufferOrEvent expected : sequence) {
            Assert.assertEquals(expected, tracker.getNextNonBlocked());
        }

        // Asked twice to check that the exhausted state is stable.
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
    }

    /**
     * Feeds buffers interleaved with barriers for checkpoints 1 through 6 on a single channel.
     *
     * <p>The registered validator expects a trigger for each of the checkpoints 1..6, in that order.
     * The loop only compares non-barrier elements against the tracker output, i.e. the test expects
     * checkpoint barriers not to be returned by {@code getNextNonBlocked()}.
     *
     * @throws Exception any unexpected failure; propagated so the test report keeps the full stack trace
     */
    @Test
    public void testSingleChannelWithBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(0), createBuffer(0), createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
            createBarrier(2L, 0), createBarrier(3L, 0),
            createBuffer(0), createBuffer(0),
            createBarrier(4L, 0), createBarrier(5L, 0), createBarrier(6L, 0),
            createBuffer(0)
        };

        final BarrierTracker tracker =
                new BarrierTracker(new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)));
        tracker.registerCheckpointEventHandler(new CheckpointSequenceValidator(1L, 2L, 3L, 4L, 5L, 6L));

        for (BufferOrEvent expected : sequence) {
            // Skip checkpoint barriers: the test expects them not to come out of the tracker.
            if (expected.isBuffer() || expected.getEvent().getClass() != CheckpointBarrier.class) {
                Assert.assertEquals(expected, tracker.getNextNonBlocked());
            }
        }

        // Asked twice to check that the exhausted state is stable.
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
    }

    /**
     * Feeds buffers interleaved with barriers whose checkpoint IDs have gaps (1, 3, 4, 6, 7, 10)
     * on a single channel.
     *
     * <p>The registered validator expects a trigger for exactly those IDs, in that order. The loop
     * only compares non-barrier elements against the tracker output, i.e. the test expects
     * checkpoint barriers not to be returned by {@code getNextNonBlocked()}.
     *
     * @throws Exception any unexpected failure; propagated so the test report keeps the full stack trace
     */
    @Test
    public void testSingleChannelWithSkippedBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0), createBuffer(0),
            createBarrier(3L, 0),
            createBuffer(0),
            createBarrier(4L, 0), createBarrier(6L, 0),
            createBuffer(0),
            createBarrier(7L, 0),
            createBuffer(0),
            createBarrier(10L, 0),
            createBuffer(0)
        };

        final BarrierTracker tracker =
                new BarrierTracker(new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)));
        tracker.registerCheckpointEventHandler(new CheckpointSequenceValidator(1L, 3L, 4L, 6L, 7L, 10L));

        for (BufferOrEvent expected : sequence) {
            // Skip checkpoint barriers: the test expects them not to come out of the tracker.
            if (expected.isBuffer() || expected.getEvent().getClass() != CheckpointBarrier.class) {
                Assert.assertEquals(expected, tracker.getNextNonBlocked());
            }
        }

        // Asked twice to check that the exhausted state is stable.
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
    }

    /**
     * Feeds buffers and barriers over three channels where each of the checkpoints 1 through 4
     * receives a barrier on every channel.
     *
     * <p>The registered validator expects a trigger for checkpoints 1, 2, 3 and 4, in that order.
     * The loop only compares non-barrier elements against the tracker output, i.e. the test expects
     * checkpoint barriers not to be returned by {@code getNextNonBlocked()}.
     *
     * @throws Exception any unexpected failure; propagated so the test report keeps the full stack trace
     */
    @Test
    public void testMultiChannelWithBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(0), createBuffer(2), createBuffer(0),
            createBarrier(1L, 1), createBarrier(1L, 2),
            createBuffer(2), createBuffer(1),
            createBarrier(1L, 0),

            createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
            createBarrier(2L, 0), createBarrier(2L, 1), createBarrier(2L, 2),

            createBuffer(2), createBuffer(2),
            createBarrier(3L, 2),
            createBuffer(2), createBuffer(2),
            createBarrier(3L, 0), createBarrier(3L, 1),

            createBarrier(4L, 1), createBarrier(4L, 2), createBarrier(4L, 0),

            createBuffer(0)
        };

        final BarrierTracker tracker =
                new BarrierTracker(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)));
        tracker.registerCheckpointEventHandler(new CheckpointSequenceValidator(1L, 2L, 3L, 4L));

        for (BufferOrEvent expected : sequence) {
            // Skip checkpoint barriers: the test expects them not to come out of the tracker.
            if (expected.isBuffer() || expected.getEvent().getClass() != CheckpointBarrier.class) {
                Assert.assertEquals(expected, tracker.getNextNonBlocked());
            }
        }

        // Asked twice to check that the exhausted state is stable.
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
    }

    /**
     * Feeds buffers and barriers over three channels where checkpoint 3 receives a barrier on
     * channel 2 only, while checkpoints 1, 2 and 4 receive barriers on all three channels.
     *
     * <p>The registered validator expects triggers for checkpoints 1, 2 and 4 only, in that order;
     * a trigger for the incomplete checkpoint 3 would fail the test. The loop only compares
     * non-barrier elements against the tracker output, i.e. the test expects checkpoint barriers
     * not to be returned by {@code getNextNonBlocked()}.
     *
     * @throws Exception any unexpected failure; propagated so the test report keeps the full stack trace
     */
    @Test
    public void testMultiChannelSkippingCheckpoints() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(0), createBuffer(2), createBuffer(0),
            createBarrier(1L, 1), createBarrier(1L, 2),
            createBuffer(2), createBuffer(1),
            createBarrier(1L, 0),

            createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
            createBarrier(2L, 0), createBarrier(2L, 1), createBarrier(2L, 2),

            createBuffer(2), createBuffer(2),
            // Checkpoint 3 only ever shows up on channel 2.
            createBarrier(3L, 2),
            createBuffer(2), createBuffer(2),

            createBarrier(4L, 0),
            createBuffer(0), createBuffer(1), createBuffer(2),
            createBarrier(4L, 1),
            createBuffer(1),
            createBarrier(4L, 2),

            createBuffer(0)
        };

        final BarrierTracker tracker =
                new BarrierTracker(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)));
        tracker.registerCheckpointEventHandler(new CheckpointSequenceValidator(1L, 2L, 4L));

        for (BufferOrEvent expected : sequence) {
            // Skip checkpoint barriers: the test expects them not to come out of the tracker.
            if (expected.isBuffer() || expected.getEvent().getClass() != CheckpointBarrier.class) {
                Assert.assertEquals(expected, tracker.getNextNonBlocked());
            }
        }

        // Asked twice to check that the exhausted state is stable.
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
    }

    /**
     * Feeds buffers and barriers over three channels where one channel's barrier for a checkpoint
     * arrives after the other channels have already delivered barriers for later checkpoints.
     *
     * <p>In this sequence checkpoint 6 never receives a barrier on channel 2 and checkpoint 10 only
     * receives one on channel 2. The registered validator expects triggers for checkpoints
     * 2, 3, 4, 5, 7, 8 and 9, in that order. The loop only compares non-barrier elements against the
     * tracker output, i.e. the test expects checkpoint barriers not to be returned by
     * {@code getNextNonBlocked()}.
     *
     * @throws Exception any unexpected failure; propagated so the test report keeps the full stack trace
     */
    @Test
    public void testCompleteCheckpointsOnLateBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            // Checkpoint 2: barriers arrive back to back on all channels.
            createBuffer(1), createBuffer(1), createBuffer(0), createBuffer(2),
            createBarrier(2L, 1), createBarrier(2L, 0), createBarrier(2L, 2),

            // Checkpoints 3 and 4: channel 0 delivers its barriers last.
            createBuffer(1), createBuffer(0),
            createBarrier(3L, 1), createBarrier(3L, 2),
            createBuffer(1), createBuffer(0),
            createBarrier(4L, 2), createBarrier(4L, 1),
            createBuffer(1), createBuffer(2),
            createBarrier(3L, 0),
            createBuffer(0),
            createBarrier(4L, 0),

            // Checkpoint 5: barriers on all channels, interleaved with buffers.
            createBuffer(1), createBuffer(2),
            createBarrier(5L, 1),
            createBuffer(0),
            createBarrier(5L, 0),
            createBuffer(1),
            createBarrier(5L, 2),

            // Checkpoint 6: no barrier on channel 2 anywhere in this sequence.
            createBuffer(1),
            createBarrier(6L, 1),
            createBuffer(0),
            createBarrier(6L, 0),

            // Checkpoints 7 to 10: channel 0 lags behind; checkpoint 10 appears on channel 2 only.
            createBuffer(1),
            createBarrier(7L, 1),
            createBuffer(0),
            createBarrier(7L, 2),
            createBuffer(2),
            createBarrier(8L, 2),
            createBuffer(0),
            createBarrier(8L, 1),
            createBuffer(1),
            createBarrier(9L, 1),
            createBarrier(7L, 0),
            createBuffer(0),
            createBarrier(9L, 2),
            createBuffer(2),
            createBarrier(10L, 2),
            createBarrier(8L, 0),
            createBuffer(1), createBuffer(2),
            createBarrier(9L, 0),

            createBuffer(1), createBuffer(0), createBuffer(2)
        };

        final BarrierTracker tracker =
                new BarrierTracker(new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)));
        tracker.registerCheckpointEventHandler(
                new CheckpointSequenceValidator(2L, 3L, 4L, 5L, 7L, 8L, 9L));

        for (BufferOrEvent expected : sequence) {
            // Skip checkpoint barriers: the test expects them not to come out of the tracker.
            if (expected.isBuffer() || expected.getEvent().getClass() != CheckpointBarrier.class) {
                Assert.assertEquals(expected, tracker.getNextNonBlocked());
            }
        }

        // Asked twice to check that the exhausted state is stable.
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
    }

    /**
     * Mixes checkpoint barriers and cancellation markers on a single channel.
     *
     * <p>The registered validator expects, in order: trigger 1, trigger 2, abort 4, trigger 5,
     * abort 6 (aborts are passed to the validator as negative IDs). Only buffers are compared
     * against the tracker output, and {@code isEmpty()} is asserted after every element of the
     * sequence.
     */
    @Test
    public void testSingleChannelAbortCheckpoint() throws Exception {
        final BufferOrEvent[] sequence = {
            createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0),
            createBarrier(2L, 0),
            createCancellationBarrier(4L, 0),
            createBarrier(5L, 0),
            createBuffer(0),
            createCancellationBarrier(6L, 0),
            createBuffer(0)
        };

        final MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
        final BarrierTracker tracker = new BarrierTracker(gate);

        // Negative values stand for an expected abort of the checkpoint with the negated ID.
        tracker.registerCheckpointEventHandler(new CheckpointSequenceValidator(1L, 2L, -4L, 5L, -6L));

        for (BufferOrEvent element : sequence) {
            if (element.isBuffer()) {
                Assert.assertEquals(element, tracker.getNextNonBlocked());
            }
            // NOTE(review): presumably isEmpty() reflects the tracker's pending-checkpoint
            // bookkeeping rather than remaining input; confirm against BarrierTracker.
            Assert.assertTrue(tracker.isEmpty());
        }

        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
    }

    /**
     * Mixes checkpoint barriers and cancellation markers over three channels.
     *
     * <p>The registered validator expects, in order: trigger 1, abort 2, trigger 3, abort 4,
     * trigger 5, abort 6 (aborts are passed to the validator as negative IDs). Only buffers are
     * compared against the tracker output; {@code isEmpty()} is asserted once the sequence has been
     * consumed and again after the input is exhausted.
     */
    @Test
    public void testMultiChannelAbortCheckpoint() throws Exception {
        final BufferOrEvent[] sequence = {
            // Checkpoint 1: barriers on all three channels.
            createBuffer(0), createBuffer(2), createBuffer(0),
            createBarrier(1L, 1), createBarrier(1L, 2),
            createBuffer(2), createBuffer(1),
            createBarrier(1L, 0),

            // Checkpoint 2: two barriers, then a cancellation marker on channel 1.
            createBuffer(0), createBuffer(2),
            createBarrier(2L, 0), createBarrier(2L, 2),
            createBuffer(0), createBuffer(2),
            createCancellationBarrier(2L, 1),

            // Checkpoint 3: barriers on all three channels.
            createBuffer(2), createBuffer(1),
            createBarrier(3L, 1), createBarrier(3L, 2), createBarrier(3L, 0),

            // Checkpoint 4: cancellation marker first, barriers afterwards.
            createBuffer(0), createBuffer(1),
            createCancellationBarrier(4L, 1),
            createBarrier(4L, 2),
            createBuffer(0),
            createBarrier(4L, 0),

            // Checkpoint 5: barriers on all three channels.
            createBuffer(0), createBuffer(1), createBuffer(2),
            createBarrier(5L, 2), createBarrier(5L, 1), createBarrier(5L, 0),

            // Checkpoint 6: two cancellation markers followed by one barrier.
            createBuffer(0), createBuffer(1),
            createCancellationBarrier(6L, 1), createCancellationBarrier(6L, 2),
            createBarrier(6L, 0),

            createBuffer(0)
        };

        final MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        final BarrierTracker tracker = new BarrierTracker(gate);

        // Negative values stand for an expected abort of the checkpoint with the negated ID.
        tracker.registerCheckpointEventHandler(
                new CheckpointSequenceValidator(1L, -2L, 3L, -4L, 5L, -6L));

        for (BufferOrEvent element : sequence) {
            if (!element.isBuffer()) {
                continue;
            }
            Assert.assertEquals(element, tracker.getNextNonBlocked());
        }

        Assert.assertTrue(tracker.isEmpty());
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertNull(tracker.getNextNonBlocked());
        Assert.assertTrue(tracker.isEmpty());
    }

    /**
     * Interleaves cancellation markers for checkpoints 1 and 2 across three channels, after a
     * single regular barrier for checkpoint 1 on channel 0.
     *
     * <p>Uses a Mockito mock as the checkpoint handler and verifies that
     * {@code abortCheckpointOnBarrier} was invoked exactly once for checkpoint 1 and exactly once
     * for checkpoint 2, regardless of how many cancellation markers were fed in.
     */
    @Test
    public void testInterleavedCancellationBarriers() throws Exception {
        final BufferOrEvent[] sequence = {
            createBarrier(1L, 0),
            createCancellationBarrier(2L, 0),
            createCancellationBarrier(1L, 1),
            createCancellationBarrier(2L, 1),
            createCancellationBarrier(1L, 2),
            createCancellationBarrier(2L, 2),
            createBuffer(0)
        };

        final MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
        final BarrierTracker tracker = new BarrierTracker(gate);

        final StatefulTask handler = Mockito.mock(StatefulTask.class);
        tracker.registerCheckpointEventHandler(handler);

        for (BufferOrEvent element : sequence) {
            // Barriers and cancellation markers are not expected to come out of the tracker.
            final boolean checkpointEvent = !element.isBuffer()
                    && (element.getEvent().getClass() == CheckpointBarrier.class
                            || element.getEvent().getClass() == CancelCheckpointMarker.class);
            if (!checkpointEvent) {
                Assert.assertEquals(element, tracker.getNextNonBlocked());
            }
        }

        Mockito.verify(handler, Mockito.times(1))
                .abortCheckpointOnBarrier(Matchers.eq(1L), Matchers.any(Throwable.class));
        Mockito.verify(handler, Mockito.times(1))
                .abortCheckpointOnBarrier(Matchers.eq(2L), Matchers.any(Throwable.class));
    }

    /**
     * Builds a {@link BufferOrEvent} carrying a {@link CheckpointBarrier}.
     *
     * @param j checkpoint ID placed in the barrier
     * @param i second constructor argument of {@link BufferOrEvent}; the tests pass channel indices here
     * @return the wrapped barrier, timestamped with the current wall-clock time
     */
    private static BufferOrEvent createBarrier(long j, int i) {
        final long timestamp = System.currentTimeMillis();
        final CheckpointBarrier barrier = new CheckpointBarrier(j, timestamp);
        return new BufferOrEvent(barrier, i);
    }

    /**
     * Builds a {@link BufferOrEvent} carrying a {@link CancelCheckpointMarker}.
     *
     * @param j checkpoint ID placed in the cancellation marker
     * @param i second constructor argument of {@link BufferOrEvent}; the tests pass channel indices here
     * @return the wrapped cancellation marker
     */
    private static BufferOrEvent createCancellationBarrier(long j, int i) {
        final CancelCheckpointMarker marker = new CancelCheckpointMarker(j);
        return new BufferOrEvent(marker, i);
    }

    /**
     * Builds a {@link BufferOrEvent} carrying a small data buffer.
     *
     * <p>The buffer wraps a fresh two-byte array ({@code {1, 2}}) and is recycled through
     * {@link FreeingBufferRecycler#INSTANCE}.
     *
     * @param i second constructor argument of {@link BufferOrEvent}; the tests pass channel indices here
     * @return the wrapped buffer
     */
    private static BufferOrEvent createBuffer(int i) {
        final byte[] payload = {1, 2};
        final Buffer buffer = new Buffer(MemorySegmentFactory.wrap(payload), FreeingBufferRecycler.INSTANCE);
        return new BufferOrEvent(buffer, i);
    }
}
