/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.io.AlternatingCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierUnaligner;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link AlternatingCheckpointBarrierHandler}, which is built here from a
 * {@link CheckpointBarrierAligner} and a {@link CheckpointBarrierUnaligner}.
 *
 * <p>The assertions in this class expect savepoint barriers to be tracked by the aligned
 * handler and checkpoint barriers by the unaligned handler.
 */
public class AlternatingCheckpointBarrierHandlerTest {

    /** Runs the two-channel barrier scenario with a {@link CheckpointType#CHECKPOINT} barrier. */
    @Test
    public void testCheckpointHandling() throws Exception {
        testBarrierHandling(CheckpointType.CHECKPOINT);
    }

    /** Runs the two-channel barrier scenario with a {@link CheckpointType#SAVEPOINT} barrier. */
    @Test
    public void testSavepointHandling() throws Exception {
        testBarrierHandling(CheckpointType.SAVEPOINT);
    }

    /**
     * Alternates between checkpoint and savepoint barriers (even ids are checkpoints, odd ids are
     * savepoints), sends each barrier on every channel, and verifies that every barrier id was
     * triggered on the invokable exactly once and in order.
     */
    @Test
    public void testAlternation() throws Exception {
        int numBarriers = 123;
        int numChannels = 123;
        TestInvokable target = new TestInvokable();
        CheckpointedInputGate gate = buildGate(target, numChannels);
        List<Long> barriers = new ArrayList<>();
        for (long barrier = 0; barrier < numBarriers; barrier++) {
            barriers.add(barrier);
            CheckpointType type = barrier % 2 == 0 ? CheckpointType.CHECKPOINT : CheckpointType.SAVEPOINT;
            for (int channel = 0; channel < numChannels; channel++) {
                sendBarrier(barrier, type, (TestInputChannel) gate.getChannel(channel), gate);
            }
        }
        Assert.assertEquals(barriers, target.triggeredCheckpoints);
    }

    /**
     * Sends a checkpoint, a savepoint and another checkpoint over two channels and verifies, after
     * each barrier of interest, the latest checkpoint id and lower bounds for the alignment
     * duration and the checkpoint start delay reported by the gate's barrier handler.
     */
    @Test
    public void testMetricsAlternation() throws Exception {
        int numChannels = 2;
        TestInvokable target = new TestInvokable();
        CheckpointedInputGate gate = buildGate(target, numChannels);

        // Barrier creation times are back-dated so that a minimum start delay can be asserted.
        long checkpoint1CreationTime = System.currentTimeMillis() - 10L;
        sendBarrier(1L, checkpoint1CreationTime, CheckpointType.CHECKPOINT, gate, 0);
        Thread.sleep(10L);
        sendBarrier(1L, checkpoint1CreationTime, CheckpointType.CHECKPOINT, gate, 1);
        assertMetrics(gate.getCheckpointBarrierHandler(), 1L, 0L, 10_000_000L);

        long checkpoint2CreationTime = System.currentTimeMillis() - 5L;
        sendBarrier(2L, checkpoint2CreationTime, CheckpointType.SAVEPOINT, gate, 0);
        assertMetrics(gate.getCheckpointBarrierHandler(), 2L, 0L, 5_000_000L);
        Thread.sleep(5L);
        sendBarrier(2L, checkpoint2CreationTime, CheckpointType.SAVEPOINT, gate, 1);
        assertMetrics(gate.getCheckpointBarrierHandler(), 2L, 5_000_000L, 5_000_000L);

        long checkpoint3CreationTime = System.currentTimeMillis() - 7L;
        sendBarrier(3L, checkpoint3CreationTime, CheckpointType.CHECKPOINT, gate, 0);
        assertMetrics(gate.getCheckpointBarrierHandler(), 3L, 0L, 7_000_000L);
        Thread.sleep(7L);
        // Both barriers of checkpoint 3 carry the creation time of checkpoint 3 (the second one
        // previously reused the timestamp of checkpoint 2).
        sendBarrier(3L, checkpoint3CreationTime, CheckpointType.CHECKPOINT, gate, 1);
        assertMetrics(gate.getCheckpointBarrierHandler(), 3L, 0L, 7_000_000L);
    }

