package org.apache.flink.statefun.flink.core.backpressure;

import org.apache.flink.statefun.flink.core.TestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValveTest.class */
/**
 * Unit tests for {@link ThresholdBackPressureValve}.
 *
 * <p>The scenarios below check when {@code shouldBackPressure()} reports {@code true}: after the
 * number of registered asynchronous operations reaches the configured threshold, and while at
 * least one address is blocked.
 */
public class ThresholdBackPressureValveTest {

    /** A threshold high enough that none of the address-blocking tests come close to reaching it. */
    private static final int UNREACHED_THRESHOLD = 500;

    /** Registering as many async operations as the threshold allows turns back pressure on. */
    @Test
    public void simpleUsage() {
        final ThresholdBackPressureValve valve = new ThresholdBackPressureValve(2);

        valve.notifyAsyncOperationRegistered();
        valve.notifyAsyncOperationRegistered();

        Assert.assertTrue(valve.shouldBackPressure());
    }

    /** Completing the only registered async operation turns back pressure off again. */
    @Test
    public void completedOperationReleaseBackpressure() {
        final ThresholdBackPressureValve valve = new ThresholdBackPressureValve(1);

        valve.notifyAsyncOperationRegistered();
        valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR);

        Assert.assertFalse(valve.shouldBackPressure());
    }

    /** A blocked address turns back pressure on even though no async operation was registered. */
    @Test
    public void blockAddressTriggerBackpressure() {
        final ThresholdBackPressureValve valve = new ThresholdBackPressureValve(UNREACHED_THRESHOLD);

        valve.blockAddress(TestUtils.FUNCTION_1_ADDR);

        Assert.assertTrue(valve.shouldBackPressure());
    }

    /** A completion reported for the blocked address turns back pressure off. */
    @Test
    public void blockingAndUnblockingAddress() {
        final ThresholdBackPressureValve valve = new ThresholdBackPressureValve(UNREACHED_THRESHOLD);

        valve.blockAddress(TestUtils.FUNCTION_1_ADDR);
        valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR);

        Assert.assertFalse(valve.shouldBackPressure());
    }

    /** A completion reported for some other address leaves the blocked address in effect. */
    @Test
    public void unblockingDifferentAddressStillBackpressures() {
        final ThresholdBackPressureValve valve = new ThresholdBackPressureValve(UNREACHED_THRESHOLD);

        valve.blockAddress(TestUtils.FUNCTION_1_ADDR);
        valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_2_ADDR);

        Assert.assertTrue(valve.shouldBackPressure());
    }

    /** With two blocked addresses, back pressure persists until both have seen a completion. */
    @Test
    public void blockTwoAddress() {
        final ThresholdBackPressureValve valve = new ThresholdBackPressureValve(UNREACHED_THRESHOLD);

        valve.blockAddress(TestUtils.FUNCTION_1_ADDR);
        valve.blockAddress(TestUtils.FUNCTION_2_ADDR);
        Assert.assertTrue(valve.shouldBackPressure());

        // Only the first address is released; the second one is still blocked.
        valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_1_ADDR);
        Assert.assertTrue(valve.shouldBackPressure());

        // Releasing the remaining address leaves nothing blocked.
        valve.notifyAsyncOperationCompleted(TestUtils.FUNCTION_2_ADDR);
        Assert.assertFalse(valve.shouldBackPressure());
    }
}
