package io.servicetalk.transport.netty.internal;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCounted;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.internal.SubscribablePublisher;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/transport/netty/internal/NettyChannelPublisher.class */
public final class NettyChannelPublisher<T> extends SubscribablePublisher<T> {
    private long requestCount;
    private boolean requested;
    private boolean inProcessPending;

    @Nullable
    private NettyChannelPublisher<T>.SubscriptionImpl subscription;

    @Nullable
    private Queue<Object> pending;

    @Nullable
    private Throwable fatalError;
    private final Channel channel;
    private final CloseHandler closeHandler;
    private final EventLoop eventLoop;
    private final Predicate<T> terminalSignalPredicate;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/transport/netty/internal/NettyChannelPublisher$SubscriptionImpl.class */
    public final class SubscriptionImpl implements PublisherSource.Subscription {
        final PublisherSource.Subscriber<? super T> associatedSub;

        private SubscriptionImpl(PublisherSource.Subscriber<? super T> subscriber) {
            this.associatedSub = subscriber;
        }

        public void request(long j) {
            if (NettyChannelPublisher.this.eventLoop.inEventLoop()) {
                NettyChannelPublisher.this.requestN(j, this);
            } else {
                NettyChannelPublisher.this.eventLoop.execute(() -> {
                    NettyChannelPublisher.this.requestN(j, this);
                });
            }
        }

