package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.mockito.hamcrest.MockitoHamcrest;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.class */
public class StreamTaskCancellationBarrierTest {

    /** JUnit rule built from {@code Timeout.seconds(10)}, i.e. a 10-second limit for each test in this class. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(10);

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest$IdentityMap.class */
    /** Map function over strings that hands every element back unchanged. */
    private static class IdentityMap implements MapFunction<String, String> {

        private static final long serialVersionUID = 1L;

        /**
         * Returns the given element as-is.
         *
         * @param value the incoming element
         * @return the very same reference that was passed in
         */
        public String map(String value) throws Exception {
            return value;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest$InitBlockingTask.class */
    /**
     * Task whose {@code init()} does not return until {@code cancelTask()} has been invoked.
     *
     * <p>{@code init()} parks on an internal monitor while the {@code running} flag is set;
     * {@code cancelTask()} clears the flag and wakes every waiter.
     */
    private static class InitBlockingTask extends StreamTaskTest.NoOpStreamTask<String, AbstractStreamOperator<String>> {

        /** Monitor that {@code init()} waits on and {@code cancelTask()} notifies. */
        private final Object lock = new Object();

        /** Set until {@code cancelTask()} clears it; volatile because it is written outside the monitor. */
        private volatile boolean running = true;

        protected InitBlockingTask(Environment environment) {
            super(environment);
        }

        /**
         * Runs the parent initialisation and then blocks until {@code running} has been cleared.
         *
         * <p>Visibility is {@code public} here; the decompiler noted that the access modifier was
         * changed from {@code protected}.
         */
        @Override // org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask
        public void init() throws Exception {
            super.init();
            synchronized (lock) {
                // Re-check the flag after every wake-up so only a cleared flag ends the wait.
                while (running) {
                    lock.wait();
                }
            }
        }

        /**
         * Clears the {@code running} flag and notifies all threads waiting on the monitor.
         *
         * <p>NOTE(review): presumably overrides a {@code cancelTask()} hook of the task base class;
         * confirm against the superclass and add {@code @Override} if so.
         */
        protected void cancelTask() throws Exception {
            running = false;
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest$UnionCoMap.class */
    /** Co-map function over two string inputs that passes elements from either input through unchanged. */
    private static class UnionCoMap implements CoMapFunction<String, String, String> {

        private static final long serialVersionUID = 1L;

        /**
         * Returns the given element as-is.
         *
         * @param value the incoming element
         * @return the very same reference that was passed in
         */
        public String map1(String value) throws Exception {
            return value;
        }

        /**
         * Returns the given element as-is.
         *
         * @param value the incoming element
         * @return the very same reference that was passed in
         */
        public String map2(String value) throws Exception {
            return value;
        }
    }

    /**
     * Sends a {@link CancelCheckpointMarker} for one checkpoint id to a one-input task twice
     * (addressed as (0, 1) and (0, 0)) and verifies that
     * <ul>
     *   <li>{@code declineCheckpoint} is called on the environment exactly once for that id with a
     *       cause matching {@code CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER}, and</li>
     *   <li>the first element polled from the task output is a {@link CancelCheckpointMarker}
     *       carrying the same checkpoint id.</li>
     * </ul>
     */
    @Test
    public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
        final long checkpointId = 2L;

        // The trailing "1, 2" presumably mean one input gate with two channels, which would match
        // the (0, 1) and (0, 0) addresses used by processEvent below - verify against the harness.
        OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
                OneInputStreamTask::new,
                1, 2,
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        StreamConfig streamConfig = testHarness.getStreamConfig();
        StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
        streamConfig.setStreamOperator(mapOperator);
        streamConfig.setOperatorID(new OperatorID());

        // Spy on the environment so the decline call can be verified afterwards.
        StreamMockEnvironment environment = Mockito.spy(testHarness.createEnvironment());

        // Start the task.
        testHarness.invoke(environment);
        testHarness.waitForTaskRunning();

        // Feed the cancellation markers.
        testHarness.processEvent(new CancelCheckpointMarker(checkpointId), 0, 1);
        testHarness.processEvent(new CancelCheckpointMarker(checkpointId), 0, 0);
        testHarness.waitForInputProcessing();

        // Exactly one decline call with the expected failure reason must have reached the environment.
        Mockito.verify(environment, Mockito.times(1)).declineCheckpoint(
                Mockito.eq(checkpointId),
                (Throwable) MockitoHamcrest.argThat(
                        new CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(
                                CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));

        // The marker must also show up in the task output.
        Object emitted = testHarness.getOutput().poll();
        Assert.assertNotNull("nothing emitted", emitted);
        Assert.assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker);
        Assert.assertEquals("wrong checkpoint id", checkpointId, ((CancelCheckpointMarker) emitted).getCheckpointId());

        // End the input and wait for the task to complete.
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Sends a {@link CancelCheckpointMarker} for one checkpoint id to a two-input task twice
     * (addressed as (0, 0) and (1, 0)) and verifies that
     * <ul>
     *   <li>{@code declineCheckpoint} is called on the environment exactly once for that id with a
     *       cause matching {@code CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER}, and</li>
     *   <li>the first element polled from the task output is a {@link CancelCheckpointMarker}
     *       carrying the same checkpoint id.</li>
     * </ul>
     */
    @Test
    public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {
        final long checkpointId = 2L;

        TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
                TwoInputStreamTask::new,
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap<String, String, String> coMapOperator = new CoStreamMap<>(new UnionCoMap());
        streamConfig.setStreamOperator(coMapOperator);
        streamConfig.setOperatorID(new OperatorID());

        // Spy on the environment so the decline call can be verified afterwards.
        StreamMockEnvironment environment = Mockito.spy(testHarness.createEnvironment());

        // Start the task.
        testHarness.invoke(environment);
        testHarness.waitForTaskRunning();

        // Feed the cancellation markers; the first index differs between the two calls
        // (presumably one marker per input - verify against the harness).
        testHarness.processEvent(new CancelCheckpointMarker(checkpointId), 0, 0);
        testHarness.processEvent(new CancelCheckpointMarker(checkpointId), 1, 0);
        testHarness.waitForInputProcessing();

        // Exactly one decline call with the expected failure reason must have reached the environment.
        Mockito.verify(environment, Mockito.times(1)).declineCheckpoint(
                Mockito.eq(checkpointId),
                (Throwable) MockitoHamcrest.argThat(
                        new CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(
                                CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));

        // The marker must also show up in the task output.
        Object emitted = testHarness.getOutput().poll();
        Assert.assertNotNull("nothing emitted", emitted);
        Assert.assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker);
        Assert.assertEquals("wrong checkpoint id", checkpointId, ((CancelCheckpointMarker) emitted).getCheckpointId());

        // End the input and wait for the task to complete.
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }
}
