package org.apache.flink.runtime.concurrent;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtilsTest.class */
public class FutureUtilsTest extends TestLogger {
    /**
     * Checks that a retried operation which fails nine times and succeeds on its tenth
     * invocation yields a successfully completed retry future.
     */
    @Test
    public void testRetrySuccess() throws Exception {
        final int retries = 10;
        final AtomicInteger attempts = new AtomicInteger(0);

        final CompletableFuture<Boolean> retryFuture = FutureUtils.retry(
            () -> CompletableFuture.supplyAsync(
                () -> {
                    // Every invocation fails except the tenth one.
                    if (attempts.incrementAndGet() == retries) {
                        return true;
                    }
                    throw new CompletionException(new FlinkException("Test exception"));
                },
                TestingUtils.defaultExecutor()),
            retries,
            TestingUtils.defaultExecutor());

        Assert.assertTrue(retryFuture.get());
        // assertEquals reports the observed attempt count on failure, unlike assertTrue(a == b).
        Assert.assertEquals(retries, attempts.get());
    }

    /**
     * Retries an operation that always completes exceptionally and expects the unwrapped
     * failure of the retry future to be a {@code FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryFailure() throws Throwable {
        final int retries = 3;
        try {
            final CompletableFuture<?> retryFuture = FutureUtils.retry(
                () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
                retries,
                TestingUtils.defaultExecutor());
            retryFuture.get();
        } catch (ExecutionException executionException) {
            // Surface the real cause so the 'expected' clause of @Test can match it.
            throw ExceptionUtils.stripExecutionException(executionException);
        }
    }

    /**
     * Cancels the retry future while the second invocation of the operation is blocked and
     * checks that the future reports cancellation and that exactly two invocations were counted.
     */
    @Test
    public void testRetryCancellation() throws Exception {
        final AtomicInteger attempts = new AtomicInteger(0);
        final OneShotLatch secondAttemptStarted = new OneShotLatch();
        final OneShotLatch releaseSecondAttempt = new OneShotLatch();
        // Records an interruption observed inside the asynchronous operation, if any.
        final AtomicReference<Throwable> asyncFailure = new AtomicReference<>(null);

        final CompletableFuture<?> retryFuture = FutureUtils.retry(
            () -> CompletableFuture.supplyAsync(
                () -> {
                    if (attempts.incrementAndGet() == 2) {
                        // Signal the test thread, then park until it has cancelled the future.
                        secondAttemptStarted.trigger();
                        try {
                            releaseSecondAttempt.await();
                        } catch (InterruptedException interrupted) {
                            asyncFailure.compareAndSet(null, interrupted);
                        }
                    }
                    throw new CompletionException(new FlinkException("Test exception"));
                },
                TestingUtils.defaultExecutor()),
            10,
            TestingUtils.defaultExecutor());

        secondAttemptStarted.await();
        Assert.assertFalse(retryFuture.isDone());

        retryFuture.cancel(false);
        releaseSecondAttempt.trigger();

        Assert.assertTrue(retryFuture.isCancelled());
        Assert.assertEquals(2L, attempts.get());

        final Throwable recorded = asyncFailure.get();
        if (recorded != null) {
            throw new FlinkException("Exception occurred in the retry operation.", recorded);
        }
    }