        public void cancel() {
            if (NettyChannelPublisher.this.eventLoop.inEventLoop()) {
                NettyChannelPublisher.this.cancel(this);
            } else {
                NettyChannelPublisher.this.eventLoop.execute(() -> {
                    NettyChannelPublisher.this.cancel(this);
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyChannelPublisher(Channel channel, Predicate<T> predicate, CloseHandler closeHandler) {
        this.eventLoop = channel.eventLoop();
        this.channel = channel;
        this.closeHandler = closeHandler;
        this.terminalSignalPredicate = (Predicate) Objects.requireNonNull(predicate);
    }

    protected void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber) {
        if (this.eventLoop.inEventLoop()) {
            subscribe0(subscriber);
        } else {
            this.eventLoop.execute(() -> {
                subscribe0(subscriber);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void channelRead(T t) {
        assertInEventloop();
        if (t instanceof ReferenceCounted) {
            channelReadReferenceCounted((ReferenceCounted) t);
            return;
        }
        if (this.fatalError != null) {
            return;
        }
        if (this.subscription != null && !shouldBuffer()) {
            emit(this.subscription, t);
            return;
        }
        addPending(t);
        if (this.subscription != null) {
            processPending(this.subscription);
        }
    }

    private void channelReadReferenceCounted(ReferenceCounted referenceCounted) {
        try {
            referenceCounted.release();
        } finally {
            this.pending = null;
            if (this.fatalError == null) {
                this.fatalError = new IllegalArgumentException("Reference counted leaked netty's pipeline. Object: " + referenceCounted.getClass().getSimpleName());
                exceptionCaught0(this.fatalError);
            }
            ChannelCloseUtils.close(this.channel, this.fatalError);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onReadComplete() {
        assertInEventloop();
        this.requested = false;
        if (this.requestCount > 0) {
            requestChannel();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void exceptionCaught(Throwable th) {
        assertInEventloop();
        if (this.fatalError != null) {
            return;
        }
        exceptionCaught0(th);
    }

    private void exceptionCaught0(Throwable th) {
        ChannelCloseUtils.assignConnectionError(this.channel, th);
        if (this.subscription != null && !shouldBuffer()) {
            sendErrorToTarget(this.subscription, th);
            return;
        }
        addPending(TerminalNotification.error(th));
        if (this.subscription != null) {
            processPending(this.subscription);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void channelInboundClosed(Throwable th) {
        assertInEventloop();
        if (this.fatalError == null) {
            this.fatalError = th;
            exceptionCaught0(this.fatalError);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void requestN(long j, NettyChannelPublisher<T>.SubscriptionImpl subscriptionImpl) {
        if (subscriptionImpl != this.subscription) {
            return;
        }
        if (!SubscriberUtils.isRequestNValid(j)) {
            resetSubscription();
            IllegalArgumentException newExceptionForInvalidRequestN = SubscriberUtils.newExceptionForInvalidRequestN(j);
            subscriptionImpl.associatedSub.onError(newExceptionForInvalidRequestN);
            ChannelCloseUtils.close(this.channel, newExceptionForInvalidRequestN);
            return;
        }
        this.requestCount = FlowControlUtils.addWithOverflowProtection(this.requestCount, j);
        if (processPending(subscriptionImpl) || this.requested || this.requestCount <= 0) {
            return;
        }
        requestChannel();
    }

    private boolean processPending(NettyChannelPublisher<T>.SubscriptionImpl subscriptionImpl) {
        Object poll;
        if (this.inProcessPending || this.pending == null || this.pending.isEmpty()) {
            return false;
        }
        this.inProcessPending = true;
        while (this.requestCount > 0 && (poll = this.pending.poll()) != null) {
            try {
                if (poll instanceof TerminalNotification) {
                    sendErrorToTarget(subscriptionImpl, (TerminalNotification) poll);
                    this.inProcessPending = false;
                    return true;
                }
                if (emit(subscriptionImpl, poll)) {
                    if (this.subscription == subscriptionImpl || this.subscription == null) {
                        return true;
                    }
                    subscriptionImpl = this.subscription;
                }
            } finally {
                this.inProcessPending = false;
            }
        }
        if (!(this.pending.peek() instanceof TerminalNotification)) {
            this.inProcessPending = false;
            return false;
        }
        sendErrorToTarget(subscriptionImpl, (TerminalNotification) this.pending.poll());
        this.inProcessPending = false;
        return true;
    }

    private boolean emit(NettyChannelPublisher<T>.SubscriptionImpl subscriptionImpl, Object obj) {
        this.requestCount--;
        boolean test = this.terminalSignalPredicate.test(obj);
        if (test) {
            resetSubscription();
        }
        try {
            subscriptionImpl.associatedSub.onNext(obj);
            if (!test) {
                return false;
            }
            subscriptionImpl.associatedSub.onComplete();
            return true;
        } catch (Throwable th) {
            sendErrorToTarget(subscriptionImpl, th);
            return true;
        }
    }

    private void sendErrorToTarget(NettyChannelPublisher<T>.SubscriptionImpl subscriptionImpl, TerminalNotification terminalNotification) {
        Throwable cause = terminalNotification.cause();
        if (!$assertionsDisabled && cause == null) {
            throw new AssertionError();
        }
        sendErrorToTarget(subscriptionImpl, cause);
    }

    private void sendErrorToTarget(NettyChannelPublisher<T>.SubscriptionImpl subscriptionImpl, Throwable th) {
        resetSubscription();
        try {
            subscriptionImpl.associatedSub.onError(th);
        } finally {
            closeChannelInbound();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cancel(NettyChannelPublisher<T>.SubscriptionImpl subscriptionImpl) {
        if (subscriptionImpl != this.subscription) {
            return;
        }
        resetSubscription();
        this.pending = null;
        if (this.fatalError == null) {
            this.fatalError = StacklessClosedChannelException.newInstance(NettyChannelPublisher.class, "cancel");
        }
        closeChannelInbound();
    }

    private void closeChannelInbound() {
        this.closeHandler.closeChannelInbound(this.channel);
    }

    private void resetSubscription() {
        this.subscription = null;
        this.requestCount = 0L;
    }

    private void requestChannel() {
        this.requested = true;
        this.channel.read();
    }

    private void addPending(Object obj) {
        if (this.pending == null) {
            this.pending = new ArrayDeque(4);
        }
        this.pending.add(obj);
    }

    private boolean shouldBuffer() {
        return !(this.pending == null || this.pending.isEmpty()) || this.requestCount == 0;
    }

    private void subscribe0(PublisherSource.Subscriber<? super T> subscriber) {
        NettyChannelPublisher<T>.SubscriptionImpl subscriptionImpl = this.subscription;
        if (subscriptionImpl != null) {
            SubscriberUtils.deliverErrorFromSource(subscriber, new DuplicateSubscribeException(subscriptionImpl.associatedSub, subscriber));
            return;
        }
        if (!$assertionsDisabled && this.requestCount != 0) {
            throw new AssertionError();
        }
        NettyChannelPublisher<T>.SubscriptionImpl subscriptionImpl2 = new SubscriptionImpl(subscriber);
        this.subscription = subscriptionImpl2;
        subscriber.onSubscribe(subscriptionImpl2);
        if (subscriptionImpl2 != this.subscription || processPending(subscriptionImpl2) || this.fatalError == null) {
            return;
        }
        if (this.pending == null || this.pending.isEmpty()) {
            sendErrorToTarget(subscriptionImpl2, this.fatalError);
        }
    }

    private void assertInEventloop() {
        if (!$assertionsDisabled && !this.eventLoop.inEventLoop()) {
            throw new AssertionError("Must be called from the associated eventloop.");
        }
    }

    static {
        $assertionsDisabled = !NettyChannelPublisher.class.desiredAssertionStatus();
    }
}
