package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.class */
public class BackPressureStatsTrackerITCase extends TestLogger {
    private static NetworkBufferPool networkBufferPool;
    private static ActorSystem testActorSystem;
    private static BufferPool testBufferPool;

    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase$BackPressuredTask.class */
    public static class BackPressuredTask extends AbstractInvokable {
        public void invoke() throws Exception {
            while (true) {
                BackPressureStatsTrackerITCase.testBufferPool.requestBufferBlocking().recycle();
                new CountDownLatch(1).await();
            }
        }
    }

    @BeforeClass
    public static void setup() {
        testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
        networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP);
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(testActorSystem);
        networkBufferPool.destroy();
    }

    @Test
    public void testBackPressuredProducer() throws Exception {
        new JavaTestKit(testActorSystem) { // from class: org.apache.flink.runtime.webmonitor.BackPressureStatsTrackerITCase.1
            {
                final FiniteDuration finiteDuration = new FiniteDuration(60L, TimeUnit.SECONDS);
                final JobGraph jobGraph = new JobGraph();
                final JobVertex jobVertex = new JobVertex("Task");
                jobVertex.setInvokableClass(BackPressuredTask.class);
                jobVertex.setParallelism(4);
                jobGraph.addVertex(jobVertex);
                final ActorGateway actorGateway = null;
                ActorGateway actorGateway2 = null;
                BufferPool unused = BackPressureStatsTrackerITCase.testBufferPool = BackPressureStatsTrackerITCase.networkBufferPool.createBufferPool(1, false);
                final ArrayList arrayList = new ArrayList();
                while (true) {
                    Buffer requestBuffer = BackPressureStatsTrackerITCase.testBufferPool.requestBuffer();
                    if (requestBuffer == null) {
                        try {
                            break;
                        } catch (Throwable th) {
                            TestingUtils.stopActor(actorGateway);
                            TestingUtils.stopActor(actorGateway2);
                            Iterator it = arrayList.iterator();
                            while (it.hasNext()) {
                                ((Buffer) it.next()).recycle();
                            }
                            BackPressureStatsTrackerITCase.testBufferPool.lazyDestroy();
                            throw th;
                        }
                    }
                    arrayList.add(requestBuffer);
                }
                actorGateway = TestingUtils.createJobManager(BackPressureStatsTrackerITCase.testActorSystem, new Configuration());
                Configuration configuration = new Configuration();
                configuration.setInteger("taskmanager.numberOfTaskSlots", 4);
                actorGateway2 = TestingUtils.createTaskManager(BackPressureStatsTrackerITCase.testActorSystem, actorGateway, configuration, true, true);
                new JavaTestKit.Within(finiteDuration) { // from class: org.apache.flink.runtime.webmonitor.BackPressureStatsTrackerITCase.1.1
                    protected void run() {
                        int i;
                        int i2;
                        try {
                            AkkaActorGateway akkaActorGateway = new AkkaActorGateway(getTestActor(), (UUID) null);
                            JobClient.submitJobDetached(actorGateway, jobGraph, finiteDuration, ClassLoader.getSystemClassLoader());
                            actorGateway.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), akkaActorGateway);
                            expectMsgEquals(new TestingJobManagerMessages.AllVerticesRunning(jobGraph.getJobID()));
                            actorGateway.tell(new TestingJobManagerMessages.RequestExecutionGraph(jobGraph.getJobID()), akkaActorGateway);
                            ExecutionJobVertex jobVertex2 = ((TestingJobManagerMessages.ExecutionGraphFound) expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class)).executionGraph().getJobVertex(jobVertex.getID());
                            BackPressureStatsTracker backPressureStatsTracker = new BackPressureStatsTracker(new StackTraceSampleCoordinator(BackPressureStatsTrackerITCase.testActorSystem, 60000), 100000, 20, new FiniteDuration(10L, TimeUnit.MILLISECONDS));
                            int i3 = 0;
                            for (int i4 = 0; i4 < 10; i4++) {
                                try {
                                    OperatorBackPressureStats triggerStatsSample = BackPressureStatsTrackerITCase.this.triggerStatsSample(backPressureStatsTracker, jobVertex2);
                                    Assert.assertEquals(i3 + i4, triggerStatsSample.getSampleId());
                                    Assert.assertEquals(4L, triggerStatsSample.getNumberOfSubTasks());
                                    Assert.assertEquals(1.0d, triggerStatsSample.getMaxBackPressureRatio(), 0.0d);
                                    for (int i5 = 0; i5 < 4; i5++) {
                                        Assert.assertEquals(1.0d, triggerStatsSample.getBackPressureRatio(i5), 0.0d);
                                    }
                                    i3 = triggerStatsSample.getSampleId() + 1;
                                    break;
                                } finally {
                                    if (i4 != 10 - 1) {
                                        Thread.sleep(500L);
                                    }
                                }
                            }
                            Iterator it2 = arrayList.iterator();
                            while (it2.hasNext()) {
                                ((Buffer) it2.next()).recycle();
                            }
                            while (BackPressureStatsTrackerITCase.testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
                                Thread.sleep(100L);
                            }
                            int i6 = 0;
                            loop4: while (i4 < 10) {
                                try {
                                    OperatorBackPressureStats triggerStatsSample2 = BackPressureStatsTrackerITCase.this.triggerStatsSample(backPressureStatsTracker, jobVertex2);
                                    Assert.assertEquals(i3 + i4, triggerStatsSample2.getSampleId());
                                    Assert.assertEquals(4L, triggerStatsSample2.getNumberOfSubTasks());
                                    for (int i7 = 0; i7 < 4; i7++) {
                                        Assert.assertEquals(0.0d, triggerStatsSample2.getBackPressureRatio(i7), 0.0d);
                                    }
                                    break loop4;
                                } finally {
                                    if (i == i2) {
                                    }
                                }
                            }
                            actorGateway.tell(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), akkaActorGateway);
                            actorGateway.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
                            expectMsgEquals(true);
                            backPressureStatsTracker.invalidateOperatorStatsCache();
                            Assert.assertFalse("Unexpected trigger", backPressureStatsTracker.triggerStackTraceSample(jobVertex2));
                        } catch (Exception e) {
                            e.printStackTrace();
                            Assert.fail(e.getMessage());
                        }
                    }
                };
                TestingUtils.stopActor(actorGateway);
                TestingUtils.stopActor(actorGateway2);
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    ((Buffer) it2.next()).recycle();
                }
                BackPressureStatsTrackerITCase.testBufferPool.lazyDestroy();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OperatorBackPressureStats triggerStatsSample(BackPressureStatsTracker backPressureStatsTracker, ExecutionJobVertex executionJobVertex) throws InterruptedException {
        backPressureStatsTracker.invalidateOperatorStatsCache();
        Assert.assertTrue("Failed to trigger", backPressureStatsTracker.triggerStackTraceSample(executionJobVertex));
        Thread.sleep(200L);
        while (true) {
            Option operatorBackPressureStats = backPressureStatsTracker.getOperatorBackPressureStats(executionJobVertex);
            if (!operatorBackPressureStats.isEmpty()) {
                return (OperatorBackPressureStats) operatorBackPressureStats.get();
            }
            Thread.sleep(10L);
        }
    }
}
