package org.apache.flink.runtime.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/concurrent/FlinkFutureTest.class */
/**
 * Tests for {@link FlinkCompletableFuture} and the {@link Future} operations it offers:
 * apply, accept, compose, combine, handle and exceptionally, each in its direct and its
 * executor-based ("async") variant.
 */
public class FlinkFutureTest extends TestLogger {
    /** Single-threaded executor shared by all tests; created in {@link #setup()}. */
    private static ExecutorService executor;

    /** Unchecked exception the tests use as a recognizable failure cause. */
    public static class TestException extends RuntimeException {
        private static final long serialVersionUID = -1274022962838535130L;

        public TestException(String str) {
            super(str);
        }
    }

    /** Creates the executor handed to the asynchronous future operations. */
    @BeforeClass
    public static void setup() {
        executor = Executors.newSingleThreadExecutor();
    }

    /** Shuts the shared executor down once all tests have run. */
    @AfterClass
    public static void teardown() {
        // JUnit runs @AfterClass methods even if a @BeforeClass method threw, so the
        // executor may never have been created; do not mask that failure with an NPE.
        if (executor != null) {
            executor.shutdown();
        }
    }

    /** A function applied via {@code thenApplyAsync} transforms the completed value. */
    @Test(timeout = 10000)
    public void testFutureApplyAsync() throws Exception {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future applied = source.thenApplyAsync(new ApplyFunction<Integer, String>() {
            public String apply(Integer value) {
                return String.valueOf(value);
            }
        }, executor);

        source.complete(42);

        Assert.assertEquals(String.valueOf(42), applied.get());
    }

    /** A timed {@code get} on a future that is never completed throws a timeout exception. */
    @Test(expected = TimeoutException.class)
    public void testFutureGetTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        FlinkCompletableFuture neverCompleted = new FlinkCompletableFuture();

        neverCompleted.get(10L, TimeUnit.MILLISECONDS);

