package org.apache.flink.core.state;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/core/state/StateFutureTest.class */
public class StateFutureTest {
    /**
     * Exception handler passed to every {@code StateFutureImpl} created by this test. It rethrows
     * whatever it is given as a {@link RuntimeException} carrying the same message and cause.
     */
    static StateFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler =
            (message, cause) -> {
                throw new RuntimeException(message, cause);
            };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/core/state/StateFutureTest$MockValueState.class */
    /**
     * A fake value state whose reads complete asynchronously on a private three-thread pool.
     *
     * <p>Every {@link #get()} hands back a future that is completed, possibly after a short random
     * delay, with the next value of an incrementing counter starting at 0.
     */
    public static class MockValueState {
        /** Counter supplying the value each completed future carries. */
        AtomicInteger value = new AtomicInteger(0);

        /** Pool on which the simulated state accesses are executed. */
        ExecutorService stateExecutor = Executors.newFixedThreadPool(3);

        /** Runner given to every future created by {@link #get()}. */
        StateFutureImpl.CallbackRunner runner;

        /**
         * @param executorService executor wrapped in a {@code TestCallbackRunner} and used as the
         *     callback runner of the created futures; may be {@code null}
         */
        MockValueState(ExecutorService executorService) {
            this.runner = new TestCallbackRunner(executorService);
        }

        /**
         * Starts a simulated asynchronous read.
         *
         * @return a future that is completed from the state pool with the next counter value
         */
        StateFuture<Integer> get() {
            StateFutureImpl<Integer> future =
                    new StateFutureImpl<>(this.runner, StateFutureTest.exceptionHandler);
            this.stateExecutor.submit(() -> {
                // Draw the random number once so the same value decides both whether and how
                // long to sleep.
                int delay = ThreadLocalRandom.current().nextInt();
                if (delay > 0) {
                    try {
                        Thread.sleep(delay % 1000);
                    } catch (InterruptedException ignored) {
                        // The delay is best-effort; keep the interrupt visible to the pool and
                        // still complete the future below.
                        Thread.currentThread().interrupt();
                    }
                }
                future.complete(this.value.getAndIncrement());
            });
            return future;
        }
    }

    /* loaded from: input_file:org/apache/flink/core/state/StateFutureTest$TestCallbackRunner.class */
    /**
     * Callback runner used by the tests: runs callbacks inline when constructed without an
     * executor, otherwise submits them to the given executor.
     */
    private static class TestCallbackRunner implements StateFutureImpl.CallbackRunner {
        private final ExecutorService stateExecutor;

        /**
         * @param callbackExecutor executor that should run the callbacks, or {@code null} to run
         *     them on the calling thread
         */
        TestCallbackRunner(ExecutorService callbackExecutor) {
            this.stateExecutor = callbackExecutor;
        }

        /**
         * Runs the given callback, converting it to a plain {@link Runnable} via
         * {@code ThrowingRunnable.unchecked}.
         */
        public void submit(ThrowingRunnable throwingRunnable) {
            Runnable task = () -> ThrowingRunnable.unchecked(throwingRunnable).run();
            if (stateExecutor != null) {
                // Hand over to the configured executor.
                stateExecutor.submit(task);
                return;
            }
            // No executor configured: run synchronously on the caller's thread.
            task.run();
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    StateFutureTest() {}

    /**
     * Exercises {@code thenAccept}, {@code thenApply}, {@code thenCompose}, {@code thenCombine},
     * {@code StateFutureUtils.completedFuture} and {@code StateFutureUtils.combineAll} with a
     * callback runner that executes inline, so every callback's effect on the shared counter can be
     * asserted immediately after the triggering {@code complete} call.
     */
    @Test
    void basicSyncComplete() {
        // A null executor makes the runner execute callbacks on the calling thread.
        TestCallbackRunner runner = new TestCallbackRunner(null);
        AtomicInteger counter = new AtomicInteger(0);

        // thenAccept: the callback fires only once the future is completed.
        StateFutureImpl<Integer> acceptSource = new StateFutureImpl<>(runner, exceptionHandler);
        acceptSource.thenAccept(counter::addAndGet);
        Assertions.assertThat(counter).hasValue(0);
        acceptSource.complete(5);
        Assertions.assertThat(counter).hasValue(5);

        // thenApply: the mapped result can be consumed after the source has completed.
        StateFutureImpl<Integer> applySource = new StateFutureImpl<>(runner, exceptionHandler);
        StateFuture<String> applied =
                applySource.thenApply(v -> String.valueOf(counter.addAndGet(v)));
        Assertions.assertThat(counter).hasValue(5);
        applySource.complete(3);
        Assertions.assertThat(counter).hasValue(8);
        applied.thenAccept(v -> counter.addAndGet(-Integer.parseInt(v)));
        Assertions.assertThat(counter).hasValue(0);

        // thenCompose: the downstream callback waits for the inner future as well.
        StateFutureImpl<Integer> composeOuter = new StateFutureImpl<>(runner, exceptionHandler);
        StateFutureImpl<Integer> composeInner = new StateFutureImpl<>(runner, exceptionHandler);
        composeOuter
                .thenCompose(v -> {
                    counter.addAndGet(v);
                    return composeInner;
                })
                .thenAccept(counter::addAndGet);
        Assertions.assertThat(counter).hasValue(0);
        composeOuter.complete(6);
        Assertions.assertThat(counter).hasValue(6);
        composeInner.complete(3);
        Assertions.assertThat(counter).hasValue(9);

        // thenCombine: the callback runs only after both futures are completed.
        StateFutureImpl<Integer> combineLeft = new StateFutureImpl<>(runner, exceptionHandler);
        StateFutureImpl<Integer> combineRight = new StateFutureImpl<>(runner, exceptionHandler);
        combineLeft.thenCombine(combineRight, (left, right) -> {
            counter.addAndGet(left - right);
            return StateFutureUtils.completedVoidFuture();
        });
        Assertions.assertThat(counter).hasValue(9);
        combineLeft.complete(4);
        Assertions.assertThat(counter).hasValue(9);
        combineRight.complete(13);
        Assertions.assertThat(counter).hasValue(0);

        // An already completed future runs its callback immediately.
        StateFuture<Integer> completed = StateFutureUtils.completedFuture(3);
        completed.thenAccept(counter::addAndGet);
        Assertions.assertThat(counter).hasValue(3);

        // combineAll: fires after the last future completes; the expected 12345 additionally
        // requires the results to arrive in the order the futures were added.
        counter.set(0);
        ArrayList<StateFutureImpl<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            futures.add(new StateFutureImpl<>(runner, exceptionHandler));
        }
        StateFutureUtils.combineAll(futures).thenAccept(results -> {
            int digits = 0;
            for (Integer result : results) {
                digits = (digits * 10) + result;
            }
            counter.addAndGet(digits);
        });
        Assertions.assertThat(counter).hasValue(0);
        for (int i = 0; i < 5; i++) {
            futures.get(i).complete(i + 1);
            if (i != 4) {
                // Nothing may be reported before the final future is completed.
                Assertions.assertThat(counter).hasValue(0);
            }
        }
        Assertions.assertThat(counter).hasValue(12345);
    }

