/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.RuntimeEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierUnaligner;
import org.apache.flink.streaming.runtime.io.MockIndexedInputGate;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class CheckpointBarrierUnalignerCancellationTest {
    private final List<RuntimeEvent> events;
    private final boolean expectTriggerCheckpoint;
    private final boolean expectAbortCheckpoint;
    private final int numChannels;
    private final int channel;

    public CheckpointBarrierUnalignerCancellationTest(boolean expectTriggerCheckpoint, boolean expectAbortCheckpoint, List<RuntimeEvent> events, int numChannels, int channel) {
        this.events = events;
        this.expectTriggerCheckpoint = expectTriggerCheckpoint;
        this.expectAbortCheckpoint = expectAbortCheckpoint;
        this.numChannels = numChannels;
        this.channel = channel;
    }

    @Parameterized.Parameters(name="expect trigger: {0}, expect abort {1}, numChannels: {3}, chan: {4}, events: {2}")
    public static Object[][] parameters() {
        return new Object[][]{{false, true, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.cancel(10), CheckpointBarrierUnalignerCancellationTest.cancel(20)), 1, 0}, {false, true, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.cancel(20), CheckpointBarrierUnalignerCancellationTest.cancel(10)), 1, 0}, {false, true, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.cancel(10), CheckpointBarrierUnalignerCancellationTest.checkpoint(10)), 1, 0}, {true, true, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.cancel(10), CheckpointBarrierUnalignerCancellationTest.checkpoint(20)), 1, 0}, {false, true, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.cancel(20), CheckpointBarrierUnalignerCancellationTest.checkpoint(10)), 1, 0}, {true, false, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.checkpoint(10), CheckpointBarrierUnalignerCancellationTest.checkpoint(10)), 1, 0}, {true, false, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.checkpoint(10), CheckpointBarrierUnalignerCancellationTest.checkpoint(20)), 1, 0}, {true, true, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.checkpoint(10), CheckpointBarrierUnalignerCancellationTest.checkpoint(20)), 2, 0}, {true, false, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.checkpoint(20), CheckpointBarrierUnalignerCancellationTest.checkpoint(10)), 1, 0}, {true, true, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.checkpoint(10), CheckpointBarrierUnalignerCancellationTest.cancel(10)), 1, 0}, {true, true, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.checkpoint(10), CheckpointBarrierUnalignerCancellationTest.cancel(20)), 1, 0}, {true, false, Arrays.asList(CheckpointBarrierUnalignerCancellationTest.checkpoint(20), CheckpointBarrierUnalignerCancellationTest.cancel(10)), 1, 0}};
    }

    @Test
    public void test() throws Exception {
        TestInvokable invokable = new TestInvokable();
        CheckpointBarrierUnaligner unaligner = new CheckpointBarrierUnaligner((SubtaskCheckpointCoordinator)TestSubtaskCheckpointCoordinator.INSTANCE, "test", (AbstractInvokable)invokable, new InputGate[]{new MockIndexedInputGate(0, this.numChannels)});
        for (RuntimeEvent e : this.events) {
            if (e instanceof CancelCheckpointMarker) {
                unaligner.processCancellationBarrier((CancelCheckpointMarker)e);
                continue;
            }
            if (e instanceof CheckpointBarrier) {
                unaligner.processBarrier((CheckpointBarrier)e, new InputChannelInfo(0, this.channel));
                continue;
            }
            throw new IllegalArgumentException("unexpected event type: " + e);
        }
        Assert.assertEquals((String)"expectAbortCheckpoint", (Object)this.expectAbortCheckpoint, (Object)invokable.checkpointAborted);
        Assert.assertEquals((String)"expectTriggerCheckpoint", (Object)this.expectTriggerCheckpoint, (Object)invokable.checkpointTriggered);
    }

    private static CheckpointBarrier checkpoint(int checkpointId) {
        return new CheckpointBarrier((long)checkpointId, 1L, CheckpointOptions.forCheckpointWithDefaultLocation());
    }

    private static CancelCheckpointMarker cancel(int checkpointId) {
        return new CancelCheckpointMarker((long)checkpointId);
    }

    private static class TestInvokable
    extends AbstractInvokable {
        private boolean checkpointAborted;
        private boolean checkpointTriggered;

        TestInvokable() {
            super((Environment)new DummyEnvironment());
        }

        public void invoke() {
        }

        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
            this.checkpointTriggered = true;
        }

        public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
            this.checkpointAborted = true;
        }

        public <E extends Exception> void executeInTaskThread(ThrowingRunnable<E> runnable, String descriptionFormat, Object ... descriptionArgs) {
            try {
                runnable.run();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

