/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.util.Arrays;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.runtime.io.BarrierTracker;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class BarrierTrackerTest {
    private static final int PAGE_SIZE = 512;

    @Test
    public void testSingleChannelNoBarriers() {
        try {
            BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0)};
            MockInputGate gate = new MockInputGate(512, 1, Arrays.asList(sequence));
            BarrierTracker tracker = new BarrierTracker((InputGate)gate);
            for (BufferOrEvent boe : sequence) {
                Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
            }
            Assert.assertNull((Object)tracker.getNextNonBlocked());
            Assert.assertNull((Object)tracker.getNextNonBlocked());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testMultiChannelNoBarriers() {
        try {
            BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(3), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(2)};
            MockInputGate gate = new MockInputGate(512, 4, Arrays.asList(sequence));
            BarrierTracker tracker = new BarrierTracker((InputGate)gate);
            for (BufferOrEvent boe : sequence) {
                Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
            }
            Assert.assertNull((Object)tracker.getNextNonBlocked());
            Assert.assertNull((Object)tracker.getNextNonBlocked());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSingleChannelWithBarriers() {
        try {
            BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(1L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(2L, 0), BarrierTrackerTest.createBarrier(3L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(4L, 0), BarrierTrackerTest.createBarrier(5L, 0), BarrierTrackerTest.createBarrier(6L, 0), BarrierTrackerTest.createBuffer(0)};
            MockInputGate gate = new MockInputGate(512, 1, Arrays.asList(sequence));
            BarrierTracker tracker = new BarrierTracker((InputGate)gate);
            CheckpointSequenceValidator validator = new CheckpointSequenceValidator(new long[]{1L, 2L, 3L, 4L, 5L, 6L});
            tracker.registerCheckpointEventHandler((StatefulTask)validator);
            for (BufferOrEvent boe : sequence) {
                if (!boe.isBuffer() && boe.getEvent().getClass() == CheckpointBarrier.class) continue;
                Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
            }
            Assert.assertNull((Object)tracker.getNextNonBlocked());
            Assert.assertNull((Object)tracker.getNextNonBlocked());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSingleChannelWithSkippedBarriers() {
        try {
            BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(1L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(3L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(4L, 0), BarrierTrackerTest.createBarrier(6L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(7L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(10L, 0), BarrierTrackerTest.createBuffer(0)};
            MockInputGate gate = new MockInputGate(512, 1, Arrays.asList(sequence));
            BarrierTracker tracker = new BarrierTracker((InputGate)gate);
            CheckpointSequenceValidator validator = new CheckpointSequenceValidator(new long[]{1L, 3L, 4L, 6L, 7L, 10L});
            tracker.registerCheckpointEventHandler((StatefulTask)validator);
            for (BufferOrEvent boe : sequence) {
                if (!boe.isBuffer() && boe.getEvent().getClass() == CheckpointBarrier.class) continue;
                Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
            }
            Assert.assertNull((Object)tracker.getNextNonBlocked());
            Assert.assertNull((Object)tracker.getNextNonBlocked());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testMultiChannelWithBarriers() {
        try {
            BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(1L, 1), BarrierTrackerTest.createBarrier(1L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(1L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(2L, 0), BarrierTrackerTest.createBarrier(2L, 1), BarrierTrackerTest.createBarrier(2L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(3L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(3L, 0), BarrierTrackerTest.createBarrier(3L, 1), BarrierTrackerTest.createBarrier(4L, 1), BarrierTrackerTest.createBarrier(4L, 2), BarrierTrackerTest.createBarrier(4L, 0), BarrierTrackerTest.createBuffer(0)};
            MockInputGate gate = new MockInputGate(512, 3, Arrays.asList(sequence));
            BarrierTracker tracker = new BarrierTracker((InputGate)gate);
            CheckpointSequenceValidator validator = new CheckpointSequenceValidator(new long[]{1L, 2L, 3L, 4L});
            tracker.registerCheckpointEventHandler((StatefulTask)validator);
            for (BufferOrEvent boe : sequence) {
                if (!boe.isBuffer() && boe.getEvent().getClass() == CheckpointBarrier.class) continue;
                Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
            }
            Assert.assertNull((Object)tracker.getNextNonBlocked());
            Assert.assertNull((Object)tracker.getNextNonBlocked());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testMultiChannelSkippingCheckpoints() {
        try {
            BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(1L, 1), BarrierTrackerTest.createBarrier(1L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(1L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(2L, 0), BarrierTrackerTest.createBarrier(2L, 1), BarrierTrackerTest.createBarrier(2L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(3L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(4L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(4L, 1), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(4L, 2), BarrierTrackerTest.createBuffer(0)};
            MockInputGate gate = new MockInputGate(512, 3, Arrays.asList(sequence));
            BarrierTracker tracker = new BarrierTracker((InputGate)gate);
            CheckpointSequenceValidator validator = new CheckpointSequenceValidator(new long[]{1L, 2L, 4L});
            tracker.registerCheckpointEventHandler((StatefulTask)validator);
            for (BufferOrEvent boe : sequence) {
                if (!boe.isBuffer() && boe.getEvent().getClass() == CheckpointBarrier.class) continue;
                Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
            }
            Assert.assertNull((Object)tracker.getNextNonBlocked());
            Assert.assertNull((Object)tracker.getNextNonBlocked());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCompleteCheckpointsOnLateBarriers() {
        try {
            BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(2L, 1), BarrierTrackerTest.createBarrier(2L, 0), BarrierTrackerTest.createBarrier(2L, 2), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(3L, 1), BarrierTrackerTest.createBarrier(3L, 2), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(4L, 2), BarrierTrackerTest.createBarrier(4L, 1), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(3L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(4L, 0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(5L, 1), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(5L, 0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(5L, 2), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(6L, 1), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(6L, 0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(7L, 1), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(7L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(8L, 2), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(8L, 1), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(9L, 1), BarrierTrackerTest.createBarrier(7L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(9L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(10L, 2), BarrierTrackerTest.createBarrier(8L, 0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(9L, 0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(2)};
            MockInputGate gate = new MockInputGate(512, 3, Arrays.asList(sequence));
            BarrierTracker tracker = new BarrierTracker((InputGate)gate);
            CheckpointSequenceValidator validator = new CheckpointSequenceValidator(new long[]{2L, 3L, 4L, 5L, 7L, 8L, 9L});
            tracker.registerCheckpointEventHandler((StatefulTask)validator);
            for (BufferOrEvent boe : sequence) {
                if (!boe.isBuffer() && boe.getEvent().getClass() == CheckpointBarrier.class) continue;
                Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
            }
            Assert.assertNull((Object)tracker.getNextNonBlocked());
            Assert.assertNull((Object)tracker.getNextNonBlocked());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSingleChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(1L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(2L, 0), BarrierTrackerTest.createCancellationBarrier(4L, 0), BarrierTrackerTest.createBarrier(5L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createCancellationBarrier(6L, 0), BarrierTrackerTest.createBuffer(0)};
        MockInputGate gate = new MockInputGate(512, 1, Arrays.asList(sequence));
        BarrierTracker tracker = new BarrierTracker((InputGate)gate);
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(new long[]{1L, 2L, -4L, 5L, -6L});
        tracker.registerCheckpointEventHandler((StatefulTask)validator);
        for (BufferOrEvent boe : sequence) {
            if (boe.isBuffer()) {
                Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
            }
            Assert.assertTrue((boolean)tracker.isEmpty());
        }
        Assert.assertNull((Object)tracker.getNextNonBlocked());
        Assert.assertNull((Object)tracker.getNextNonBlocked());
    }

    @Test
    public void testMultiChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(1L, 1), BarrierTrackerTest.createBarrier(1L, 2), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(1L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(2L, 0), BarrierTrackerTest.createBarrier(2L, 2), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createCancellationBarrier(2L, 1), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBarrier(3L, 1), BarrierTrackerTest.createBarrier(3L, 2), BarrierTrackerTest.createBarrier(3L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createCancellationBarrier(4L, 1), BarrierTrackerTest.createBarrier(4L, 2), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBarrier(4L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createBuffer(2), BarrierTrackerTest.createBarrier(5L, 2), BarrierTrackerTest.createBarrier(5L, 1), BarrierTrackerTest.createBarrier(5L, 0), BarrierTrackerTest.createBuffer(0), BarrierTrackerTest.createBuffer(1), BarrierTrackerTest.createCancellationBarrier(6L, 1), BarrierTrackerTest.createCancellationBarrier(6L, 2), BarrierTrackerTest.createBarrier(6L, 0), BarrierTrackerTest.createBuffer(0)};
        MockInputGate gate = new MockInputGate(512, 3, Arrays.asList(sequence));
        BarrierTracker tracker = new BarrierTracker((InputGate)gate);
        CheckpointSequenceValidator validator = new CheckpointSequenceValidator(new long[]{1L, -2L, 3L, -4L, 5L, -6L});
        tracker.registerCheckpointEventHandler((StatefulTask)validator);
        for (BufferOrEvent boe : sequence) {
            if (!boe.isBuffer()) continue;
            Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
        }
        Assert.assertTrue((boolean)tracker.isEmpty());
        Assert.assertNull((Object)tracker.getNextNonBlocked());
        Assert.assertNull((Object)tracker.getNextNonBlocked());
        Assert.assertTrue((boolean)tracker.isEmpty());
    }

    @Test
    public void testInterleavedCancellationBarriers() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{BarrierTrackerTest.createBarrier(1L, 0), BarrierTrackerTest.createCancellationBarrier(2L, 0), BarrierTrackerTest.createCancellationBarrier(1L, 1), BarrierTrackerTest.createCancellationBarrier(2L, 1), BarrierTrackerTest.createCancellationBarrier(1L, 2), BarrierTrackerTest.createCancellationBarrier(2L, 2), BarrierTrackerTest.createBuffer(0)};
        MockInputGate gate = new MockInputGate(512, 3, Arrays.asList(sequence));
        BarrierTracker tracker = new BarrierTracker((InputGate)gate);
        StatefulTask statefulTask = (StatefulTask)Mockito.mock(StatefulTask.class);
        tracker.registerCheckpointEventHandler(statefulTask);
        for (BufferOrEvent boe : sequence) {
            if (!boe.isBuffer() && (boe.getEvent().getClass() == CheckpointBarrier.class || boe.getEvent().getClass() == CancelCheckpointMarker.class)) continue;
            Assert.assertEquals((Object)boe, (Object)tracker.getNextNonBlocked());
        }
        ((StatefulTask)Mockito.verify((Object)statefulTask, (VerificationMode)Mockito.times((int)1))).abortCheckpointOnBarrier(Matchers.eq((long)1L), (Throwable)Matchers.any(Throwable.class));
        ((StatefulTask)Mockito.verify((Object)statefulTask, (VerificationMode)Mockito.times((int)1))).abortCheckpointOnBarrier(Matchers.eq((long)2L), (Throwable)Matchers.any(Throwable.class));
    }

    private static BufferOrEvent createBarrier(long id, int channel) {
        return new BufferOrEvent((AbstractEvent)new CheckpointBarrier(id, System.currentTimeMillis()), channel);
    }

    private static BufferOrEvent createCancellationBarrier(long id, int channel) {
        return new BufferOrEvent((AbstractEvent)new CancelCheckpointMarker(id), channel);
    }

    private static BufferOrEvent createBuffer(int channel) {
        return new BufferOrEvent(new Buffer(MemorySegmentFactory.wrap((byte[])new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
    }

    private static class CheckpointSequenceValidator
    implements StatefulTask<StateHandle<Object>> {
        private final long[] checkpointIDs;
        private int i = 0;

        private CheckpointSequenceValidator(long ... checkpointIDs) {
            this.checkpointIDs = checkpointIDs;
        }

        public void setInitialState(StateHandle<Object> state) throws Exception {
            throw new UnsupportedOperationException("should never be called");
        }

        public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
            throw new UnsupportedOperationException("should never be called");
        }

        public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
            Assert.assertTrue((String)"More checkpoints than expected", (this.i < this.checkpointIDs.length ? 1 : 0) != 0);
            long expectedId = this.checkpointIDs[this.i++];
            if (expectedId >= 0L) {
                Assert.assertEquals((String)"wrong checkpoint id", (long)expectedId, (long)checkpointId);
                Assert.assertTrue((timestamp > 0L ? 1 : 0) != 0);
            } else {
                Assert.fail((String)"got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'");
            }
        }

        public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
            Assert.assertTrue((String)"More checkpoints than expected", (this.i < this.checkpointIDs.length ? 1 : 0) != 0);
            long expectedId = this.checkpointIDs[this.i++];
            if (expectedId < 0L) {
                Assert.assertEquals((String)"wrong checkpoint id for checkoint abort", (long)(-expectedId), (long)checkpointId);
            } else {
                Assert.fail((String)"got 'abortCheckpointOnBarrier()' when expecting an 'triggerCheckpointOnBarrier()'");
            }
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            throw new UnsupportedOperationException("should never be called");
        }
    }
}

