package org.apache.flink.runtime.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtilsTest.class */
/**
 * Tests for the retry ({@code retry}, {@code retryWithDelay}) and timeout ({@code orTimeout})
 * helpers of {@link FutureUtils}.
 */
public class FutureUtilsTest extends TestLogger {

    /**
     * An operation that fails until its tenth invocation must, when retried with a budget of ten,
     * complete the returned future with the successful value after exactly ten invocations.
     */
    @Test
    public void testRetrySuccess() throws Exception {
        final int retries = 10;
        final AtomicInteger attempts = new AtomicInteger(0);

        final CompletableFuture<Boolean> retryFuture = FutureUtils.retry(
            () -> CompletableFuture.supplyAsync(
                () -> {
                    if (attempts.incrementAndGet() == retries) {
                        return true;
                    }
                    throw new CompletionException(new FlinkException("Test exception"));
                },
                TestingUtils.defaultExecutor()),
            retries,
            TestingUtils.defaultExecutor());

        Assert.assertTrue(retryFuture.get());
        // assertEquals reports expected vs. actual on failure, unlike assertTrue(a == b).
        Assert.assertEquals(retries, attempts.get());
    }

    /**
     * An operation that always fails must make the retry future fail; the cause carried by the
     * {@link ExecutionException} is expected to be a {@code FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryFailure() throws Throwable {
        final int retries = 3;

        final CompletableFuture<?> retryFuture = FutureUtils.retry(
            () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
            retries,
            TestingUtils.defaultExecutor());

        try {
            retryFuture.get();
        } catch (ExecutionException e) {
            // Unwrap so that the @Test(expected = ...) check sees the actual cause.
            throw ExceptionUtils.stripExecutionException(e);
        }
    }

    /**
     * Cancelling the retry future while the second attempt is still running must mark the future
     * as cancelled, and the attempt counter must still read two afterwards.
     */
    @Test
    public void testRetryCancellation() throws Exception {
        final int retries = 10;
        final AtomicInteger attempts = new AtomicInteger(0);
        // Triggered by the second attempt to tell the test thread that it is in progress.
        final OneShotLatch secondAttemptStarted = new OneShotLatch();
        // Holds the second attempt until the test thread has cancelled the retry future.
        final OneShotLatch releaseSecondAttempt = new OneShotLatch();
        final AtomicReference<Throwable> unexpectedFailure = new AtomicReference<>(null);

        final CompletableFuture<?> retryFuture = FutureUtils.retry(
            () -> CompletableFuture.supplyAsync(
                () -> {
                    if (attempts.incrementAndGet() == 2) {
                        secondAttemptStarted.trigger();
                        try {
                            releaseSecondAttempt.await();
                        } catch (InterruptedException e) {
                            // Record the interruption so the test thread can report it below.
                            unexpectedFailure.compareAndSet(null, e);
                        }
                    }
                    throw new CompletionException(new FlinkException("Test exception"));
                },
                TestingUtils.defaultExecutor()),
            retries,
            TestingUtils.defaultExecutor());

        // Wait until the second attempt is running before cancelling.
        secondAttemptStarted.await();
        Assert.assertFalse(retryFuture.isDone());

        retryFuture.cancel(false);
        // Let the blocked attempt finish.
        releaseSecondAttempt.trigger();

        Assert.assertTrue(retryFuture.isCancelled());
        Assert.assertEquals(2, attempts.get());

        final Throwable failure = unexpectedFailure.get();
        if (failure != null) {
            throw new FlinkException("Exception occurred in the retry operation.", failure);
        }
    }

    /**
     * An operation that always fails must make the delayed-retry future fail; the cause carried by
     * the {@link ExecutionException} is expected to be a {@code FutureUtils.RetryException}.
     */
    @Test(expected = FutureUtils.RetryException.class)
    public void testRetryWithDelayFailure() throws Throwable {
        final int retries = 3;

        final CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
            () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
            retries,
            Time.milliseconds(1L),
            TestingUtils.defaultScheduledExecutor());