    /**
     * Verifies that callbacks chained on futures completed from {@code MockValueState}'s own pool
     * are nevertheless executed on the single-threaded executor given to the callback runner.
     *
     * <p>Threads are identified by a {@link ThreadLocal} that assigns increasing ids on first
     * access: the test thread obtains 0, the executor thread obtains 1.
     *
     * @throws Exception if waiting on one of the latches is interrupted
     */
    @Test
    void testRunOnCorrectThread() throws Exception {
        AtomicInteger threadIdProvider = new AtomicInteger(0);
        ThreadLocal<Integer> threadId = ThreadLocal.withInitial(threadIdProvider::getAndIncrement);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch executorChecked = new CountDownLatch(1);

        // First access happens on the test thread, which therefore receives id 0.
        Assertions.assertThat(threadId.get()).isZero();

        ExecutorService executor =
                Executors.newSingleThreadExecutor(
                        new ExecutorThreadFactory(getClass().getSimpleName()));
        MockValueState valueState = new MockValueState(executor);
        try {
            executor.execute(() -> {
                try {
                    // First access on the executor thread, which therefore receives id 1.
                    Assertions.assertThat(threadId.get()).isOne();
                } catch (Throwable t) {
                    failure.set(t);
                } finally {
                    executorChecked.countDown();
                }
            });
            executorChecked.await(20L, TimeUnit.SECONDS);
            Assertions.assertThat(executorChecked.getCount()).isZero();
            Assertions.assertThat(failure).hasValue(null);

            // Records a failure whenever it is invoked on a thread other than the executor thread.
            Runnable threadChecker = () -> {
                try {
                    Assertions.assertThat(threadId.get()).isOne();
                } catch (Throwable t) {
                    failure.set(t);
                }
            };

            // One count for finishing the chain set-up, one for the final callback.
            CountDownLatch chainFinished = new CountDownLatch(2);
            ArrayList<Integer> results = new ArrayList<>();
            executor.execute(() -> {
                ArrayList<StateFuture<Integer>> pending = new ArrayList<>();
                for (int i = 0; i < 5; i++) {
                    pending.add(valueState.get());
                }
                StateFutureUtils.combineAll(pending)
                        .thenCombine(valueState.get(), (firstFive, sixth) -> {
                            results.addAll(firstFive);
                            results.add(sixth);
                            threadChecker.run();
                            return 0;
                        })
                        .thenCompose(ignored -> {
                            threadChecker.run();
                            return valueState.get();
                        })
                        .thenApply(seventh -> {
                            results.add(seventh);
                            threadChecker.run();
                            return 0;
                        })
                        .thenAccept(ignored -> {
                            threadChecker.run();
                            chainFinished.countDown();
                        });
                chainFinished.countDown();
            });
            chainFinished.await(20L, TimeUnit.SECONDS);
            Assertions.assertThat(chainFinished.getCount()).isZero();
            Assertions.assertThat(failure.get()).isNull();
            // Five combined reads, plus the thenCombine read, plus the thenCompose read.
            Assertions.assertThat(results).hasSize(7);
        } finally {
            // Release the worker threads so they do not outlive the test.
            executor.shutdownNow();
            valueState.stateExecutor.shutdownNow();
        }
    }
}