    /**
     * Asserts the metrics exposed by the given handler.
     *
     * @param checkpointBarrierHandler handler whose metrics are inspected
     * @param latestCheckpointId expected latest checkpoint id (exact match)
     * @param alignmentDurationNanos lower bound (inclusive) for the alignment duration
     * @param startDelayNanos lower bound (inclusive) for the checkpoint start delay
     */
    private void assertMetrics(CheckpointBarrierHandler checkpointBarrierHandler, long latestCheckpointId, long alignmentDurationNanos, long startDelayNanos) {
        MatcherAssert.assertThat(checkpointBarrierHandler.getLatestCheckpointId(), Matchers.equalTo(latestCheckpointId));
        MatcherAssert.assertThat(checkpointBarrierHandler.getAlignmentDurationNanos(), Matchers.greaterThanOrEqualTo(alignmentDurationNanos));
        MatcherAssert.assertThat(checkpointBarrierHandler.getCheckpointStartDelayNanos(), Matchers.greaterThanOrEqualTo(startDelayNanos));
    }

    /**
     * Feeds alternating checkpoint/savepoint barriers (one per iteration, on alternating channels)
     * and verifies that exactly one of the two delegate handlers has a pending checkpoint after
     * each barrier: the aligned one for savepoints, the unaligned one for checkpoints.
     */
    @Test
    public void testPreviousHandlerReset() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        inputGate.setInputChannels(new InputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)});
        TestInvokable target = new TestInvokable();
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner("test", target, new InputGate[]{inputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, "test", target, new InputGate[]{inputGate});
        AlternatingCheckpointBarrierHandler barrierHandler = new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, target);

        for (int i = 0; i < 4; i++) {
            int channel = i % 2;
            CheckpointType type = channel == 0 ? CheckpointType.CHECKPOINT : CheckpointType.SAVEPOINT;
            barrierHandler.processBarrier(newBarrier(i, type), new InputChannelInfo(0, channel));

            Assert.assertEquals(type.isSavepoint(), alignedHandler.isCheckpointPending());
            Assert.assertNotEquals(alignedHandler.isCheckpointPending(), unalignedHandler.isCheckpointPending());

            // The in-flight data checks only apply to checkpoint (non-savepoint) barriers.
            if (!type.isSavepoint()) {
                Assert.assertFalse(barrierHandler.getAllBarriersReceivedFuture(i).isDone());
                assertInflightDataEquals(unalignedHandler, barrierHandler, i, inputGate.getNumberOfInputChannels());
            }
        }
    }

    /**
     * Asserts that both handlers report the same {@code hasInflightData} result for the given
     * barrier on every channel {@code 0..numChannels-1} of gate index 0.
     */
    private static void assertInflightDataEquals(CheckpointBarrierHandler expected, CheckpointBarrierHandler actual, long barrierId, int numChannels) {
        for (int channelId = 0; channelId < numChannels; channelId++) {
            InputChannelInfo channelInfo = new InputChannelInfo(0, channelId);
            Assert.assertEquals(expected.hasInflightData(barrierId, channelInfo), actual.hasInflightData(barrierId, channelInfo));
        }
    }

    /**
     * Hands a checkpoint barrier directly to the unaligned delegate (bypassing the alternating
     * handler) and verifies that the alternating handler still agrees with the delegate on
     * in-flight data and does not report all barriers as received.
     */
    @Test
    public void testHasInflightDataBeforeProcessBarrier() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        inputGate.setInputChannels(new InputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)});
        TestInvokable target = new TestInvokable();
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner("test", target, new InputGate[]{inputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, "test", target, new InputGate[]{inputGate});
        AlternatingCheckpointBarrierHandler barrierHandler = new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, target);

        long id = 1L;
        unalignedHandler.processBarrier(newBarrier(id, CheckpointType.CHECKPOINT), new InputChannelInfo(0, 0));

        assertInflightDataEquals(unalignedHandler, barrierHandler, id, inputGate.getNumberOfInputChannels());
        Assert.assertFalse(barrierHandler.getAllBarriersReceivedFuture(id).isDone());
    }

    /**
     * Sends a checkpoint barrier followed by a savepoint barrier with a lower id and verifies that
     * the latest checkpoint id of the alternating handler stays at the checkpoint's id and that
     * the aligned delegate's latest checkpoint id is unchanged from its initial value.
     */
    @Test
    public void testOutOfOrderBarrier() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        inputGate.setInputChannels(new InputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)});
        TestInvokable target = new TestInvokable();
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner("test", target, new InputGate[]{inputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, "test", target, new InputGate[]{inputGate});
        AlternatingCheckpointBarrierHandler barrierHandler = new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, target);

        long checkpointId = 10L;
        long outOfOrderSavepointId = 5L;
        long initialAlignedCheckpointId = alignedHandler.getLatestCheckpointId();

        barrierHandler.processBarrier(newBarrier(checkpointId, CheckpointType.CHECKPOINT), new InputChannelInfo(0, 0));
        barrierHandler.processBarrier(newBarrier(outOfOrderSavepointId, CheckpointType.SAVEPOINT), new InputChannelInfo(0, 1));

        Assert.assertEquals(checkpointId, barrierHandler.getLatestCheckpointId());
        assertInflightDataEquals(unalignedHandler, barrierHandler, checkpointId, inputGate.getNumberOfInputChannels());
        Assert.assertEquals(initialAlignedCheckpointId, alignedHandler.getLatestCheckpointId());
    }

    /**
     * Signals end-of-partition twice on a five-channel gate and verifies that the aligned delegate
     * counts two closed channels and the unaligned delegate counts three open channels.
     */
    @Test
    public void testEndOfPartition() throws Exception {
        int totalChannels = 5;
        int closedChannels = 2;
        SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(totalChannels).setChannelFactory(InputChannelBuilder::buildLocalChannel).build();
        TestInvokable target = new TestInvokable();
        CheckpointBarrierAligner alignedHandler = new CheckpointBarrierAligner("test", target, new InputGate[]{inputGate});
        CheckpointBarrierUnaligner unalignedHandler = new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, "test", target, new InputGate[]{inputGate});
        AlternatingCheckpointBarrierHandler barrierHandler = new AlternatingCheckpointBarrierHandler(alignedHandler, unalignedHandler, target);

        for (int i = 0; i < closedChannels; i++) {
            barrierHandler.processEndOfPartition();
        }

        Assert.assertEquals(closedChannels, alignedHandler.getNumClosedChannels());
        Assert.assertEquals(totalChannels - closedChannels, unalignedHandler.getNumOpenChannels());
    }

    /**
     * Shared scenario for a single barrier arriving on a "fast" channel first and a "slow" channel
     * second.
     *
     * <p>After the first barrier, the test expects a savepoint to block the fast channel without
     * triggering, and a checkpoint to trigger immediately without blocking. After the second
     * barrier the checkpoint must have been triggered exactly once and no channel may be blocked.
     *
     * @param checkpointType type of the barrier sent on both channels
     */
    private void testBarrierHandling(CheckpointType checkpointType) throws Exception {
        long barrierId = 123L;
        TestInvokable target = new TestInvokable();
        SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
        TestInputChannel fast = new TestInputChannel(gate, 0, false, true);
        TestInputChannel slow = new TestInputChannel(gate, 1, false, true);
        gate.setInputChannels(new InputChannel[]{fast, slow});
        AlternatingCheckpointBarrierHandler barrierHandler = barrierHandler(gate, target);
        CheckpointedInputGate checkpointedGate = new CheckpointedInputGate(gate, barrierHandler);

        sendBarrier(barrierId, checkpointType, fast, checkpointedGate);

        Assert.assertEquals(checkpointType.isSavepoint(), target.triggeredCheckpoints.isEmpty());
        Assert.assertEquals(checkpointType.isSavepoint(), barrierHandler.isBlocked(fast.getChannelInfo()));
        Assert.assertFalse(barrierHandler.isBlocked(slow.getChannelInfo()));

        sendBarrier(barrierId, checkpointType, slow, checkpointedGate);

        Assert.assertEquals(Collections.singletonList(barrierId), target.triggeredCheckpoints);
        for (InputChannel channel : gate.getInputChannels().values()) {
            Assert.assertFalse(barrierHandler.isBlocked(channel.getChannelInfo()));
            // Channels are expected to be resumed only in the savepoint case.
            Assert.assertEquals(String.format("channel %d should be resumed", channel.getChannelIndex()), checkpointType.isSavepoint(), ((TestInputChannel) channel).isResumed());
        }
    }

    /**
     * Sends a barrier with an explicit creation timestamp on the channel with the given index and
     * drains the gate.
     */
    private void sendBarrier(long barrierId, long barrierCreationTime, CheckpointType type, CheckpointedInputGate gate, int channelId) throws Exception {
        TestInputChannel channel = (TestInputChannel) gate.getChannel(channelId);
        sendAndDrain(barrier(barrierId, type, barrierCreationTime), channel, gate);
    }

    /** Sends a barrier stamped with the current time on the given channel and drains the gate. */
    private void sendBarrier(long barrierId, CheckpointType type, TestInputChannel channel, CheckpointedInputGate gate) throws Exception {
        sendAndDrain(barrier(barrierId, type), channel, gate);
    }

    /**
     * Makes the serialized barrier readable on the channel and polls the gate until it returns an
     * empty result, so that the barrier is handed to the barrier handler before returning.
     */
    private void sendAndDrain(Buffer serializedBarrier, TestInputChannel channel, CheckpointedInputGate gate) throws Exception {
        channel.read(serializedBarrier.retainBuffer());
        while (gate.pollNext().isPresent()) {
            // Intentionally empty: polling is done only for its side effects.
        }
    }

    /**
     * Creates an alternating handler over a fresh aligner/unaligner pair for the given gate, both
     * reporting to {@code target}.
     */
    private static AlternatingCheckpointBarrierHandler barrierHandler(SingleInputGate inputGate, AbstractInvokable target) {
        String taskName = "test";
        return new AlternatingCheckpointBarrierHandler(
                new CheckpointBarrierAligner(taskName, target, new InputGate[]{inputGate}),
                new CheckpointBarrierUnaligner(TestSubtaskCheckpointCoordinator.INSTANCE, taskName, target, new InputGate[]{inputGate}),
                target);
    }

    /** Creates a barrier with timestamp 0 and options built from the type and the default storage location. */
    private static CheckpointBarrier newBarrier(long barrierId, CheckpointType type) {
        return new CheckpointBarrier(barrierId, 0L, new CheckpointOptions(type, CheckpointStorageLocationReference.getDefault()));
    }

    /** Serializes a barrier whose timestamp is the current wall-clock time. */
    private Buffer barrier(long barrierId, CheckpointType checkpointType) throws IOException {
        return barrier(barrierId, checkpointType, System.currentTimeMillis());
    }

    /**
     * Serializes a barrier with the given id, type and timestamp into a buffer.
     *
     * <p>NOTE(review): the two trailing {@code true} arguments of {@link CheckpointOptions} are
     * presumably the exactly-once and unaligned-checkpoint flags; confirm against the
     * constructor's declaration.
     */
    private Buffer barrier(long barrierId, CheckpointType checkpointType, long barrierTimestamp) throws IOException {
        CheckpointOptions options = new CheckpointOptions(checkpointType, CheckpointStorageLocationReference.getDefault(), true, true);
        return EventSerializer.toBuffer(new CheckpointBarrier(barrierId, barrierTimestamp, options));
    }

    /**
     * Builds a checkpointed gate with {@code numChannels} test channels whose barrier handler is an
     * alternating handler reporting to {@code target}.
     */
    private static CheckpointedInputGate buildGate(TestInvokable target, int numChannels) {
        SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(numChannels).build();
        TestInputChannel[] channels = new TestInputChannel[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channels[i] = new TestInputChannel(gate, i, false, true);
        }
        gate.setInputChannels((InputChannel[]) channels);
        return new CheckpointedInputGate(gate, barrierHandler(gate, target));
    }

    /**
     * Invokable stub that records the ids of checkpoints triggered on it, runs task-thread actions
     * inline on the calling thread, and ignores aborts.
     */
    private static class TestInvokable
    extends AbstractInvokable {
        /** Ids passed to {@link #triggerCheckpointOnBarrier}, in call order. */
        private final List<Long> triggeredCheckpoints = new ArrayList<>();

        TestInvokable() {
            super(new DummyEnvironment());
        }

        public void invoke() {
            // Nothing to run: the tests drive the handler directly.
        }

        public <E extends Exception> void executeInTaskThread(ThrowingRunnable<E> runnable, String descriptionFormat, Object ... descriptionArgs) throws E {
            // Executed synchronously; the description arguments are not used.
            runnable.run();
        }

        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
            triggeredCheckpoints.add(checkpointMetaData.getCheckpointId());
        }

        public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
            // Aborts are not tracked by these tests.
        }
    }
}