        try {
            retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Unwrap so that the @Test(expected = ...) check sees the actual cause.
            throw ExceptionUtils.stripExecutionException(e);
        }
    }

    /**
     * An operation that fails four times and then succeeds, retried with a 50 ms delay, must
     * yield {@code true} and must not complete earlier than four times the delay.
     */
    @Test
    public void testRetryWithDelay() throws Exception {
        final int retries = 4;
        final Time delay = Time.milliseconds(50L);
        // The operation fails while the counter is non-zero, i.e. for the first four calls.
        final AtomicInteger remainingFailures = new AtomicInteger(retries);

        // Start the clock BEFORE the first attempt is issued: otherwise the measured interval
        // excludes part of the first delay and may fall short of the asserted lower bound.
        // nanoTime() is monotonic, so wall-clock adjustments cannot distort the measurement.
        final long startNanos = System.nanoTime();

        final CompletableFuture<Boolean> retryFuture = FutureUtils.retryWithDelay(
            () -> {
                if (remainingFailures.getAndDecrement() == 0) {
                    return CompletableFuture.completedFuture(true);
                }
                return FutureUtils.completedExceptionally(new FlinkException("Test exception."));
            },
            retries,
            delay,
            TestingUtils.defaultScheduledExecutor());

        // Bounded wait so that a broken implementation fails the test instead of hanging it.
        final Boolean result =
            retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);

        Assert.assertTrue(result);
        Assert.assertTrue(
            "The completion time should be at least rertries times delay between retries.",
            elapsedMillis >= retries * delay.toMilliseconds());
    }

    /**
     * Cancelling a delayed-retry future while a retry is pending must mark the future as
     * cancelled and must call {@code cancel} on the {@link ScheduledFuture} returned by the
     * executor's {@code schedule} call.
     */
    @Test
    public void testRetryWithDelayCancellation() {
        final ScheduledFuture<?> scheduledFutureMock = Mockito.mock(ScheduledFuture.class);
        final ScheduledExecutor scheduledExecutorMock = Mockito.mock(ScheduledExecutor.class);

        // schedule(...) never runs the task; it only hands back the mocked scheduled future.
        Mockito.doReturn(scheduledFutureMock)
            .when(scheduledExecutorMock)
            .schedule(Matchers.any(Runnable.class), Matchers.anyLong(), Matchers.any(TimeUnit.class));
        // execute(...) runs the given runnable directly in the calling thread.
        Mockito.doAnswer(invocation -> {
            invocation.getArgumentAt(0, Runnable.class).run();
            return null;
        }).when(scheduledExecutorMock).execute(Matchers.any(Runnable.class));

        final CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
            () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
            1,
            TestingUtils.infiniteTime(),
            scheduledExecutorMock);

        Assert.assertFalse(retryFuture.isDone());
        Mockito.verify(scheduledExecutorMock)
            .schedule(Matchers.any(Runnable.class), Matchers.anyLong(), Matchers.any(TimeUnit.class));

        retryFuture.cancel(false);

        Assert.assertTrue(retryFuture.isCancelled());
        Mockito.verify(scheduledFutureMock).cancel(Matchers.anyBoolean());
    }

    /**
     * A future that is never completed by the test itself must, after {@code orTimeout} with a
     * 10 ms timeout, fail with a {@link TimeoutException} as the stripped cause.
     */
    @Test
    public void testOrTimeout() throws Exception {
        final CompletableFuture<String> future = new CompletableFuture<>();
        FutureUtils.orTimeout(future, 10L, TimeUnit.MILLISECONDS);

        try {
            // Bounded wait: if orTimeout never fires, get(...) itself throws a TimeoutException,
            // which is not caught below and fails the test instead of hanging it forever.
            future.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
            // Without this, a future that completes normally would let the test pass vacuously.
            Assert.fail("The future should have been completed exceptionally by the timeout.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
        }
    }
}