        Assert.fail("Get should have thrown a timeout exception.");
    }

    /** {@code get} on an exceptionally completed future reports the exception as the cause. */
    @Test(expected = TestException.class)
    public void testExceptionalCompletion() throws Throwable {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        source.completeExceptionally(new TestException("Test exception"));

        try {
            source.get();
            Assert.fail("Get should have thrown an exception.");
        } catch (ExecutionException e) {
            // Rethrow the cause so that the @Test(expected = ...) check can see it.
            throw e.getCause();
        }
    }

    /** An exception thrown by an applied function surfaces at the end of the chain. */
    @Test(expected = TestException.class)
    public void testExceptionPropagation() throws Throwable {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future failing = source.thenApplyAsync(new ApplyFunction<Integer, String>() {
            public String apply(Integer value) {
                throw new TestException("Test exception");
            }
        }, executor);
        Future chained = failing.thenApplyAsync(new ApplyFunction<String, String>() {
            public String apply(String value) {
                return "foobar";
            }
        }, executor);

        source.complete(42);

        try {
            chained.get();
            Assert.fail("Get should have thrown an exception.");
        } catch (ExecutionException e) {
            // Rethrow the cause so that the @Test(expected = ...) check can see it.
            throw e.getCause();
        }
    }

    /** {@code exceptionallyAsync} maps the failure of the source future to a value. */
    @Test(timeout = 10000)
    public void testExceptionallyAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future recovered = source.exceptionallyAsync(new ApplyFunction<Throwable, String>() {
            public String apply(Throwable failure) {
                return failure.getMessage();
            }
        }, executor);

        source.completeExceptionally(new TestException("Foobar"));

        Assert.assertEquals("Foobar", (String) recovered.get());
    }

    /** {@code thenComposeAsync} yields the value of the future returned by the function. */
    @Test(timeout = 10000)
    public void testComposeAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future composed = source.thenComposeAsync(new ApplyFunction<Integer, Future<Integer>>() {
            public Future<Integer> apply(Integer value) {
                return FlinkFuture.supplyAsync(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        return 42;
                    }
                }, FlinkFutureTest.executor);
            }
        }, executor);

        source.complete(42);

        Assert.assertEquals(42L, ((Integer) composed.get()).intValue());
    }

    /** {@code thenCombineAsync} feeds the values of both futures into the combine function. */
    @Test(timeout = 10000)
    public void testCombineAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture left = new FlinkCompletableFuture();
        FlinkCompletableFuture right = new FlinkCompletableFuture();
        Future combined = left.thenCombineAsync(right, appendNumber(), executor);

        left.complete(42);
        right.complete("foobar");

        Assert.assertEquals("foobar42", (String) combined.get());
    }

    /** A failure of the left input makes the combined future fail with that exception. */
    @Test(timeout = 10000)
    public void testCombineAsyncLeftFailure() throws InterruptedException {
        FlinkCompletableFuture left = new FlinkCompletableFuture();
        FlinkCompletableFuture right = new FlinkCompletableFuture();
        TestException failure = new TestException("barfoo");
        Future combined = left.thenCombineAsync(right, appendNumber(), executor);

        left.completeExceptionally(failure);
        right.complete("foobar");

        try {
            combined.get();
            Assert.fail("We should have caught an ExecutionException.");
        } catch (ExecutionException e) {
            Assert.assertEquals(failure, e.getCause());
        }
    }

    /** A failure of the right input makes the combined future fail with that exception. */
    @Test(timeout = 10000)
    public void testCombineAsyncRightFailure() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture left = new FlinkCompletableFuture();
        FlinkCompletableFuture right = new FlinkCompletableFuture();
        TestException failure = new TestException("barfoo");
        Future combined = left.thenCombineAsync(right, appendNumber(), executor);

        left.complete(42);
        right.completeExceptionally(failure);

        try {
            combined.get();
            Assert.fail("We should have caught an ExecutionException.");
        } catch (ExecutionException e) {
            Assert.assertEquals(failure, e.getCause());
        }
    }

    /** {@code getNow} on a future that has not been completed returns the supplied default. */
    @Test
    public void testGetNow() throws ExecutionException {
        FlinkCompletableFuture notCompleted = new FlinkCompletableFuture();

        Assert.assertEquals(Integer.valueOf(41), notCompleted.getNow(41));
    }

    /** {@code thenAcceptAsync} hands the completed value to the accept function. */
    @Test(timeout = 10000)
    public void testAcceptAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        final AtomicInteger observed = new AtomicInteger(0);
        Future accepted = source.thenAcceptAsync(new AcceptFunction<Integer>() {
            public void accept(Integer value) {
                observed.set(value.intValue());
            }
        }, executor);

        source.complete(42);
        // Wait for the accept function to finish before inspecting its side effect.
        accepted.get();

        Assert.assertEquals(42, observed.get());
    }

    /** {@code handleAsync} sees the value when the source future completes normally. */
    @Test(timeout = 10000)
    public void testHandleAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future handled = source.handleAsync(numberOrErrorMessage(), executor);

        source.complete(43);

        Assert.assertEquals(String.valueOf(43), handled.get());
    }

    /** {@code handleAsync} sees the exception when the source future fails. */
    @Test(timeout = 10000)
    public void testHandleAsyncException() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future handled = source.handleAsync(numberOrErrorMessage(), executor);

        source.completeExceptionally(new TestException("foobar"));

        Assert.assertEquals("foobar", handled.get());
    }

    /** Only the first completion attempt succeeds; later ones leave the value untouched. */
    @Test(timeout = 10000)
    public void testMultipleCompleteOperations() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture future = new FlinkCompletableFuture();

        Assert.assertTrue(future.complete(42));
        Assert.assertFalse(future.complete(1337));
        Assert.assertFalse(future.completeExceptionally(new TestException("foobar")));

        Assert.assertEquals(Integer.valueOf(42), future.get());
    }

    /** A function applied via {@code thenApply} transforms the completed value. */
    @Test
    public void testApply() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future applied = source.thenApply(new ApplyFunction<Integer, String>() {
            public String apply(Integer value) {
                return String.valueOf(value);
            }
        });

        source.complete(42);

        Assert.assertEquals(String.valueOf(42), applied.get());
    }

    /** {@code thenAccept} on an already completed future passes its value on. */
    @Test
    public void testAccept() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = FlinkCompletableFuture.completed(42);
        final AtomicInteger observed = new AtomicInteger(0);

        source.thenAccept(new AcceptFunction<Integer>() {
            public void accept(Integer value) {
                observed.set(value.intValue());
            }
        }).get();

        Assert.assertEquals(42, observed.get());
    }

    /** {@code exceptionally} on an already failed future maps the failure to a value. */
    @Test
    public void testExceptionally() throws ExecutionException, InterruptedException {
        String recovered = (String) FlinkCompletableFuture.completedExceptionally(new TestException("Foobar")).exceptionally(new ApplyFunction<Throwable, String>() {
            public String apply(Throwable failure) {
                return failure.getMessage();
            }
        }).get();

        Assert.assertEquals("Foobar", recovered);
    }

    /** {@code handle} on an already completed future sees its value. */
    @Test
    public void testHandle() throws ExecutionException, InterruptedException {
        Object handled = FlinkCompletableFuture.completed(43).handle(numberOrErrorMessage()).get();

        Assert.assertEquals(String.valueOf(43), handled);
    }

    /** {@code thenCompose} yields the value of the future returned by the function. */
    @Test
    public void testCompose() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future composed = source.thenCompose(new ApplyFunction<Integer, Future<Integer>>() {
            public Future<Integer> apply(Integer value) {
                return FlinkFuture.supplyAsync(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        return 42;
                    }
                }, FlinkFutureTest.executor);
            }
        });

        source.complete(42);

        Assert.assertEquals(42L, ((Integer) composed.get()).intValue());
    }

    /** {@code thenCombine} on two already completed futures combines their values. */
    @Test
    public void testCombine() throws ExecutionException, InterruptedException {
        Integer sum = (Integer) FlinkCompletableFuture.completed(1).thenCombine(FlinkCompletableFuture.completed(2), new BiFunction<Integer, Integer, Integer>() {
            public Integer apply(Integer left, Integer right) {
                return Integer.valueOf(left.intValue() + right.intValue());
            }
        }).get();

        Assert.assertEquals(1 + 2, sum.intValue());
    }

    /** Several functions can be attached to a future that is already completed. */
    @Test(timeout = 10000)
    public void testMultipleFunctionsOnCompleteFuture() throws Exception {
        FlinkCompletableFuture source = FlinkCompletableFuture.completed("test");

        Future handled = source.handleAsync(textOrErrorMessage(), executor);
        Future accepted = source.thenAcceptAsync(ignoringAcceptor(), executor);

        Assert.assertEquals("test", handled.get());
        Assert.assertNull(accepted.get());
    }

    /** Several functions attached before completion all see the completed value. */
    @Test(timeout = 10000)
    public void testMultipleFunctionsOnIncompleteFuture() throws Exception {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future handled = source.handleAsync(textOrErrorMessage(), executor);
        Future accepted = source.thenAcceptAsync(ignoringAcceptor(), executor);

        source.complete("value");

        Assert.assertEquals("value", handled.get());
        Assert.assertNull(accepted.get());
    }

    /** Several functions attached before an exceptional completion all see the failure. */
    @Test(timeout = 10000)
    public void testMultipleFunctionsExceptional() throws Exception {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        Future handled = source.handleAsync(textOrErrorMessage(), executor);
        Future accepted = source.thenAcceptAsync(ignoringAcceptor(), executor);

        source.completeExceptionally(new TestException("test"));

        // The handle function converts the failure into its message ...
        Assert.assertEquals("test", handled.get());
        // ... whereas the accept future has no way to recover and must fail as well.
        try {
            accepted.get();
            Assert.fail("We should have caught an ExecutionException.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TestException);
        }
    }

    /**
     * An exceptional completion of the source future reaches {@code exceptionallyAsync}
     * functions attached behind apply and accept stages, for every chain hanging off the
     * same source.
     */
    @Test(timeout = 10000)
    public void testChainedFutureExceptionalCompletion() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture source = new FlinkCompletableFuture();
        ApplyFunction<Throwable, Throwable> returnFailure = new ApplyFunction<Throwable, Throwable>() {
            public Throwable apply(Throwable failure) {
                return failure;
            }
        };

        Future viaApply = source.thenApplyAsync(new ApplyFunction<String, String>() {
            public String apply(String value) {
                return value;
            }
        }, executor).exceptionallyAsync(returnFailure, executor);
        Future viaFirstAccept = source.thenAcceptAsync(ignoringAcceptor(), executor).exceptionallyAsync(returnFailure, executor);
        Future viaSecondAccept = source.thenAcceptAsync(ignoringAcceptor(), executor).exceptionallyAsync(returnFailure, executor);

        TestException failure = new TestException("test");
        source.completeExceptionally(failure);

        Assert.assertEquals(failure, viaApply.get());
        Assert.assertEquals(failure, viaFirstAccept.get());
        Assert.assertEquals(failure, viaSecondAccept.get());
    }

    /** Combine function that appends the number to the string, e.g. "foobar" and 42 give "foobar42". */
    private static BiFunction<Integer, String, String> appendNumber() {
        return new BiFunction<Integer, String, String>() {
            public String apply(Integer number, String text) {
                return text + number;
            }
        };
    }

    /** Handle function returning the number as text, or the failure's message if the number is null. */
    private static BiFunction<Integer, Throwable, String> numberOrErrorMessage() {
        return new BiFunction<Integer, Throwable, String>() {
            public String apply(Integer number, Throwable failure) {
                return number != null ? String.valueOf(number) : failure.getMessage();
            }
        };
    }

    /** Handle function returning the text itself, or the failure's message if the text is null. */
    private static BiFunction<String, Throwable, String> textOrErrorMessage() {
        return new BiFunction<String, Throwable, String>() {
            public String apply(String text, Throwable failure) {
                return text != null ? text : failure.getMessage();
            }
        };
    }

    /** Accept function that deliberately does nothing with the value it receives. */
    private static AcceptFunction<String> ignoringAcceptor() {
        return new AcceptFunction<String>() {
            public void accept(String text) {
            }
        };
    }
}
