/*
 * Decompiled with CFR 0.152.
 */
package tech.picnic.rx;

import com.google.common.util.concurrent.MoreExecutors;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import tech.picnic.rx.PicnicRxPlugins;
import tech.picnic.rx.RxThreadLocal;

@Test(singleThreaded=true)
public final class PicnicRxPluginsTest {
    private final ConcurrentMap<Context, AtomicInteger> verificationCounters = new ConcurrentHashMap<Context, AtomicInteger>();

    @BeforeClass
    void init() {
        PicnicRxPlugins.configureContextPropagation((RxThreadLocal[])new RxThreadLocal[]{Context.createRxThreadLocal()});
    }

    @AfterClass
    void clean() {
        PicnicRxPlugins.unsetContextPropagation();
    }

    public void testPropagate() {
        Observable obs = Observable.just((Object)1, (Object)2, (Object)3);
        Context ctx1 = Context.createRandom().applyToCurrentThread();
        RxJavaPlugins.setScheduleHandler(null);
        Context ctx2 = Context.createEmpty();
        obs.subscribeOn(Schedulers.io()).doOnNext(i -> this.verifyActive(ctx2)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx2, 3);
        this.init();
        obs.subscribeOn(Schedulers.io()).doOnNext(i -> this.verifyActive(ctx1)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx1, 3);
    }

    public void testObserveOnAnotherScheduler() {
        Observable obs = Observable.just((Object)1, (Object)2, (Object)3);
        Context ctx = Context.createRandom().applyToCurrentThread();
        obs.subscribeOn(Schedulers.io()).doOnNext(i -> this.verifyActive(ctx)).observeOn(Schedulers.io()).doOnNext(i -> this.verifyActive(ctx)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx, 6);
    }

    public void testPropagateOnMultipleSchedulers() {
        Observable obs = Observable.just((Object)1, (Object)2, (Object)3);
        Context ctx = Context.createRandom().applyToCurrentThread();
        obs.flatMap(i -> Observable.just((Object)i).subscribeOn(Schedulers.io())).doOnNext(i -> this.verifyActive(ctx)).flatMap(i -> Observable.just((Object)i).subscribeOn(Schedulers.computation())).doOnNext(i -> this.verifyActive(ctx)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx, 6);
    }

    public void testNotLeaking() throws InterruptedException {
        Observable obs = Observable.just((Object)1, (Object)2, (Object)3);
        Scheduler singleScheduler = Schedulers.single();
        Context ctx1 = Context.createRandom().applyToCurrentThread();
        obs.subscribeOn(singleScheduler).doOnNext(i -> this.verifyActive(ctx1)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx1, 3);
        Context ctx2 = Context.createRandom().applyToCurrentThread();
        obs.subscribeOn(singleScheduler).doOnNext(i -> this.verifyActive(ctx2)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx2, 3);
        Context ctx3 = Context.createEmpty();
        Runnable runnable = () -> obs.subscribeOn(singleScheduler).doOnNext(i -> this.verifyActive(ctx3)).ignoreElements().blockingAwait();
        Thread thread = new Thread(runnable);
        thread.start();
        thread.join();
        this.verifyVerificationCounter(ctx3, 3);
        Context ctx4 = Context.createEmpty().applyToCurrentThread();
        obs.subscribeOn(singleScheduler).doOnNext(i -> this.verifyActive(ctx4)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx4, 3);
    }

    public void testContextSwitchBeforeConsumption() {
        Observable obs = Observable.just((Object)1, (Object)2, (Object)3);
        Context ctx1 = Context.createRandom().applyToCurrentThread();
        Observable obsOnScheduler = obs.subscribeOn(Schedulers.single());
        Context ctx2 = Context.createRandom().applyToCurrentThread();
        obsOnScheduler.doOnNext(i -> this.verifyActive(ctx2)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx1, 0);
        this.verifyVerificationCounter(ctx2, 3);
    }

    public void testPropagationOnSameThread() {
        Observable obs = Observable.just((Object)1, (Object)2, (Object)3);
        Context ctx = Context.createRandom().applyToCurrentThread();
        obs.subscribeOn(Schedulers.from((Executor)MoreExecutors.directExecutor())).doOnNext(i -> this.verifyActive(ctx)).ignoreElements().blockingAwait();
        this.verifyVerificationCounter(ctx, 3);
        this.verifyActive(ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testEmptyContextIsPropagated() {
        Observable obs = Observable.just((Object)1, (Object)2, (Object)3);
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            es.execute(() -> Context.createRandom().applyToCurrentThread());
            Context ctx = Context.createEmpty().applyToCurrentThread();
            obs.subscribeOn(Schedulers.from((Executor)es)).doOnNext(i -> this.verifyActive(ctx)).ignoreElements().blockingAwait();
            this.verifyVerificationCounter(ctx, 3);
            this.verifyActive(ctx);
        }
        finally {
            es.shutdownNow();
        }
    }

    public void testTestSchedulerContextPropagation() throws InterruptedException {
        Context ctx = Context.createRandom().applyToCurrentThread();
        TestScheduler testScheduler = new TestScheduler();
        Observable observable = Observable.just((Object)1, (Object)2, (Object)3).subscribeOn((Scheduler)testScheduler).doOnNext(i -> this.verifyActive(ctx));
        TestObserver observer = (TestObserver)observable.subscribeWith((Observer)TestObserver.create());
        testScheduler.advanceTimeBy(2L, TimeUnit.SECONDS);
        observer.await();
        this.verifyVerificationCounter(ctx, 3);
    }

    public void testDirectExecutor() {
        Context ctx = Context.createRandom().applyToCurrentThread();
        TestScheduler io = new TestScheduler();
        io.createWorker().schedule(() -> this.verifyActive(ctx));
        io.triggerActions();
        this.verifyVerificationCounter(ctx, 1);
    }

    private void verifyActive(Context ctx) {
        ctx.verifyCurrentThread();
        this.verificationCounters.computeIfAbsent(ctx, v -> new AtomicInteger()).incrementAndGet();
    }

    private void verifyVerificationCounter(Context ctx, int expected) {
        Assert.assertEquals((int)this.verificationCounters.getOrDefault(ctx, new AtomicInteger()).intValue(), (int)expected);
    }

    static final class Context {
        private static final ThreadLocal<String> threadLocalContext = new ThreadLocal();
        private final Optional<String> token;

        private Context(Optional<String> token) {
            this.token = token;
        }

        static Context createEmpty() {
            return new Context(Optional.empty());
        }

        static Context createRandom() {
            return new Context(Optional.of(UUID.randomUUID().toString()));
        }

        Context applyToCurrentThread() {
            if (this.token.isPresent()) {
                threadLocalContext.set(this.token.get());
            } else {
                threadLocalContext.remove();
            }
            return this;
        }

        void verifyCurrentThread() {
            Assert.assertEquals((String)threadLocalContext.get(), (String)this.token.orElse(null));
        }

        static RxThreadLocal<String> createRxThreadLocal() {
            return RxThreadLocal.from(threadLocalContext);
        }
    }
}