    /**
     * Retries an always-failing operation with a delay between attempts and expects the
     * unwrapped failure of the retry future to be a {@code FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryWithDelayFailure() throws Throwable {
        final int retries = 3;
        final Time delay = Time.milliseconds(1L);
        try {
            final CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
                () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
                retries,
                delay,
                TestingUtils.defaultScheduledExecutor());
            // Bounded wait so a stuck retry loop cannot block the test indefinitely.
            retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException executionException) {
            throw ExceptionUtils.stripExecutionException(executionException);
        }
    }

    /**
     * Retries an operation that fails four times before succeeding and checks that the overall
     * completion took at least four times the configured delay.
     */
    @Test
    public void testRetryWithDelay() throws Exception {
        final int retries = 4;
        final Time delay = Time.milliseconds(5L);
        final AtomicInteger countDown = new AtomicInteger(retries);

        // Use the monotonic clock: wall-clock time may jump or have coarse resolution, which
        // would make the lower-bound check below unreliable.
        final long startNanos = System.nanoTime();

        final CompletableFuture<Boolean> retryFuture = FutureUtils.retryWithDelay(
            () -> {
                // Succeeds only once the counter has been decremented down to zero.
                if (countDown.getAndDecrement() == 0) {
                    return CompletableFuture.completedFuture(true);
                }
                return FutureUtils.completedExceptionally(new FlinkException("Test exception."));
            },
            retries,
            delay,
            TestingUtils.defaultScheduledExecutor());

        final Boolean result = retryFuture.get();
        final long elapsedNanos = System.nanoTime() - startNanos;
        final long minimumNanos = TimeUnit.MILLISECONDS.toNanos(retries * delay.toMilliseconds());

        Assert.assertTrue(result);
        Assert.assertTrue("The completion time should be at least rertries times delay between retries.", elapsedNanos >= minimumNanos);
    }

    /**
     * Cancels a delayed retry while its next attempt is still pending in a manually triggered
     * executor and checks that both the retry future and the scheduled task are cancelled.
     */
    @Test
    public void testRetryWithDelayCancellation() {
        final ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();

        final CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
            () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
            1,
            TestingUtils.infiniteTime(),
            scheduledExecutor);

        Assert.assertFalse(retryFuture.isDone());

        final Collection<ScheduledFuture<?>> pendingTasks = scheduledExecutor.getScheduledTasks();
        Assert.assertFalse(pendingTasks.isEmpty());

        final ScheduledFuture<?> pendingRetry = pendingTasks.iterator().next();
        Assert.assertFalse(pendingRetry.isDone());

        retryFuture.cancel(false);

