package org.apache.flink.util.concurrent;

import java.lang.Thread;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/util/concurrent/FutureUtilsTest.class */
class FutureUtilsTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /* loaded from: input_file:org/apache/flink/util/concurrent/FutureUtilsTest$TestingUncaughtExceptionHandler.class */
    private static class TestingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        private Throwable exception;

        private TestingUncaughtExceptionHandler() {
            this.exception = null;
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            this.exception = th;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean hasBeenCalled() {
            return this.exception != null;
        }
    }

    FutureUtilsTest() {
    }

    @Test
    void testRetrySuccess() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        FlinkAssertions.assertThatFuture(FutureUtils.retry(() -> {
            return CompletableFuture.supplyAsync(() -> {
                if (atomicInteger.incrementAndGet() == 10) {
                    return true;
                }
                throw new CompletionException((Throwable) new FlinkException("Test exception"));
            }, EXECUTOR_RESOURCE.getExecutor());
        }, 10, EXECUTOR_RESOURCE.getExecutor())).eventuallySucceeds().isEqualTo(true);
        Assertions.assertThat(atomicInteger).hasValue(10);
    }

    @Test
    void testRetryFailureFixedRetries() {
        FlinkAssertions.assertThatFuture(FutureUtils.retry(() -> {
            return FutureUtils.completedExceptionally(new FlinkException("Test exception"));
        }, 3, EXECUTOR_RESOURCE.getExecutor())).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(FutureUtils.RetryException.class);
    }

    @Test
    void testRetryCancellation() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        AtomicReference atomicReference = new AtomicReference(null);
        CompletableFuture retry = FutureUtils.retry(() -> {
            return CompletableFuture.supplyAsync(() -> {
                if (atomicInteger.incrementAndGet() == 2) {
                    oneShotLatch.trigger();
                    try {
                        oneShotLatch2.await();
                    } catch (InterruptedException e) {
                        atomicReference.compareAndSet(null, e);
                    }
                }
                throw new CompletionException((Throwable) new FlinkException("Test exception"));
            }, EXECUTOR_RESOURCE.getExecutor());
        }, 10, EXECUTOR_RESOURCE.getExecutor());
        oneShotLatch.await();
        Assertions.assertThat(retry).isNotDone();
        retry.cancel(false);
        oneShotLatch2.trigger();
        Assertions.assertThat(retry).isCancelled();
        Assertions.assertThat(atomicInteger).hasValue(2);
        Assertions.assertThat((Throwable) atomicReference.get()).isNull();
    }

    @Test
    void testStopAtNonRetryableException() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        FlinkRuntimeException flinkRuntimeException = new FlinkRuntimeException("Non-retryable exception");
        FlinkAssertions.assertThatFuture(FutureUtils.retry(() -> {
            return CompletableFuture.supplyAsync(() -> {
                if (atomicInteger.incrementAndGet() == 3) {
                    throw new CompletionException((Throwable) flinkRuntimeException);
                }
                throw new CompletionException((Throwable) new FlinkException("Test exception"));
            }, EXECUTOR_RESOURCE.getExecutor());
        }, 10, th -> {
            return ExceptionUtils.findThrowable(th, FlinkException.class).isPresent();
        }, EXECUTOR_RESOURCE.getExecutor())).eventuallyFailsWith(ExecutionException.class).extracting((v0) -> {
            return FlinkAssertions.chainOfCauses(v0);
        }, FlinkAssertions.STREAM_THROWABLE).last().isEqualTo(flinkRuntimeException);
        Assertions.assertThat(atomicInteger).hasValue(3);
    }

    @Test
    void testRetryWithDelayRetryStrategyFailure() {
        FlinkAssertions.assertThatFuture(FutureUtils.retryWithDelay(() -> {
            return FutureUtils.completedExceptionally(new FlinkException("Test exception"));
        }, new FixedRetryStrategy(3, Duration.ofMillis(1L)), new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()))).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(FutureUtils.RetryException.class);
    }

    @Test
    void testRetryWithDelayRetryStrategy() {
        AtomicInteger atomicInteger = new AtomicInteger(4);
        long currentTimeMillis = System.currentTimeMillis();
        FlinkAssertions.assertThatFuture(FutureUtils.retryWithDelay(() -> {
            return atomicInteger.getAndDecrement() == 0 ? CompletableFuture.completedFuture(true) : FutureUtils.completedExceptionally(new FlinkException("Test exception."));
        }, new ExponentialBackoffRetryStrategy(4, Duration.ofMillis(2L), Duration.ofMillis(5L)), new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()))).eventuallySucceeds().isEqualTo(true);
        Assertions.assertThat(System.currentTimeMillis() - currentTimeMillis).as("The completion time should be at least retries times delay between retries.", new Object[0]).isGreaterThanOrEqualTo(16L);
    }

    @Test
    void testRetryWithDelayRetryStrategyCancellation() {
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        CompletableFuture retryWithDelay = FutureUtils.retryWithDelay(() -> {
            return FutureUtils.completedExceptionally(new FlinkException("Test exception"));
        }, new FixedRetryStrategy(1, TestingUtils.infiniteDuration()), manuallyTriggeredScheduledExecutor);
        Assertions.assertThat(retryWithDelay).isNotDone();
        Collection<ScheduledFuture<?>> activeScheduledTasks = manuallyTriggeredScheduledExecutor.getActiveScheduledTasks();
        Assertions.assertThat(activeScheduledTasks).isNotEmpty();
        ScheduledFuture<?> next = activeScheduledTasks.iterator().next();
        Assertions.assertThat(next.isDone()).isFalse();
        retryWithDelay.cancel(false);
        Assertions.assertThat(retryWithDelay).isCancelled();
        Assertions.assertThat(next.isCancelled()).isTrue();
    }

    @Test
    void testOrTimeout() {
        CompletableFuture completableFuture = new CompletableFuture();
        FutureUtils.orTimeout(completableFuture, 10L, TimeUnit.MILLISECONDS, "testOrTimeout");
        FlinkAssertions.assertThatFuture(completableFuture).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(TimeoutException.class).withMessageContaining("testOrTimeout");
    }

    @Test
    void testRetryWithDelayRetryStrategyAndPredicate() {
        FlinkAssertions.assertThatFuture(FutureUtils.retryWithDelay(new Supplier<CompletableFuture<String>>() { // from class: org.apache.flink.util.concurrent.FutureUtilsTest.1TestStringSupplier
            private final AtomicInteger counter = new AtomicInteger();

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.function.Supplier
            public CompletableFuture<String> get() {
                return this.counter.getAndIncrement() == 0 ? FutureUtils.completedExceptionally(new RuntimeException("first exception")) : FutureUtils.completedExceptionally(new RuntimeException("should propagate"));
            }
        }, new FixedRetryStrategy(1, Duration.ZERO), th -> {
            return (th instanceof RuntimeException) && th.getMessage().contains("first exception");
        }, new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()))).eventuallyFailsWith(ExecutionException.class).withMessageContaining("should propagate");
    }

    @Test
    void testRunAfterwards() {
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        oneShotLatch.getClass();
        CompletableFuture runAfterwards = FutureUtils.runAfterwards(completableFuture, oneShotLatch::trigger);
        Assertions.assertThat(oneShotLatch.isTriggered()).isFalse();
        Assertions.assertThat(runAfterwards).isNotDone();
        completableFuture.complete(null);
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
        FlinkAssertions.assertThatFuture(runAfterwards).eventuallySucceeds();
    }

    @Test
    void testRunAfterwardsExceptional() {
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        FlinkException flinkException = new FlinkException("Test exception");
        oneShotLatch.getClass();
        CompletableFuture runAfterwards = FutureUtils.runAfterwards(completableFuture, oneShotLatch::trigger);
        Assertions.assertThat(oneShotLatch.isTriggered()).isFalse();
        Assertions.assertThat(runAfterwards).isNotDone();
        completableFuture.completeExceptionally(flinkException);
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
        Assertions.assertThat(runAfterwards).isDone();
        FlinkAssertions.assertThatFuture(runAfterwards).eventuallyFailsWith(ExecutionException.class).withCause(flinkException);
    }

    @Test
    void testComposeAfterwards() {
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        CompletableFuture composeAfterwards = FutureUtils.composeAfterwards(completableFuture, () -> {
            oneShotLatch.trigger();
            return CompletableFuture.completedFuture(null);
        });
        Assertions.assertThat(oneShotLatch.isTriggered()).isFalse();
        Assertions.assertThat(composeAfterwards).isNotDone();
        completableFuture.complete(null);
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
        FlinkAssertions.assertThatFuture(composeAfterwards).eventuallySucceeds();
    }

    @Test
    void testComposeAfterwardsFirstExceptional() {
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        FlinkException flinkException = new FlinkException("Test exception");
        CompletableFuture composeAfterwards = FutureUtils.composeAfterwards(completableFuture, () -> {
            oneShotLatch.trigger();
            return CompletableFuture.completedFuture(null);
        });
        Assertions.assertThat(oneShotLatch.isTriggered()).isFalse();
        Assertions.assertThat(composeAfterwards).isNotDone();
        completableFuture.completeExceptionally(flinkException);
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
        Assertions.assertThat(composeAfterwards).isDone();
        FlinkAssertions.assertThatFuture(composeAfterwards).eventuallyFailsWith(ExecutionException.class).withCause(flinkException);
    }

    @Test
    void testComposeAfterwardsSecondExceptional() {
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        FlinkException flinkException = new FlinkException("Test exception");
        CompletableFuture composeAfterwards = FutureUtils.composeAfterwards(completableFuture, () -> {
            oneShotLatch.trigger();
            return FutureUtils.completedExceptionally(flinkException);
        });
        Assertions.assertThat(oneShotLatch.isTriggered()).isFalse();
        Assertions.assertThat(composeAfterwards).isNotDone();
        completableFuture.complete(null);
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
        Assertions.assertThat(composeAfterwards).isDone();
        FlinkAssertions.assertThatFuture(composeAfterwards).eventuallyFailsWith(ExecutionException.class).withCause(flinkException);
    }

    @Test
    void testComposeAfterwardsBothExceptional() {
        CompletableFuture completableFuture = new CompletableFuture();
        FlinkException flinkException = new FlinkException("Test exception1");
        FlinkException flinkException2 = new FlinkException("Test exception2");
        OneShotLatch oneShotLatch = new OneShotLatch();
        CompletableFuture composeAfterwards = FutureUtils.composeAfterwards(completableFuture, () -> {
            oneShotLatch.trigger();
            return FutureUtils.completedExceptionally(flinkException2);
        });
        Assertions.assertThat(oneShotLatch.isTriggered()).isFalse();
        Assertions.assertThat(composeAfterwards).isNotDone();
        completableFuture.completeExceptionally(flinkException);
        Assertions.assertThat(oneShotLatch.isTriggered()).isTrue();
        Assertions.assertThat(composeAfterwards).isDone();
        FlinkAssertions.assertThatFuture(composeAfterwards).eventuallyFailsWith(ExecutionException.class).extracting((v0) -> {
            return v0.getCause();
        }).isEqualTo(flinkException).satisfies(new ThrowingConsumer[]{th -> {
            Assertions.assertThat(th.getSuppressed()).containsExactly(new Throwable[]{flinkException2});
        }});
    }

    @Test
    void testCompleteAll() {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        List asList = Arrays.asList(completableFuture, completableFuture2);
        FutureUtils.ConjunctFuture completeAll = FutureUtils.completeAll(asList);
        Assertions.assertThat(completeAll).isNotDone();
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isZero();
        Assertions.assertThat(completeAll.getNumFuturesTotal()).isEqualTo(asList.size());
        completableFuture2.complete(42);
        Assertions.assertThat(completeAll).isNotDone();
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isOne();
        completableFuture.complete("foobar");
        Assertions.assertThat(completeAll).isDone();
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isEqualTo(2);
        FlinkAssertions.assertThatFuture(completeAll).eventuallySucceeds();
    }

    @Test
    void testCompleteAllPartialExceptional() {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        List asList = Arrays.asList(completableFuture, completableFuture2);
        FutureUtils.ConjunctFuture completeAll = FutureUtils.completeAll(asList);
        Assertions.assertThat(completeAll).isNotDone();
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isZero();
        Assertions.assertThat(completeAll.getNumFuturesTotal()).isEqualTo(asList.size());
        FlinkException flinkException = new FlinkException("Test exception 1");
        completableFuture2.completeExceptionally(flinkException);
        Assertions.assertThat(completeAll).isNotDone();
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isOne();
        completableFuture.complete("foobar");
        Assertions.assertThat(completeAll).isDone();
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isEqualTo(2);
        FlinkAssertions.assertThatFuture(completeAll).eventuallyFailsWith(ExecutionException.class).withCause(flinkException);
    }

    @Test
    void testCompleteAllExceptional() {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        List asList = Arrays.asList(completableFuture, completableFuture2);
        FutureUtils.ConjunctFuture completeAll = FutureUtils.completeAll(asList);
        Assertions.assertThat(completeAll).isNotDone();
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isZero();
        Assertions.assertThat(completeAll.getNumFuturesTotal()).isEqualTo(asList.size());
        FlinkException flinkException = new FlinkException("Test exception 1");
        completableFuture.completeExceptionally(flinkException);
        Assertions.assertThat(completeAll).isNotDone();
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isOne();
        FlinkException flinkException2 = new FlinkException("Test exception 2");
        completableFuture2.completeExceptionally(flinkException2);
        Assertions.assertThat(completeAll.getNumFuturesCompleted()).isEqualTo(2);
        FlinkAssertions.assertThatFuture(completeAll).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(FlinkException.class).extracting((v0) -> {
            return v0.getCause();
        }).satisfies(new ThrowingConsumer[]{th -> {
            Assertions.assertThat(th.getSuppressed()).containsExactly(new Throwable[]{th.equals(flinkException) ? flinkException2 : flinkException});
        }});
    }

    @Test
    void testSupplyAsyncFailure() {
        FlinkException flinkException = new FlinkException("Test exception");
        FlinkAssertions.assertThatFuture(FutureUtils.supplyAsync(() -> {
            throw flinkException;
        }, EXECUTOR_RESOURCE.getExecutor())).eventuallyFailsWith(ExecutionException.class).withCause(flinkException);
    }

    @Test
    void testSupplyAsync() {
        Object obj = new Object();
        FlinkAssertions.assertThatFuture(FutureUtils.supplyAsync(() -> {
            return obj;
        }, EXECUTOR_RESOURCE.getExecutor())).eventuallySucceeds().isEqualTo(obj);
    }

    @Test
    void testHandleAsyncIfNotDone() {
        testFutureContinuation((completableFuture, executor) -> {
            return FutureUtils.handleAsyncIfNotDone(completableFuture, executor, (obj, th) -> {
                return null;
            });
        });
    }

    @Test
    void testApplyAsyncIfNotDone() {
        testFutureContinuation((completableFuture, executor) -> {
            return FutureUtils.thenApplyAsyncIfNotDone(completableFuture, executor, obj -> {
                return null;
            });
        });
    }

    @Test
    void testComposeAsyncIfNotDone() {
        testFutureContinuation((completableFuture, executor) -> {
            return FutureUtils.thenComposeAsyncIfNotDone(completableFuture, executor, obj -> {
                return null;
            });
        });
    }

    @Test
    void testWhenCompleteAsyncIfNotDone() {
        testFutureContinuation((completableFuture, executor) -> {
            return FutureUtils.whenCompleteAsyncIfNotDone(completableFuture, executor, (obj, th) -> {
            });
        });
    }

    @Test
    void testThenAcceptAsyncIfNotDone() {
        testFutureContinuation((completableFuture, executor) -> {
            return FutureUtils.thenAcceptAsyncIfNotDone(completableFuture, executor, obj -> {
            });
        });
    }

    private void testFutureContinuation(BiFunction<CompletableFuture<?>, Executor, CompletableFuture<?>> biFunction) {
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Executor executor = runnable -> {
            runnable.run();
            atomicBoolean.set(true);
        };
        CompletableFuture<?> apply = biFunction.apply(completableFuture, executor);
        Assertions.assertThat(apply).isNotDone();
        completableFuture.complete(null);
        Assertions.assertThat(atomicBoolean).isTrue();
        Assertions.assertThat(apply).isDone();
        atomicBoolean.set(false);
        CompletableFuture<?> apply2 = biFunction.apply(completableFuture, executor);
        Assertions.assertThat(atomicBoolean).isFalse();
        Assertions.assertThat(apply2).isDone();
    }

    @Test
    void testHandleExceptionWithCompletedFuture() {
        FlinkAssertions.assertThatFuture(FutureUtils.handleException(CompletableFuture.completedFuture("foobar"), Exception.class, exc -> {
            return "handled";
        })).eventuallySucceeds().isEqualTo("foobar");
    }

    @Test
    void testHandleExceptionWithNormalCompletion() {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture handleException = FutureUtils.handleException(completableFuture, Exception.class, exc -> {
            return "handled";
        });
        completableFuture.complete("foobar");
        Assertions.assertThat(handleException).isCompletedWithValue("foobar");
    }

    @Test
    void testHandleExceptionWithMatchingExceptionallyCompletedFuture() {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture handleException = FutureUtils.handleException(completableFuture, UnsupportedOperationException.class, unsupportedOperationException -> {
            return "handled";
        });
        completableFuture.completeExceptionally(new UnsupportedOperationException("foobar"));
        Assertions.assertThat(handleException).isCompletedWithValue("handled");
    }

    @Test
    void testHandleExceptionWithNotMatchingExceptionallyCompletedFuture() {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture handleException = FutureUtils.handleException(completableFuture, UnsupportedOperationException.class, unsupportedOperationException -> {
            return "handled";
        });
        IllegalArgumentException illegalArgumentException = new IllegalArgumentException("foobar");
        completableFuture.completeExceptionally(illegalArgumentException);
        FlinkAssertions.assertThatFuture(handleException).eventuallyFailsWith(ExecutionException.class).withCause(illegalArgumentException);
    }

    @Test
    void testHandleExceptionWithThrowingExceptionHandler() {
        CompletableFuture completableFuture = new CompletableFuture();
        IllegalStateException illegalStateException = new IllegalStateException("something went terribly wrong");
        CompletableFuture handleException = FutureUtils.handleException(completableFuture, UnsupportedOperationException.class, unsupportedOperationException -> {
            throw illegalStateException;
        });
        completableFuture.completeExceptionally(new UnsupportedOperationException("foobar"));
        FlinkAssertions.assertThatFuture(handleException).eventuallyFailsWith(ExecutionException.class).withCause(illegalStateException);
    }

    @Test
    void testHandleUncaughtExceptionWithCompletedFuture() {
        CompletableFuture completedFuture = CompletableFuture.completedFuture("foobar");
        TestingUncaughtExceptionHandler testingUncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        FutureUtils.handleUncaughtException(completedFuture, testingUncaughtExceptionHandler);
        Assertions.assertThat(testingUncaughtExceptionHandler.hasBeenCalled()).isFalse();
    }

    @Test
    void testHandleUncaughtExceptionWithNormalCompletion() {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingUncaughtExceptionHandler testingUncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        FutureUtils.handleUncaughtException(completableFuture, testingUncaughtExceptionHandler);
        completableFuture.complete("barfoo");
        Assertions.assertThat(testingUncaughtExceptionHandler.hasBeenCalled()).isFalse();
    }

    @Test
    void testHandleUncaughtExceptionWithExceptionallyCompletedFuture() {
        CompletableFuture completedExceptionally = FutureUtils.completedExceptionally(new FlinkException("foobar"));
        TestingUncaughtExceptionHandler testingUncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        FutureUtils.handleUncaughtException(completedExceptionally, testingUncaughtExceptionHandler);
        Assertions.assertThat(testingUncaughtExceptionHandler.hasBeenCalled()).isTrue();
    }

    @Test
    void testHandleUncaughtExceptionWithExceptionallyCompletion() {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingUncaughtExceptionHandler testingUncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        FutureUtils.handleUncaughtException(completableFuture, testingUncaughtExceptionHandler);
        Assertions.assertThat(testingUncaughtExceptionHandler.hasBeenCalled()).isFalse();
        completableFuture.completeExceptionally(new FlinkException("barfoo"));
        Assertions.assertThat(testingUncaughtExceptionHandler.hasBeenCalled()).isTrue();
    }

    @Test
    void testHandleUncaughtExceptionWithBuggyErrorHandlingCode() {
        Exception exc = new Exception("Actual production code error that should be caught by the error handler.");
        RuntimeException runtimeException = new RuntimeException("Expected test error in error handling code.");
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread, th) -> {
            throw runtimeException;
        };
        AtomicReference atomicReference = new AtomicReference();
        FutureUtils.handleUncaughtException(FutureUtils.completedExceptionally(exc), uncaughtExceptionHandler, (thread2, th2) -> {
            atomicReference.set(th2);
        });
        Assertions.assertThat(atomicReference).hasValueSatisfying(th3 -> {
            Assertions.assertThat(th3).isInstanceOf(IllegalStateException.class).hasRootCause(runtimeException).satisfies(new ThrowingConsumer[]{th3 -> {
                Assertions.assertThat(th3.getSuppressed()).containsExactly(new Throwable[]{exc});
            }});
        });
    }

    @Test
    void testForwardNormal() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        FutureUtils.forward(completableFuture, completableFuture2);
        Assertions.assertThat(completableFuture).isNotDone();
        Assertions.assertThat(completableFuture2).isNotDone();
        completableFuture.complete("foobar");
        Assertions.assertThat(completableFuture).isDone();
        Assertions.assertThat(completableFuture2).isDone();
        Assertions.assertThat((String) completableFuture.get()).isEqualTo((String) completableFuture2.get());
    }

    @Test
    void testForwardExceptionally() {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        FutureUtils.forward(completableFuture, completableFuture2);
        Assertions.assertThat(completableFuture).isNotDone();
        Assertions.assertThat(completableFuture2).isNotDone();
        FlinkException flinkException = new FlinkException("Expected exception");
        completableFuture.completeExceptionally(flinkException);
        Assertions.assertThat(completableFuture).isDone();
        Assertions.assertThat(completableFuture2).isDone();
        FlinkAssertions.assertThatFuture(completableFuture).eventuallyFailsWith(ExecutionException.class).extracting((v0) -> {
            return v0.getCause();
        }).isEqualTo(flinkException);
        FlinkAssertions.assertThatFuture(completableFuture2).eventuallyFailsWith(ExecutionException.class).extracting((v0) -> {
            return v0.getCause();
        }).isEqualTo(flinkException);
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor, java.util.concurrent.Executor] */
    @Test
    void testForwardAsync() {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        ?? manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        FutureUtils.forwardAsync(completableFuture, completableFuture2, (Executor) manuallyTriggeredScheduledExecutor);
        completableFuture.complete("foobar");
        Assertions.assertThat(completableFuture2).isNotDone();
        manuallyTriggeredScheduledExecutor.triggerAll();
        FlinkAssertions.assertThatFuture(completableFuture2).eventuallySucceeds().isEqualTo("foobar");
    }

    @Test
    void testGetWithoutException() {
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.complete(1);
        Assertions.assertThat((Integer) FutureUtils.getWithoutException(completableFuture)).isEqualTo(1);
    }

    @Test
    void testGetWithoutExceptionWithAnException() {
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.completeExceptionally(new RuntimeException("expected"));
        Assertions.assertThat((Integer) FutureUtils.getWithoutException(completableFuture)).isNull();
    }

    @Test
    void testGetWithoutExceptionWithoutFinishing() {
        Assertions.assertThat((Integer) FutureUtils.getWithoutException(new CompletableFuture())).isNull();
    }

    @Test
    void testSwitchExecutorForNormallyCompletedFuture() {
        CompletableFuture completableFuture = new CompletableFuture();
        ExecutorService executor = EXECUTOR_RESOURCE.getExecutor();
        CompletableFuture switchExecutor = FutureUtils.switchExecutor(completableFuture, executor);
        String str = (String) FutureUtils.supplyAsync(() -> {
            return Thread.currentThread().getName();
        }, executor).join();
        CompletableFuture handle = switchExecutor.handle((str2, th) -> {
            Assertions.assertThat(str2).isEqualTo("foobar");
            Assertions.assertThat(Thread.currentThread().getName()).isEqualTo(str);
            return null;
        });
        completableFuture.complete("foobar");
        handle.join();
    }

    @Test
    void testSwitchExecutorForExceptionallyCompletedFuture() {
        CompletableFuture completableFuture = new CompletableFuture();
        ExecutorService executor = EXECUTOR_RESOURCE.getExecutor();
        CompletableFuture switchExecutor = FutureUtils.switchExecutor(completableFuture, executor);
        String str = (String) FutureUtils.supplyAsync(() -> {
            return Thread.currentThread().getName();
        }, executor).join();
        Exception exc = new Exception("foobar");
        CompletableFuture handle = switchExecutor.handle((str2, th) -> {
            Assertions.assertThat(th).isInstanceOf(CompletionException.class).extracting((v0) -> {
                return v0.getCause();
            }).isEqualTo(exc);
            Assertions.assertThat(Thread.currentThread().getName()).isEqualTo(str);
            return null;
        });
        completableFuture.completeExceptionally(exc);
        handle.join();
    }
}
