/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.hamcrest.MockitoHamcrest;
import org.mockito.verification.VerificationMode;

/**
 * Tests how stream tasks react to checkpoint cancellation: a checkpoint triggered on a task that
 * is not ready yet, and {@link CancelCheckpointMarker} events arriving on the inputs of one-input
 * and two-input tasks.
 *
 * <p>In every case the test expects a {@link CancelCheckpointMarker} carrying the affected
 * checkpoint id to show up in the harness output.
 */
public class StreamTaskCancellationBarrierTest {

    /** Checkpoint id triggered on the task that is still blocked in {@code init()}. */
    private static final long NOT_READY_CHECKPOINT_ID = 41L;

    /** Checkpoint id carried by the cancellation markers fed into the running tasks. */
    private static final long CANCELLED_CHECKPOINT_ID = 2L;

    /**
     * Triggers a checkpoint on a task whose {@code init()} is still blocked and checks that the
     * trigger call reports {@code false} and that a cancellation marker for that checkpoint is
     * emitted to the output.
     *
     * <p>NOTE(review): nothing in this test cancels the task afterwards, so the
     * {@link InitBlockingTask} stays blocked in {@code init()} when the test returns.
     */
    @Test
    public void testEmitCancellationBarrierWhenNotReady() throws Exception {
        StreamTaskTestHarness<String> testHarness =
                new StreamTaskTestHarness<>(InitBlockingTask::new, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        // The task cannot get past init(); presumably invoke() runs it on a separate thread,
        // since the test continues while init() is waiting.
        testHarness.invoke();

        StreamTask<String, ?> task = testHarness.getTask();
        boolean triggered = task.triggerCheckpoint(
                new CheckpointMetaData(NOT_READY_CHECKPOINT_ID, System.currentTimeMillis()),
                CheckpointOptions.forCheckpointWithDefaultLocation(),
                false);
        Assert.assertFalse("task triggered checkpoint though not ready", triggered);

        assertCancelMarkerEmitted(testHarness.getOutput().poll(), NOT_READY_CHECKPOINT_ID);
    }

    /**
     * Feeds a cancellation marker for the same checkpoint into both channels of a one-input task
     * and checks that the checkpoint is declined exactly once and that a cancellation marker is
     * emitted to the output.
     */
    @Test
    public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
        // The two int arguments are presumably (number of input gates, channels per gate);
        // the events below address channels 1 and 0 of gate 0.
        OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
                OneInputStreamTask::new,
                1,
                2,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
        streamConfig.setStreamOperator(mapOperator);
        streamConfig.setOperatorID(new OperatorID());

        // Spy on the environment so the declineCheckpoint(...) call can be verified.
        StreamMockEnvironment environment = Mockito.spy(testHarness.createEnvironment());

        testHarness.invoke(environment);
        testHarness.waitForTaskRunning();

        testHarness.processEvent(new CancelCheckpointMarker(CANCELLED_CHECKPOINT_ID), 0, 1);
        testHarness.processEvent(new CancelCheckpointMarker(CANCELLED_CHECKPOINT_ID), 0, 0);
        testHarness.waitForInputProcessing();

        verifyDeclinedOnCancellationBarrier(environment, CANCELLED_CHECKPOINT_ID);
        assertCancelMarkerEmitted(testHarness.getOutput().poll(), CANCELLED_CHECKPOINT_ID);

        // Shut the task down cleanly.
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Feeds a cancellation marker for the same checkpoint into both inputs of a two-input task
     * and checks that the checkpoint is declined exactly once and that a cancellation marker is
     * emitted to the output.
     */
    @Test
    public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {
        TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
                TwoInputStreamTask::new,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap());
        streamConfig.setStreamOperator(op);
        streamConfig.setOperatorID(new OperatorID());

        // Spy on the environment so the declineCheckpoint(...) call can be verified.
        StreamMockEnvironment environment = Mockito.spy(testHarness.createEnvironment());

        testHarness.invoke(environment);
        testHarness.waitForTaskRunning();

        // One marker per input: first argument 0 and 1, second argument 0 in both cases.
        testHarness.processEvent(new CancelCheckpointMarker(CANCELLED_CHECKPOINT_ID), 0, 0);
        testHarness.processEvent(new CancelCheckpointMarker(CANCELLED_CHECKPOINT_ID), 1, 0);
        testHarness.waitForInputProcessing();

        verifyDeclinedOnCancellationBarrier(environment, CANCELLED_CHECKPOINT_ID);
        assertCancelMarkerEmitted(testHarness.getOutput().poll(), CANCELLED_CHECKPOINT_ID);

        // Shut the task down cleanly.
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Verifies that {@code declineCheckpoint} was called exactly once on the spied environment
     * for the given checkpoint id, with a cause matching
     * {@link CheckpointFailureReason#CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER}.
     *
     * @param environment the Mockito spy wrapping the task's environment
     * @param checkpointId the checkpoint id expected in the decline call
     */
    private static void verifyDeclinedOnCancellationBarrier(StreamMockEnvironment environment, long checkpointId) {
        Mockito.verify(environment, Mockito.times(1)).declineCheckpoint(
                Mockito.eq(checkpointId),
                MockitoHamcrest.argThat(new CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(
                        CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
    }

    /**
     * Asserts that the given output element is a {@link CancelCheckpointMarker} for the expected
     * checkpoint.
     *
     * @param emitted the element polled from the harness output; {@code null} fails the test
     * @param expectedCheckpointId the checkpoint id the marker must carry
     */
    private static void assertCancelMarkerEmitted(Object emitted, long expectedCheckpointId) {
        Assert.assertNotNull("nothing emitted", emitted);
        Assert.assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker);
        Assert.assertEquals(
                "wrong checkpoint id",
                expectedCheckpointId,
                ((CancelCheckpointMarker) emitted).getCheckpointId());
    }

    /** Co-map function that forwards the elements of both inputs unchanged. */
    private static class UnionCoMap implements CoMapFunction<String, String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String map1(String value) throws Exception {
            return value;
        }

        @Override
        public String map2(String value) throws Exception {
            return value;
        }
    }

    /** Map function that returns its input unchanged. */
    private static class IdentityMap implements MapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(String value) throws Exception {
            return value;
        }
    }

    /**
     * Task whose {@link #init()} blocks until {@link #cancelTask()} is called, so the task never
     * gets past initialization on its own.
     */
    private static class InitBlockingTask
            extends StreamTaskTest.NoOpStreamTask<String, AbstractStreamOperator<String>> {

        /** Monitor that {@code init()} waits on and {@code cancelTask()} notifies. */
        private final Object lock = new Object();

        /** Cleared by {@code cancelTask()}; {@code init()} keeps waiting while it is {@code true}. */
        private volatile boolean running = true;

        protected InitBlockingTask(Environment env) {
            super(env);
        }

        /** Blocks on {@link #lock} until {@link #running} has been cleared. */
        @Override
        protected void init() throws Exception {
            synchronized (lock) {
                // Loop guards against spurious wake-ups.
                while (running) {
                    lock.wait();
                }
            }
        }

        /** Clears {@link #running} and wakes up any thread waiting in {@link #init()}. */
        @Override
        protected void cancelTask() throws Exception {
            running = false;
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }
}