        Assert.assertTrue(retryFuture.isCancelled());
        Assert.assertTrue(pendingRetry.isCancelled());
    }

    /**
     * Registers a 10 ms timeout on a future that is never completed by the test and expects it
     * to fail with a {@link TimeoutException}.
     */
    @Test
    public void testOrTimeout() throws Exception {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final long timeoutMillis = 10L;

        FutureUtils.orTimeout(future, timeoutMillis, TimeUnit.MILLISECONDS);

        try {
            // Bounded wait: if the timeout is never applied, the test fails instead of hanging.
            future.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
            // Without this, a normally completed future would let the test pass vacuously.
            Assert.fail("Expected the future to complete exceptionally with a TimeoutException.");
        } catch (ExecutionException executionException) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(executionException) instanceof TimeoutException);
        }
    }

    /**
     * Runs a delayed retry with a predicate that accepts only the first failure of the
     * operation, and expects the retry future to fail with a message containing
     * "Could not complete the operation".
     */
    @Test
    public void testRetryWithDelayAndPredicate() throws Exception {
        // Fails with "first exception" on the first call and "should propagate" afterwards.
        final Supplier<CompletableFuture<String>> failingOperation = new Supplier<CompletableFuture<String>>() {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public CompletableFuture<String> get() {
                if (counter.getAndIncrement() == 0) {
                    return FutureUtils.completedExceptionally(new RuntimeException("first exception"));
                }
                return FutureUtils.completedExceptionally(new RuntimeException("should propagate"));
            }
        };

        final ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            FutureUtils.retryWithDelay(
                failingOperation,
                1,
                Time.seconds(0L),
                throwable -> (throwable instanceof RuntimeException) && throwable.getMessage().contains("first exception"),
                new ScheduledExecutorServiceAdapter(retryExecutor)).get();
            // Both invocations fail, so reaching this line means the failure was lost.
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            Assert.assertThat(executionException.getMessage(), CoreMatchers.containsString("Could not complete the operation"));
        } finally {
            // Single cleanup point for success, expected failure and unexpected errors alike.
            retryExecutor.shutdownNow();
        }
    }

    /**
     * Checks that the action passed to {@code runAfterwards} has not run before the input
     * future completes, and that both the action and the resulting future are done afterwards.
     */
    @Test
    public void testRunAfterwards() throws Exception {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionLatch = new OneShotLatch();

        final CompletableFuture<Void> resultFuture = FutureUtils.runAfterwards(inputFuture, actionLatch::trigger);

        // Nothing may have happened before the input completes.
        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(resultFuture.isDone(), Matchers.is(false));

        inputFuture.complete(null);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(resultFuture.isDone(), Matchers.is(true));

        // Throws if the result future completed exceptionally.
        resultFuture.get();
    }

    /**
     * Checks that the action passed to {@code runAfterwards} still runs when the input future
     * fails, and that the resulting future fails with the input future's exception.
     */
    @Test
    public void testRunAfterwardsExceptional() throws Exception {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch actionLatch = new OneShotLatch();
        final FlinkException testException = new FlinkException("Test exception");

        final CompletableFuture<Void> resultFuture = FutureUtils.runAfterwards(inputFuture, actionLatch::trigger);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(resultFuture.isDone(), Matchers.is(false));

        inputFuture.completeExceptionally(testException);

        Assert.assertThat(actionLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(resultFuture.isDone(), Matchers.is(true));

        try {
            resultFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            final Throwable cause = ExceptionUtils.stripExecutionException(executionException);
            Assert.assertThat(cause, Matchers.is(testException));
        }
    }

    /**
     * Checks that the composed action of {@code composeAfterwards} has not run before the
     * input future completes, and that the resulting future is done once it has.
     */
    @Test
    public void testComposeAfterwards() throws ExecutionException, InterruptedException {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch composeLatch = new OneShotLatch();

        final CompletableFuture<Void> composedFuture = FutureUtils.composeAfterwards(
            inputFuture,
            () -> {
                composeLatch.trigger();
                return CompletableFuture.completedFuture(null);
            });

        Assert.assertThat(composeLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(composedFuture.isDone(), Matchers.is(false));

        inputFuture.complete(null);

        Assert.assertThat(composeLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(composedFuture.isDone(), Matchers.is(true));

        // Throws if the composed future completed exceptionally.
        composedFuture.get();
    }

    /**
     * Fails the input future of {@code composeAfterwards} and checks that the composed action
     * still runs and that the resulting future fails with the input future's exception.
     */
    @Test
    public void testComposeAfterwardsFirstExceptional() throws InterruptedException {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch composeLatch = new OneShotLatch();
        final FlinkException testException = new FlinkException("Test exception");

        final CompletableFuture<Void> composedFuture = FutureUtils.composeAfterwards(
            inputFuture,
            () -> {
                composeLatch.trigger();
                return CompletableFuture.completedFuture(null);
            });

        Assert.assertThat(composeLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(composedFuture.isDone(), Matchers.is(false));

        inputFuture.completeExceptionally(testException);

        Assert.assertThat(composeLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(composedFuture.isDone(), Matchers.is(true));

        try {
            composedFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            final Throwable cause = ExceptionUtils.stripExecutionException(executionException);
            Assert.assertThat(cause, Matchers.is(testException));
        }
    }

    /**
     * Completes the input future of {@code composeAfterwards} normally while the composed
     * action fails, and checks that the resulting future fails with the action's exception.
     */
    @Test
    public void testComposeAfterwardsSecondExceptional() throws InterruptedException {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final OneShotLatch composeLatch = new OneShotLatch();
        final FlinkException testException = new FlinkException("Test exception");

        final CompletableFuture<Void> composedFuture = FutureUtils.composeAfterwards(
            inputFuture,
            () -> {
                composeLatch.trigger();
                return FutureUtils.completedExceptionally(testException);
            });

        Assert.assertThat(composeLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(composedFuture.isDone(), Matchers.is(false));

        inputFuture.complete(null);

        Assert.assertThat(composeLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(composedFuture.isDone(), Matchers.is(true));

        try {
            composedFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            final Throwable cause = ExceptionUtils.stripExecutionException(executionException);
            Assert.assertThat(cause, Matchers.is(testException));
        }
    }

    /**
     * Fails both the input future and the composed action of {@code composeAfterwards} and
     * checks that the result fails with the input's exception, carrying the action's exception
     * as its single suppressed exception.
     */
    @Test
    public void testComposeAfterwardsBothExceptional() throws InterruptedException {
        final CompletableFuture<Void> inputFuture = new CompletableFuture<>();
        final FlinkException inputException = new FlinkException("Test exception1");
        final FlinkException composeException = new FlinkException("Test exception2");
        final OneShotLatch composeLatch = new OneShotLatch();

        final CompletableFuture<Void> composedFuture = FutureUtils.composeAfterwards(
            inputFuture,
            () -> {
                composeLatch.trigger();
                return FutureUtils.completedExceptionally(composeException);
            });

        Assert.assertThat(composeLatch.isTriggered(), Matchers.is(false));
        Assert.assertThat(composedFuture.isDone(), Matchers.is(false));

        inputFuture.completeExceptionally(inputException);

        Assert.assertThat(composeLatch.isTriggered(), Matchers.is(true));
        Assert.assertThat(composedFuture.isDone(), Matchers.is(true));

        try {
            composedFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            final Throwable cause = ExceptionUtils.stripExecutionException(executionException);
            final Throwable[] suppressed = cause.getSuppressed();
            Assert.assertThat(cause, Matchers.is(inputException));
            Assert.assertThat(suppressed, Matchers.arrayWithSize(1));
            Assert.assertThat(suppressed[0], Matchers.is(composeException));
        }
    }

    /**
     * Checks that the future returned by {@code completeAll} is done only after every input
     * future has completed, and that its completed/total counters track the progress.
     */
    @Test
    public void testCompleteAll() throws Exception {
        final CompletableFuture<String> firstInput = new CompletableFuture<>();
        final CompletableFuture<Integer> secondInput = new CompletableFuture<>();
        final List<CompletableFuture<?>> inputs = Arrays.asList(firstInput, secondInput);

        final FutureUtils.ConjunctFuture<Void> conjunctFuture = FutureUtils.completeAll(inputs);

        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(false));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(0));
        Assert.assertThat(conjunctFuture.getNumFuturesTotal(), Matchers.is(inputs.size()));

        // Complete the inputs in reverse order.
        secondInput.complete(42);

        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(false));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(1));

        firstInput.complete("foobar");

        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(true));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(2));

        // Throws if the conjunct future completed exceptionally.
        conjunctFuture.get();
    }

    /**
     * Fails one of two inputs of {@code completeAll} and checks that the resulting future still
     * waits for the other input before failing with the exception of the failed input.
     */
    @Test
    public void testCompleteAllPartialExceptional() throws Exception {
        final CompletableFuture<String> firstInput = new CompletableFuture<>();
        final CompletableFuture<Integer> secondInput = new CompletableFuture<>();
        final List<CompletableFuture<?>> inputs = Arrays.asList(firstInput, secondInput);

        final FutureUtils.ConjunctFuture<Void> conjunctFuture = FutureUtils.completeAll(inputs);

        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(false));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(0));
        Assert.assertThat(conjunctFuture.getNumFuturesTotal(), Matchers.is(inputs.size()));

        final FlinkException testException = new FlinkException("Test exception 1");
        secondInput.completeExceptionally(testException);

        // A failed input counts as completed but does not finish the conjunct future.
        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(false));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(1));

        firstInput.complete("foobar");

        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(true));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(2));

        try {
            conjunctFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            final Throwable cause = ExceptionUtils.stripExecutionException(executionException);
            Assert.assertThat(cause, Matchers.is(testException));
        }
    }

    /**
     * Fails both inputs of {@code completeAll} and checks that the resulting future fails with
     * one of the two exceptions while carrying the other one as its suppressed exception.
     */
    @Test
    public void testCompleteAllExceptional() throws Exception {
        final CompletableFuture<String> firstInput = new CompletableFuture<>();
        final CompletableFuture<Integer> secondInput = new CompletableFuture<>();
        final List<CompletableFuture<?>> inputs = Arrays.asList(firstInput, secondInput);

        final FutureUtils.ConjunctFuture<Void> conjunctFuture = FutureUtils.completeAll(inputs);

        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(false));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(0));
        Assert.assertThat(conjunctFuture.getNumFuturesTotal(), Matchers.is(inputs.size()));

        final FlinkException firstException = new FlinkException("Test exception 1");
        firstInput.completeExceptionally(firstException);

        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(false));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(1));

        final FlinkException secondException = new FlinkException("Test exception 2");
        secondInput.completeExceptionally(secondException);

        Assert.assertThat(conjunctFuture.isDone(), Matchers.is(true));
        Assert.assertThat(conjunctFuture.getNumFuturesCompleted(), Matchers.is(2));

        try {
            conjunctFuture.get();
            Assert.fail("Expected an exceptional completion");
        } catch (ExecutionException executionException) {
            final Throwable cause = ExceptionUtils.stripExecutionException(executionException);
            final Throwable[] suppressed = cause.getSuppressed();

            // The test accepts either exception as primary; the other must be the suppressed one.
            final Throwable expectedSuppressed;
            if (cause.equals(firstException)) {
                expectedSuppressed = secondException;
            } else {
                expectedSuppressed = firstException;
            }

            Assert.assertThat(suppressed, Matchers.is(Matchers.not(Matchers.emptyArray())));
            Assert.assertThat(suppressed, Matchers.arrayContaining(expectedSuppressed));
        }
    }

    /** Runs the shared cancellation check against the future built by {@code waitForAll}. */
    @Test
    public void testCancelWaitingConjunctFuture() {
        cancelConjunctFuture(inputFutures -> FutureUtils.waitForAll(inputFutures));
    }

    /** Runs the shared cancellation check against the future built by {@code combineAll}. */
    @Test
    public void testCancelResultConjunctFuture() {
        cancelConjunctFuture(inputFutures -> FutureUtils.combineAll(inputFutures));
    }

    /** Runs the shared cancellation check against the future built by {@code completeAll}. */
    @Test
    public void testCancelCompleteConjunctFuture() {
        cancelConjunctFuture(inputFutures -> FutureUtils.completeAll(inputFutures));
    }

    /**
     * Builds a conjunct future over ten fresh input futures using the given factory, cancels
     * it, and asserts that every input future is reported as cancelled.
     *
     * @param function factory turning a collection of input futures into the conjunct future
     *     under test
     */
    private void cancelConjunctFuture(Function<Collection<? extends CompletableFuture<?>>, FutureUtils.ConjunctFuture<?>> function) {
        final int numberOfFutures = 10;
        final List<CompletableFuture<Void>> inputFutures = new ArrayList<>(numberOfFutures);
        for (int index = 0; index < numberOfFutures; index++) {
            inputFutures.add(new CompletableFuture<>());
        }

        final FutureUtils.ConjunctFuture<?> conjunctFuture = function.apply(inputFutures);
        conjunctFuture.cancel(false);

        for (CompletableFuture<Void> inputFuture : inputFutures) {
            Assert.assertThat(inputFuture.isCancelled(), Matchers.is(true));
        }
    }

    /**
     * Checks that an exception thrown by the supplier passed to {@code supplyAsync} can be
     * found, by its message, in the failure of the returned future.
     */
    @Test
    public void testSupplyAsyncFailure() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");

        final CompletableFuture<Object> future = FutureUtils.supplyAsync(
            () -> {
                throw testException;
            },
            TestingUtils.defaultExecutor());

        try {
            future.get();
            Assert.fail("Expected an exception.");
        } catch (ExecutionException executionException) {
            final boolean messageFound = ExceptionUtils.findThrowableWithMessage(executionException, "Test exception").isPresent();
            Assert.assertThat(messageFound, Matchers.is(true));
        }
    }

    /** Checks that {@code supplyAsync} completes with the value produced by its supplier. */
    @Test
    public void testSupplyAsync() throws Exception {
        final CompletableFuture<Acknowledge> future = FutureUtils.supplyAsync(Acknowledge::get, TestingUtils.defaultExecutor());

        Assert.assertThat(future.get(), Matchers.is(Acknowledge.get()));
    }
}
