package org.apache.flink.runtime.util;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.runtime.util.KeyedBudgetManager;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/util/KeyedBudgetManagerTest.class */
/**
 * Tests for {@link KeyedBudgetManager}.
 *
 * <p>Every test starts from a fresh manager holding the budgets in {@link #TEST_BUDGETS} for the
 * keys in {@link #TEST_KEYS} (65 units in total). After each test all budget is released and the
 * manager is verified to be back at its initial state.
 */
public class KeyedBudgetManagerTest extends TestLogger {
    private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"};
    private static final long[] TEST_BUDGETS = {15, 17, 22, 11};

    /** Executor that runs every submitted task on its own freshly started thread. */
    private static final Executor NEW_THREAD_EXECUTOR = task -> new Thread(task).start();

    private KeyedBudgetManager<String> keyedBudgetManager;

    /** Creates a new manager with the initial test budgets before each test. */
    @Before
    public void setup() {
        keyedBudgetManager = createSimpleKeyedBudget();
    }

    /** Releases everything and verifies that all keys are back at their initial budget. */
    @After
    public void teardown() {
        keyedBudgetManager.releaseAll();
        checkNoKeyBudgetChange();
    }

    /** Acquiring 10 out of the 15 units of k1 returns 10 and leaves 5 available. */
    @Test
    public void testSuccessfulAcquisitionForKey() {
        long acquired = keyedBudgetManager.acquireBudgetForKey("k1", 10L);

        Assert.assertThat(acquired, CoreMatchers.is(10L));
        checkOneKeyBudgetChange("k1", 5L);
    }

    /** Requesting 20 units from k1 (which has only 15) returns 15 and changes no budget. */
    @Test
    public void testFailedAcquisitionForKey() {
        long returned = keyedBudgetManager.acquireBudgetForKey("k1", 20L);

        Assert.assertThat(returned, CoreMatchers.is(15L));
        checkNoKeyBudgetChange();
    }

    /** Acquiring 10 and releasing 5 for k1 leaves 10 units available. */
    @Test
    public void testSuccessfulReleaseForKey() {
        keyedBudgetManager.acquireBudgetForKey("k1", 10L);
        keyedBudgetManager.releaseBudgetForKey("k1", 5L);

        checkOneKeyBudgetChange("k1", 10L);
    }

    /** Releasing more than was acquired must throw and must not alter the budget of k1. */
    @Test
    public void testFailedReleaseForKey() {
        keyedBudgetManager.acquireBudgetForKey("k1", 10L);
        try {
            keyedBudgetManager.releaseBudgetForKey("k1", 15L);
            Assert.fail("IllegalStateException is expected to fail over-sized release");
        } catch (IllegalStateException expected) {
            // expected: the over-sized release has to be rejected
        }

        checkOneKeyBudgetChange("k1", 5L);
    }

    /**
     * A paged acquisition over k2 and k3 succeeds; the per-key results sum to 4 and 20 units are
     * gone from k2/k3 (39 -> 19) while k1 is untouched.
     */
    @Test
    public void testSuccessfulAcquisitionForKeys() {
        KeyedBudgetManager.AcquisitionResult<String> acquired = acquireForMultipleKeys(5L);

        Assert.assertThat(checkAcquisitionSuccess(acquired, 4L), CoreMatchers.is(true));
        Assert.assertThat(keyedBudgetManager.availableBudgetForKey("k1"), CoreMatchers.is(15L));
        Assert.assertThat(
                keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")),
                CoreMatchers.is(19L));
        Assert.assertThat(keyedBudgetManager.totalAvailableBudget(), CoreMatchers.is(45L));
    }

    /**
     * Runs two identical paged acquisitions and one availability query on separate threads.
     * Exactly one acquisition must succeed, and the query must observe k2/k3 either before any
     * acquisition (39) or after the successful one (19).
     */
    @Test
    public void testConcurrentAcquisitionForKeys() throws ExecutionException, InterruptedException {
        long pageSize = 5L;
        CompletableFuture<KeyedBudgetManager.AcquisitionResult<String>> allocation1 =
                acquireForMultipleKeysAsync(pageSize);
        CompletableFuture<Long> availableBudgetFuture = getAvailableBudgetForKeysAsync();
        CompletableFuture<KeyedBudgetManager.AcquisitionResult<String>> allocation2 =
                acquireForMultipleKeysAsync(pageSize);
        Arrays.asList(allocation1, allocation2, availableBudgetFuture)
                .forEach(KeyedBudgetManagerTest::waitForFutureSilently);

        boolean onlyFirstSucceeded = checkFirstAcquisitionSucceeded(allocation1, allocation2);
        boolean onlySecondSucceeded = checkFirstAcquisitionSucceeded(allocation2, allocation1);
        Assert.assertThat(onlyFirstSucceeded || onlySecondSucceeded, CoreMatchers.is(true));

        long availableBudget = availableBudgetFuture.get();
        Assert.assertThat(availableBudget == 39L || availableBudget == 19L, CoreMatchers.is(true));
    }

