package io.servicetalk.concurrent.test.internal;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/* loaded from: input_file:io/servicetalk/concurrent/test/internal/TestPublisherSubscriber.class */
public final class TestPublisherSubscriber<T> implements PublisherSource.Subscriber<T> {
    private static final Object NULL_ON_NEXT;
    private final AtomicLong outstandingDemand;
    private final BlockingQueue<Object> items;
    private final CountDownLatch onTerminalLatch;
    private final CountDownLatch onSubscribeLatch;

    @Nullable
    private TerminalNotification onTerminal;

    @Nullable
    private PublisherSource.Subscription subscription;
    static final /* synthetic */ boolean $assertionsDisabled;

    public TestPublisherSubscriber() {
        this(0L);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TestPublisherSubscriber(long j) {
        this.items = new LinkedBlockingQueue();
        this.onTerminalLatch = new CountDownLatch(1);
        this.onSubscribeLatch = new CountDownLatch(1);
        this.outstandingDemand = new AtomicLong(j);
    }

    public void onSubscribe(final PublisherSource.Subscription subscription) {
        Objects.requireNonNull(subscription, "Null Subscription is not permitted https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13");
        verifyNoTerminal("onSubscribe", null, false);
        if (this.subscription != null) {
            throw new IllegalStateException("The Subscription has already been set to " + this.subscription + ". New Subscription " + subscription + " is not supported.");
        }
        this.subscription = new PublisherSource.Subscription() { // from class: io.servicetalk.concurrent.test.internal.TestPublisherSubscriber.1
            public void request(long j) {
                if (j > 0) {
                    try {
                        TestPublisherSubscriber.this.outstandingDemand.accumulateAndGet(j, FlowControlUtils::addWithOverflowProtection);
                    } finally {
                        subscription.request(j);
                    }
                }
            }

            public void cancel() {
                subscription.cancel();
            }
        };
        this.onSubscribeLatch.countDown();
    }

    public void onNext(@Nullable T t) {
        verifyOnSubscribedAndNoTerminal("onNext", t, true);
        if (this.outstandingDemand.decrementAndGet() < 0) {
            throw new IllegalStateException("Too many onNext signals relative to Subscription request(n). https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.1");
        }
        this.items.add(wrapNull(t));
    }

    public void onError(Throwable th) {
        verifyOnSubscribedAndNoTerminal("onError", th, true);
        this.onTerminal = TerminalNotification.error(th);
        this.onTerminalLatch.countDown();
    }

    public void onComplete() {
        verifyOnSubscribedAndNoTerminal("onComplete", null, false);
        this.onTerminal = TerminalNotification.complete();
        this.onTerminalLatch.countDown();
    }

    private void verifyNoTerminal(String str, @Nullable Object obj, boolean z) {
        if (this.onTerminal != null) {
            throw new IllegalStateException("Subscriber has already terminated [" + this.onTerminal + "] " + str + (z ? " [ " + obj + "]" : "") + " is not valid. See https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7");
        }
    }

    private void verifyOnSubscribedAndNoTerminal(String str, @Nullable Object obj, boolean z) {
        verifyNoTerminal(str, obj, z);
        if (this.subscription == null) {
            throw new IllegalStateException("onSubscribe must be called before any other signals. https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9");
        }
    }

    public PublisherSource.Subscription awaitSubscription() {
        AwaitUtils.await(this.onSubscribeLatch);
        if ($assertionsDisabled || this.subscription != null) {
            return this.subscription;
        }
        throw new AssertionError();
    }

    @Nullable
    public T takeOnNext() {
        return (T) unwrapNull(AwaitUtils.take(this.items));
    }

    public List<T> takeOnNext(int i) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            while (true) {
                try {
                    break;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    ThrowableUtils.throwException(e);
                }
            }
            arrayList.add(unwrapNull(this.items.take()));
        }
        return arrayList;
    }

    public List<T> pollAllOnNext() {
        ArrayList arrayList = new ArrayList();
        this.items.drainTo(arrayList);
        return (List) arrayList.stream().map(TestPublisherSubscriber::unwrapNull).collect(Collectors.toList());
    }

    @Nullable
    public Supplier<T> pollOnNext(long j, TimeUnit timeUnit) {
        Object poll = AwaitUtils.poll(this.items, j, timeUnit);
        if (poll == null) {
            return null;
        }
        return () -> {
            return unwrapNull(poll);
        };
    }

    public Throwable awaitOnError() {
        return awaitOnError(true);
    }

    Throwable awaitOnError(boolean z) {
        AwaitUtils.await(this.onTerminalLatch);
        if (!$assertionsDisabled && this.onTerminal == null) {
            throw new AssertionError();
        }
        if (this.onTerminal == TerminalNotification.complete()) {
            throw new IllegalStateException("wanted onError but Subscriber terminated with onComplete");
        }
        if (!$assertionsDisabled && this.onTerminal.cause() == null) {
            throw new AssertionError();
        }
        if (z) {
            verifyAllOnNextProcessed();
        }
        return this.onTerminal.cause();
    }

    public void awaitOnComplete() {
        awaitOnComplete(true);
    }

    void awaitOnComplete(boolean z) {
        AwaitUtils.await(this.onTerminalLatch);
        if (!$assertionsDisabled && this.onTerminal == null) {
            throw new AssertionError();
        }
        if (this.onTerminal != TerminalNotification.complete()) {
            throw new IllegalStateException("wanted onComplete but Subscriber terminated with onError", this.onTerminal.cause());
        }
        if (z) {
            verifyAllOnNextProcessed();
        }
    }

    @Nullable
    public Supplier<Throwable> pollTerminal(long j, TimeUnit timeUnit) {
        if (!AwaitUtils.await(this.onTerminalLatch, j, timeUnit)) {
            return null;
        }
        if ($assertionsDisabled || this.onTerminal != null) {
            return new Supplier<Throwable>() { // from class: io.servicetalk.concurrent.test.internal.TestPublisherSubscriber.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.function.Supplier
                @Nullable
                public Throwable get() {
                    return TestPublisherSubscriber.this.onTerminal.cause();
                }

                public String toString() {
                    return TestPublisherSubscriber.this.onTerminal.toString();
                }
            };
        }
        throw new AssertionError();
    }

    private void verifyAllOnNextProcessed() {
        if (this.items.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder();
        int i = 0;
        while (true) {
            Object poll = this.items.poll();
            if (poll == null) {
                break;
            }
            i++;
            sb.append('[').append(unwrapNull(poll)).append("] ");
        }
        throw new IllegalStateException(i + " onNext items were not processed: " + ((Object) sb));
    }

    private static Object wrapNull(@Nullable Object obj) {
        return obj == null ? NULL_ON_NEXT : obj;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    @Nullable
    public static <T> T unwrapNull(Object obj) {
        if (obj == NULL_ON_NEXT) {
            return null;
        }
        return obj;
    }

    static {
        $assertionsDisabled = !TestPublisherSubscriber.class.desiredAssertionStatus();
        NULL_ON_NEXT = new Object();
    }
}
