package org.apache.flink.runtime.taskmanager;

import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.class */
public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
    private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
    private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
    private static volatile Thread ASYNC_PRODUCER_THREAD;
    private static volatile Thread ASYNC_CONSUMER_THREAD;

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase$AsyncConsumer.class */
    public static class AsyncConsumer extends AbstractInvokable {

        /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase$AsyncConsumer$ConsumerThread.class */
        private static class ConsumerThread extends Thread {
            private final InputGate inputGate;

            public ConsumerThread(InputGate inputGate) {
                this.inputGate = inputGate;
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        this.inputGate.getNextBufferOrEvent();
                    } catch (Exception e) {
                        Exception unused = TaskCancelAsyncProducerConsumerITCase.ASYNC_CONSUMER_EXCEPTION = e;
                        return;
                    }
                }
            }
        }

        public void invoke() throws Exception {
            ConsumerThread consumerThread = new ConsumerThread(getEnvironment().getInputGate(0));
            Thread unused = TaskCancelAsyncProducerConsumerITCase.ASYNC_CONSUMER_THREAD = consumerThread;
            consumerThread.start();
            while (consumerThread.isAlive()) {
                try {
                    consumerThread.join();
                } catch (InterruptedException e) {
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase$AsyncProducer.class */
    public static class AsyncProducer extends AbstractInvokable {

        /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase$AsyncProducer$ProducerThread.class */
        private static class ProducerThread extends Thread {
            private final RecordWriter<LongValue> recordWriter;

            public ProducerThread(ResultPartitionWriter resultPartitionWriter) {
                this.recordWriter = new RecordWriter<>(resultPartitionWriter);
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                LongValue longValue = new LongValue(0L);
                while (true) {
                    try {
                        longValue.setValue(longValue.getValue() + 1);
                        this.recordWriter.emit(longValue);
                        this.recordWriter.flush();
                    } catch (Exception e) {
                        Exception unused = TaskCancelAsyncProducerConsumerITCase.ASYNC_PRODUCER_EXCEPTION = e;
                        return;
                    }
                }
            }
        }

        public void invoke() throws Exception {
            ProducerThread producerThread = new ProducerThread(getEnvironment().getWriter(0));
            Thread unused = TaskCancelAsyncProducerConsumerITCase.ASYNC_PRODUCER_THREAD = producerThread;
            producerThread.start();
            while (producerThread.isAlive()) {
                try {
                    producerThread.join();
                } catch (InterruptedException e) {
                }
            }
        }
    }

    @Test
    public void testCancelAsyncProducerAndConsumer() throws Exception {
        Deadline fromNow = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        TestingCluster testingCluster = null;
        try {
            Configuration configuration = new Configuration();
            configuration.setInteger("local.number-taskmanager", 1);
            configuration.setInteger("taskmanager.numberOfTaskSlots", 1);
            configuration.setInteger("taskmanager.memory.segment-size", 4096);
            configuration.setInteger("taskmanager.network.numberOfBuffers", 8);
            testingCluster = new TestingCluster(configuration, true);
            testingCluster.start();
            JobVertex jobVertex = new JobVertex("AsyncProducer");
            jobVertex.setParallelism(1);
            jobVertex.setInvokableClass(AsyncProducer.class);
            JobVertex jobVertex2 = new JobVertex("AsyncConsumer");
            jobVertex2.setParallelism(1);
            jobVertex2.setInvokableClass(AsyncConsumer.class);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE);
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup(new JobVertexID[]{jobVertex.getID(), jobVertex2.getID()});
            jobVertex.setSlotSharingGroup(slotSharingGroup);
            jobVertex2.setSlotSharingGroup(slotSharingGroup);
            JobGraph jobGraph = new JobGraph(new JobVertex[]{jobVertex, jobVertex2});
            ActorGateway leaderGateway = testingCluster.getLeaderGateway(fromNow.timeLeft());
            testingCluster.submitJobDetached(jobGraph);
            Await.ready(leaderGateway.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), fromNow.timeLeft()), fromNow.timeLeft());
            Future ask = leaderGateway.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED), fromNow.timeLeft());
            boolean z = false;
            for (int i = 0; i < 50; i++) {
                Thread thread = ASYNC_PRODUCER_THREAD;
                if (thread != null && thread.isAlive()) {
                    z = isInBlockingBufferRequest(thread.getStackTrace());
                }
                if (z) {
                    break;
                }
                Thread.sleep(500L);
            }
            Assert.assertTrue("Producer thread is not blocked.", z);
            boolean z2 = false;
            for (int i2 = 0; i2 < 50; i2++) {
                Thread thread2 = ASYNC_CONSUMER_THREAD;
                if (thread2 != null && thread2.isAlive()) {
                    z2 = isInBlockingQueuePoll(thread2.getStackTrace());
                }
                if (z2) {
                    break;
                }
                Thread.sleep(500L);
            }
            Assert.assertTrue("Consumer thread is not blocked.", z2);
            Await.ready(leaderGateway.ask(new JobManagerMessages.CancelJob(jobGraph.getJobID()), fromNow.timeLeft()), fromNow.timeLeft());
            Await.ready(ask, fromNow.timeLeft());
            Assert.assertNotNull(ASYNC_PRODUCER_EXCEPTION);
            Assert.assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
            Assert.assertNotNull(ASYNC_CONSUMER_EXCEPTION);
            Assert.assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
            if (testingCluster != null) {
                testingCluster.shutdown();
            }
        } catch (Throwable th) {
            if (testingCluster != null) {
                testingCluster.shutdown();
            }
            throw th;
        }
    }

    private boolean isInBlockingBufferRequest(StackTraceElement[] stackTraceElementArr) {
        return stackTraceElementArr.length >= 3 && stackTraceElementArr[0].getMethodName().equals("wait") && stackTraceElementArr[1].getMethodName().equals("requestBuffer") && stackTraceElementArr[2].getMethodName().equals("requestBufferBlocking");
    }

    private boolean isInBlockingQueuePoll(StackTraceElement[] stackTraceElementArr) {
        for (StackTraceElement stackTraceElement : stackTraceElementArr) {
            if (stackTraceElement.getMethodName().equals("poll") && stackTraceElement.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
                return true;
            }
        }
        return false;
    }
}
