package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.class */
public class BackPressureStatsTrackerImplITCase extends TestLogger {
    private static final int BACKPRESSURE_NUM_SAMPLES = 2;
    private static final int JOB_PARALLELISM = 4;
    private NetworkBufferPool networkBufferPool;
    private static BufferPool testBufferPool;
    private TestingMiniCluster testingMiniCluster;
    private DispatcherGateway dispatcherGateway;
    private static final long TIMEOUT_SECONDS = 10;
    private static final Duration TIMEOUT = Duration.ofSeconds(TIMEOUT_SECONDS);
    private static final JobID TEST_JOB_ID = new JobID();
    private static final JobVertex TEST_JOB_VERTEX = new JobVertex("Task");

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase$BackPressuredTask.class */
    public static class BackPressuredTask extends AbstractInvokable {
        public BackPressuredTask(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            BufferBuilderTestUtils.buildSingleBuffer(BackPressureStatsTrackerImplITCase.testBufferPool.requestBufferBuilderBlocking()).recycleBuffer();
            Thread.currentThread().join();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase$OperatorBackPressureRatioMatcher.class */
    public static class OperatorBackPressureRatioMatcher extends TypeSafeDiagnosingMatcher<OperatorBackPressureStats> {
        private final double expectedBackPressureRatio;

        private OperatorBackPressureRatioMatcher(double d) {
            this.expectedBackPressureRatio = d;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean matchesSafely(OperatorBackPressureStats operatorBackPressureStats, Description description) {
            if (isBackPressureRatioCorrect(operatorBackPressureStats)) {
                return true;
            }
            description.appendText("Not all subtask back pressure ratios in " + getBackPressureRatios(operatorBackPressureStats) + " are " + this.expectedBackPressureRatio);
            return false;
        }

        private static List<Double> getBackPressureRatios(OperatorBackPressureStats operatorBackPressureStats) {
            IntStream range = IntStream.range(0, operatorBackPressureStats.getNumberOfSubTasks());
            operatorBackPressureStats.getClass();
            return (List) range.mapToObj(operatorBackPressureStats::getBackPressureRatio).collect(Collectors.toList());
        }

        private boolean isBackPressureRatioCorrect(OperatorBackPressureStats operatorBackPressureStats) {
            IntStream range = IntStream.range(0, operatorBackPressureStats.getNumberOfSubTasks());
            operatorBackPressureStats.getClass();
            return range.mapToObj(operatorBackPressureStats::getBackPressureRatio).allMatch(d -> {
                return d.doubleValue() == this.expectedBackPressureRatio;
            });
        }

        public void describeTo(Description description) {
            description.appendText("All subtask back pressure ratios are " + this.expectedBackPressureRatio);
        }
    }

    @Before
    public void setUp() throws Exception {
        this.networkBufferPool = new NetworkBufferPool(100, 8192);
        testBufferPool = this.networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
        Configuration configuration = new Configuration();
        configuration.setInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES, BACKPRESSURE_NUM_SAMPLES);
        this.testingMiniCluster = new TestingMiniCluster(new TestingMiniClusterConfiguration.Builder().setNumTaskManagers(4).setConfiguration(configuration).build());
        this.testingMiniCluster.start();
        this.dispatcherGateway = this.testingMiniCluster.getDispatcherGatewayFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    @After
    public void tearDown() throws Exception {
        if (this.testingMiniCluster != null) {
            this.testingMiniCluster.close();
        }
        if (testBufferPool != null) {
            testBufferPool.lazyDestroy();
        }
        if (this.networkBufferPool != null) {
            this.networkBufferPool.destroyAllBufferPools();
            this.networkBufferPool.destroy();
        }
    }

    @Test
    public void testBackPressureShouldBeReflectedInStats() throws Exception {
        List<Buffer> requestAllBuffers = requestAllBuffers();
        try {
            this.testingMiniCluster.submitJob(createJobWithBackPressure()).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            OperatorBackPressureStats backPressureStatsForTestVertex = getBackPressureStatsForTestVertex();
            Assert.assertThat(Integer.valueOf(backPressureStatsForTestVertex.getNumberOfSubTasks()), Matchers.is(Matchers.equalTo(4)));
            Assert.assertThat(backPressureStatsForTestVertex, isFullyBackpressured());
            releaseBuffers(requestAllBuffers);
        } catch (Throwable th) {
            releaseBuffers(requestAllBuffers);
            throw th;
        }
    }

    @Test
    public void testAbsenceOfBackPressureShouldBeReflectedInStats() throws Exception {
        this.testingMiniCluster.submitJob(createJobWithoutBackPressure()).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        OperatorBackPressureStats backPressureStatsForTestVertex = getBackPressureStatsForTestVertex();
        Assert.assertThat(Integer.valueOf(backPressureStatsForTestVertex.getNumberOfSubTasks()), Matchers.is(Matchers.equalTo(4)));
        Assert.assertThat(backPressureStatsForTestVertex, isNotBackpressured());
    }

    private static JobGraph createJobWithBackPressure() {
        JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job");
        TEST_JOB_VERTEX.setInvokableClass(BackPressuredTask.class);
        TEST_JOB_VERTEX.setParallelism(4);
        jobGraph.addVertex(TEST_JOB_VERTEX);
        return jobGraph;
    }

    private static JobGraph createJobWithoutBackPressure() {
        JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job");
        TEST_JOB_VERTEX.setInvokableClass(BlockingNoOpInvokable.class);
        TEST_JOB_VERTEX.setParallelism(4);
        jobGraph.addVertex(TEST_JOB_VERTEX);
        return jobGraph;
    }

    private static List<Buffer> requestAllBuffers() throws IOException {
        ArrayList arrayList = new ArrayList();
        while (true) {
            Buffer requestBuffer = testBufferPool.requestBuffer();
            if (requestBuffer == null) {
                return arrayList;
            }
            arrayList.add(requestBuffer);
        }
    }

    private static void releaseBuffers(List<Buffer> list) {
        for (Buffer buffer : list) {
            buffer.recycleBuffer();
            Assert.assertTrue(buffer.isRecycled());
        }
    }

    private OperatorBackPressureStats getBackPressureStatsForTestVertex() {
        waitUntilBackPressureStatsAvailable();
        Optional<OperatorBackPressureStats> backPressureStats = getBackPressureStats();
        Preconditions.checkState(backPressureStats.isPresent());
        return backPressureStats.get();
    }

    private void waitUntilBackPressureStatsAvailable() {
        try {
            CommonTestUtils.waitUntilCondition(() -> {
                return Boolean.valueOf(getBackPressureStats().isPresent());
            }, Deadline.fromNow(TIMEOUT));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private Optional<OperatorBackPressureStats> getBackPressureStats() {
        try {
            return ((OperatorBackPressureStatsResponse) this.dispatcherGateway.requestOperatorBackPressureStats(TEST_JOB_ID, TEST_JOB_VERTEX.getID()).get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)).getOperatorBackPressureStats();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    private static Matcher<OperatorBackPressureStats> isNotBackpressured() {
        return new OperatorBackPressureRatioMatcher(0.0d);
    }

    private static Matcher<OperatorBackPressureStats> isFullyBackpressured() {
        return new OperatorBackPressureRatioMatcher(1.0d);
    }
}