    /**
     * Acquires once, then releases the same amount twice concurrently. Exactly one release must
     * complete exceptionally, the concurrent availability query must see k2/k3 at 19 or 39, and
     * afterwards all keys must be back at their initial budget.
     */
    @Test
    public void testConcurrentReleaseForKeys() throws ExecutionException, InterruptedException {
        long pageSize = 5L;
        // Scale each acquired per-key value by the page size to obtain the amount to release.
        Map<String, Long> sizeByKey =
                acquireForMultipleKeys(pageSize).getAcquiredPerKey().entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() * pageSize));

        CompletableFuture<Void> release1 = releaseKeysAsync(sizeByKey);
        CompletableFuture<Long> availableBudgetFuture = getAvailableBudgetForKeysAsync();
        CompletableFuture<Void> release2 = releaseKeysAsync(sizeByKey);
        Arrays.asList(release1, availableBudgetFuture, release2)
                .forEach(KeyedBudgetManagerTest::waitForFutureSilently);

        boolean onlyFirstSucceeded =
                !release1.isCompletedExceptionally() && release2.isCompletedExceptionally();
        boolean onlySecondSucceeded =
                !release2.isCompletedExceptionally() && release1.isCompletedExceptionally();
        Assert.assertThat(onlyFirstSucceeded || onlySecondSucceeded, CoreMatchers.is(true));

        long availableBudget = availableBudgetFuture.get();
        Assert.assertThat(availableBudget == 39L || availableBudget == 19L, CoreMatchers.is(true));

        checkNoKeyBudgetChange();
    }

    /**
     * A paged acquisition with arguments (6, 6) over k2 and k3 must fail, report 5 as the total
     * available for the queried keys, and leave every budget unchanged.
     */
    @Test
    public void testFailedAcquisitionForKeys() {
        KeyedBudgetManager.AcquisitionResult<String> result =
                keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 6L, 6L);

        Assert.assertThat(result.isFailure(), CoreMatchers.is(true));
        Assert.assertThat(result.getTotalAvailableForAllQueriedKeys(), CoreMatchers.is(5L));
        checkNoKeyBudgetChange();
    }

    /** After a paged acquisition over k2/k3, a partial release is reflected in the available budgets. */
    @Test
    public void testSuccessfulReleaseForKeys() {
        keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4L, 8L);
        keyedBudgetManager.releaseBudgetForKeys(
                createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10}));

        Assert.assertThat(
                keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")),
                CoreMatchers.is(24L));
        Assert.assertThat(
                keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", "k4")),
                CoreMatchers.is(26L));
        Assert.assertThat(keyedBudgetManager.totalAvailableBudget(), CoreMatchers.is(50L));
    }

    /**
     * Same as {@link #testSuccessfulReleaseForKeys()} but with an additional acquisition over
     * k1/k4 that is not released, so only the k2/k3 budget grows back.
     */
    @Test
    public void testSuccessfulReleaseForKeysWithMixedRequests() {
        keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4L, 8L);
        keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k1", "k4"), 6L, 3L);
        keyedBudgetManager.releaseBudgetForKeys(
                createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10}));

        Assert.assertThat(
                keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")),
                CoreMatchers.is(24L));
        Assert.assertThat(
                keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", "k4")),
                CoreMatchers.is(8L));
        Assert.assertThat(keyedBudgetManager.totalAvailableBudget(), CoreMatchers.is(32L));
    }

    /** Asserts that every test key still has its initial budget available. */
    private void checkNoKeyBudgetChange() {
        checkKeysBudgetChange(Collections.emptyMap());
    }

    /**
     * Asserts that only the given key deviates from its initial budget.
     *
     * @param str key whose available budget is expected to differ
     * @param j expected available budget for that key
     */
    private void checkOneKeyBudgetChange(String str, long j) {
        checkKeysBudgetChange(Collections.singletonMap(str, j));
    }

    /**
     * Asserts the available budget of every test key, the maximum total budget and the total
     * available budget.
     *
     * @param map expected available budget per key; keys that are absent are expected to still
     *     have their initial budget from {@link #TEST_BUDGETS}
     */
    private void checkKeysBudgetChange(Map<String, Long> map) {
        long expectedTotalAvailable = 0L;
        for (int i = 0; i < TEST_KEYS.length; i++) {
            String key = TEST_KEYS[i];
            long expectedAvailable = map.containsKey(key) ? map.get(key) : TEST_BUDGETS[i];
            Assert.assertThat(
                    keyedBudgetManager.availableBudgetForKey(key),
                    CoreMatchers.is(expectedAvailable));
            expectedTotalAvailable += expectedAvailable;
        }
        Assert.assertThat(
                keyedBudgetManager.maxTotalBudget(),
                CoreMatchers.is(LongStream.of(TEST_BUDGETS).sum()));
        Assert.assertThat(
                keyedBudgetManager.totalAvailableBudget(),
                CoreMatchers.is(expectedTotalAvailable));
    }

    /** Runs {@link #acquireForMultipleKeys(long)} on a new thread. */
    private CompletableFuture<KeyedBudgetManager.AcquisitionResult<String>> acquireForMultipleKeysAsync(long j) {
        return CompletableFuture.supplyAsync(() -> acquireForMultipleKeys(j), NEW_THREAD_EXECUTOR);
    }

    /** Queries the combined available budget of k2 and k3 on a new thread. */
    private CompletableFuture<Long> getAvailableBudgetForKeysAsync() {
        return CompletableFuture.supplyAsync(
                () -> keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")),
                NEW_THREAD_EXECUTOR);
    }

    /**
     * Performs a paged acquisition over k2 and k3 with a fixed second argument of 4.
     *
     * @param j third argument handed to {@code acquirePagedBudgetForKeys} (presumably the page
     *     size, with 4 being the number of pages; confirm against KeyedBudgetManager)
     */
    private KeyedBudgetManager.AcquisitionResult<String> acquireForMultipleKeys(long j) {
        return keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4L, j);
    }

    /** Releases the given per-key budget on a new thread. */
    private CompletableFuture<Void> releaseKeysAsync(Map<String, Long> map) {
        return CompletableFuture.runAsync(
                () -> keyedBudgetManager.releaseBudgetForKeys(map), NEW_THREAD_EXECUTOR);
    }

    /**
     * Returns {@code true} if the first acquisition succeeded with per-key values summing to 4
     * and the second acquisition failed.
     */
    private static boolean checkFirstAcquisitionSucceeded(Future<KeyedBudgetManager.AcquisitionResult<String>> future, Future<KeyedBudgetManager.AcquisitionResult<String>> future2) throws ExecutionException, InterruptedException {
        return checkAcquisitionSuccess(future.get(), 4L) && future2.get().isFailure();
    }

    /**
     * Returns {@code true} if the result is a success and its acquired per-key values sum to
     * {@code j}.
     */
    private static boolean checkAcquisitionSuccess(KeyedBudgetManager.AcquisitionResult<String> acquisitionResult, long j) {
        if (!acquisitionResult.isSuccess()) {
            return false;
        }
        long acquiredSum =
                acquisitionResult.getAcquiredPerKey().values().stream()
                        .mapToLong(Long::longValue)
                        .sum();
        return acquiredSum == j;
    }

    /** Creates a manager over the test keys and budgets, passing 1 as second constructor argument. */
    private static KeyedBudgetManager<String> createSimpleKeyedBudget() {
        return new KeyedBudgetManager<>(createdBudgetMap(TEST_KEYS, TEST_BUDGETS), 1L);
    }

    /**
     * Zips keys and budgets into a map.
     *
     * @param strArr keys
     * @param jArr budgets, position-wise matching {@code strArr}
     * @return mutable map from each key to its budget
     * @throws IllegalArgumentException if the arrays differ in length
     */
    private static Map<String, Long> createdBudgetMap(String[] strArr, long[] jArr) {
        Preconditions.checkArgument(strArr.length == jArr.length);
        Map<String, Long> budgetByKey = new HashMap<>();
        for (int i = 0; i < strArr.length; i++) {
            budgetByKey.put(strArr[i], jArr[i]);
        }
        return budgetByKey;
    }

    /**
     * Blocks until the future is done without propagating its outcome. Callers inspect the
     * future afterwards, so a failed future is not an error here.
     */
    private static void waitForFutureSilently(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Keep the interrupt status so that callers further up can still observe it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException ignored) {
            // Deliberately ignored: some tests expect a future to complete exceptionally.
        }
    }
}
