package org.apache.flink.runtime.rest.handler.legacy.backpressure;

import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertexTest;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.class */
public class BackPressureStatsTrackerImplTest extends TestLogger {
    private static final int requestId = 0;
    private static final int cleanUpInterval = 60000;
    private static final int backPressureStatsRefreshInterval = 60000;
    private static final long timeGap = 60000;

    @Rule
    public Timeout caseTimeout = new Timeout(10, TimeUnit.SECONDS);
    private static final ExecutionJobVertex executionJobVertex = createExecutionJobVertex();
    private static final ExecutionVertex[] taskVertices = executionJobVertex.getTaskVertices();
    private static final double backPressureRatio = 0.1d;
    private static final BackPressureStats backPressureStats = createBackPressureStats(0, 1, backPressureRatio);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest$TestingBackPressureRequestCoordinator.class */
    public static class TestingBackPressureRequestCoordinator extends BackPressureRequestCoordinator {
        private final BackPressureStats[] backPressureStats;
        private int counter;

        TestingBackPressureRequestCoordinator(Executor executor, long j, BackPressureStats... backPressureStatsArr) {
            super(executor, j);
            this.counter = 0;
            this.backPressureStats = backPressureStatsArr;
        }

        CompletableFuture<BackPressureStats> triggerBackPressureRequest(ExecutionVertex[] executionVertexArr) {
            BackPressureStats[] backPressureStatsArr = this.backPressureStats;
            int i = this.counter;
            this.counter = i + 1;
            return CompletableFuture.completedFuture(backPressureStatsArr[i % this.backPressureStats.length]);
        }
    }

    @Test
    public void testGetOperatorBackPressureStats() throws Exception {
        doInitialRequestAndVerifyResult(createBackPressureTracker());
    }

    @Test
    public void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
        BackPressureStatsTracker createBackPressureTracker = createBackPressureTracker(60000, 60000, backPressureStats, createBackPressureStats(1, timeGap, 0.2d));
        doInitialRequestAndVerifyResult(createBackPressureTracker);
        checkOperatorBackPressureStats(createBackPressureTracker.getOperatorBackPressureStats(executionJobVertex));
    }

    @Test
    public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        BackPressureStats createBackPressureStats = createBackPressureStats(1, timeGap, 0.2d);
        BackPressureStatsTracker createBackPressureTracker = createBackPressureTracker(60000, 10, backPressureStats, createBackPressureStats);
        doInitialRequestAndVerifyResult(createBackPressureTracker);
        Thread.sleep(20L);
        Assert.assertTrue(createBackPressureTracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
        checkOperatorBackPressureStats(0.2d, createBackPressureStats, createBackPressureTracker.getOperatorBackPressureStats(executionJobVertex));
    }

    @Test
    public void testShutDown() throws Exception {
        BackPressureStatsTracker createBackPressureTracker = createBackPressureTracker();
        doInitialRequestAndVerifyResult(createBackPressureTracker);
        createBackPressureTracker.shutDown();
        Assert.assertFalse(createBackPressureTracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
        Assert.assertFalse(createBackPressureTracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
    }

    @Test
    public void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
        BackPressureStatsTracker createBackPressureTracker = createBackPressureTracker();
        doInitialRequestAndVerifyResult(createBackPressureTracker);
        createBackPressureTracker.cleanUpOperatorStatsCache();
        checkOperatorBackPressureStats(createBackPressureTracker.getOperatorBackPressureStats(executionJobVertex));
    }

    @Test
    public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
        BackPressureStatsTracker createBackPressureTracker = createBackPressureTracker(10, 60000, backPressureStats);
        doInitialRequestAndVerifyResult(createBackPressureTracker);
        Thread.sleep(20L);
        createBackPressureTracker.cleanUpOperatorStatsCache();
        Assert.assertFalse(createBackPressureTracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
    }

    private void doInitialRequestAndVerifyResult(BackPressureStatsTracker backPressureStatsTracker) {
        Assert.assertFalse(backPressureStatsTracker.getOperatorBackPressureStats(executionJobVertex).isPresent());
        checkOperatorBackPressureStats(backPressureStatsTracker.getOperatorBackPressureStats(executionJobVertex));
    }

    private void checkOperatorBackPressureStats(Optional<OperatorBackPressureStats> optional) {
        checkOperatorBackPressureStats(backPressureRatio, backPressureStats, optional);
    }

    private void checkOperatorBackPressureStats(double d, BackPressureStats backPressureStats2, Optional<OperatorBackPressureStats> optional) {
        Assert.assertTrue(optional.isPresent());
        OperatorBackPressureStats operatorBackPressureStats = optional.get();
        Assert.assertEquals(backPressureStats2.getRequestId(), operatorBackPressureStats.getRequestId());
        Assert.assertEquals(backPressureStats2.getEndTime(), operatorBackPressureStats.getEndTimestamp());
        Assert.assertEquals(taskVertices.length, operatorBackPressureStats.getNumberOfSubTasks());
        for (int i = 0; i < operatorBackPressureStats.getNumberOfSubTasks(); i++) {
            Assert.assertEquals(d, operatorBackPressureStats.getBackPressureRatio(i), 0.0d);
        }
    }

    private BackPressureStatsTracker createBackPressureTracker() {
        return createBackPressureTracker(60000, 60000, backPressureStats);
    }

    private BackPressureStatsTracker createBackPressureTracker(int i, int i2, BackPressureStats... backPressureStatsArr) {
        return new BackPressureStatsTrackerImpl(new TestingBackPressureRequestCoordinator((v0) -> {
            v0.run();
        }, 10000L, backPressureStatsArr), i, i2);
    }

    private static BackPressureStats createBackPressureStats(int i, long j, double d) {
        long currentTimeMillis = System.currentTimeMillis();
        long j2 = currentTimeMillis + j;
        HashMap hashMap = new HashMap();
        for (ExecutionVertex executionVertex : taskVertices) {
            hashMap.put(executionVertex.getCurrentExecutionAttempt().getAttemptId(), Double.valueOf(d));
        }
        return new BackPressureStats(i, currentTimeMillis, j2, hashMap);
    }

    private static ExecutionJobVertex createExecutionJobVertex() {
        try {
            return ExecutionJobVertexTest.createExecutionJobVertex(4, 4);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create ExecutionJobVertex.");
        }
    }
}
