/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline;
import reactor.ipc.netty.channel.ContextHandler;
import reactor.ipc.netty.channel.FluxReceive;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

public class ChannelOperations<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound>
implements NettyInbound,
NettyOutbound,
NettyContext,
CoreSubscriber<Void> {
    final BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> handler;
    final Channel channel;
    final FluxReceive inbound;
    final DirectProcessor<Void> onInactive;
    final ContextHandler<?> context;
    volatile Subscription outboundSubscription;
    protected static final AttributeKey<ChannelOperations> OPERATIONS_KEY = AttributeKey.newInstance("nettyOperations");
    static final Logger log = Loggers.getLogger(ChannelOperations.class);
    static final BiFunction PING = (i, o) -> Flux.empty();
    static final AtomicReferenceFieldUpdater<ChannelOperations, Subscription> OUTBOUND_CLOSE = AtomicReferenceFieldUpdater.newUpdater(ChannelOperations.class, Subscription.class, "outboundSubscription");

    public static <INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> ChannelOperations<INBOUND, OUTBOUND> bind(Channel channel, BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> handler, ContextHandler<?> context) {
        ChannelOperations<? super INBOUND, ? super OUTBOUND> ops = new ChannelOperations<INBOUND, OUTBOUND>(channel, handler, context);
        return ops;
    }

    public static <INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> noopHandler() {
        return PING;
    }

    public static ChannelOperations<?, ?> get(Channel ch) {
        return ch.attr(OPERATIONS_KEY).get();
    }

    static ChannelOperations<?, ?> tryGetAndSet(Channel ch, ChannelOperations<?, ?> ops) {
        Attribute<ChannelOperations> attr = ch.attr(OPERATIONS_KEY);
        do {
            ChannelOperations op;
            if ((op = attr.get()) == null) continue;
            return op;
        } while (!attr.compareAndSet(null, ops));
        return null;
    }

    protected ChannelOperations(Channel channel, ChannelOperations<INBOUND, OUTBOUND> replaced) {
        this(channel, replaced.handler, replaced.context, replaced.onInactive);
    }

    protected ChannelOperations(Channel channel, BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> handler, ContextHandler<?> context) {
        this(channel, handler, context, DirectProcessor.create());
    }

    protected ChannelOperations(Channel channel, BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> handler, ContextHandler<?> context, DirectProcessor<Void> processor) {
        this.handler = Objects.requireNonNull(handler, "handler");
        this.channel = Objects.requireNonNull(channel, "channel");
        this.context = Objects.requireNonNull(context, "context");
        this.inbound = new FluxReceive(this);
        this.onInactive = processor;
        Subscription[] _s = new Subscription[1];
        Mono.fromDirect(context.onCloseOrRelease(channel)).doOnSubscribe(s2 -> {
            _s[0] = s2;
        }).subscribe((CoreSubscriber<Void>)this.onInactive);
        if (_s[0] != null) {
            this.onInactive.subscribe(null, null, _s[0]::cancel);
        }
    }

    @Override
    public InetSocketAddress address() {
        Channel c = this.channel();
        if (c instanceof SocketChannel) {
            return ((SocketChannel)c).remoteAddress();
        }
        if (c instanceof DatagramChannel) {
            InetSocketAddress a = ((DatagramChannel)c).remoteAddress();
            return a != null ? a : ((DatagramChannel)c).localAddress();
        }
        throw new IllegalStateException("Does not have an InetSocketAddress");
    }

    @Override
    public final Channel channel() {
        return this.channel;
    }

    @Override
    public final NettyContext context() {
        return this;
    }

    @Override
    public ChannelOperations<INBOUND, OUTBOUND> context(Consumer<NettyContext> contextCallback) {
        contextCallback.accept(this.context());
        return this;
    }

    @Override
    public void dispose() {
        this.inbound.cancel();
        this.channel.close();
    }

    @Override
    public final boolean isDisposed() {
        return !this.channel().isActive() || ChannelOperations.get(this.channel()) != this;
    }

    @Override
    public final Mono<Void> onClose() {
        return Mono.fromDirect(this.onInactive);
    }

    @Override
    public NettyContext onClose(Runnable onClose) {
        this.onInactive.subscribe(null, e -> onClose.run(), onClose);
        return this;
    }

    @Override
    public final void onComplete() {
        Subscription s2 = OUTBOUND_CLOSE.getAndSet(this, Operators.cancelledSubscription());
        if (s2 == Operators.cancelledSubscription() || this.isDisposed()) {
            return;
        }
        this.onOutboundComplete();
    }

    @Override
    public final void onError(Throwable t) {
        Subscription s2 = OUTBOUND_CLOSE.getAndSet(this, Operators.cancelledSubscription());
        if (s2 == Operators.cancelledSubscription() || this.isDisposed()) {
            if (log.isDebugEnabled()) {
                log.error("An outbound error could not be processed", t);
            }
            return;
        }
        this.onOutboundError(t);
    }

    @Override
    public final void onNext(Void aVoid) {
    }

    @Override
    public final void onSubscribe(Subscription s2) {
        if (Operators.setOnce(OUTBOUND_CLOSE, this, s2)) {
            s2.request(Long.MAX_VALUE);
        }
    }

    @Override
    public Flux<?> receiveObject() {
        return this.inbound;
    }

    @Override
    public final InetSocketAddress remoteAddress() {
        return (InetSocketAddress)this.channel.remoteAddress();
    }

    public String toString() {
        return this.channel.toString();
    }

    protected final boolean isInboundCancelled() {
        return this.inbound.isCancelled();
    }

    protected final boolean isInboundDisposed() {
        return this.inbound.isDisposed();
    }

    protected final BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> handler() {
        return this.handler;
    }

    protected void onHandlerStart() {
        this.applyHandler();
        this.context.fireContextActive(this);
    }

    protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
        if (msg == null) {
            this.onInboundError(new NullPointerException("msg is null"));
            return;
        }
        this.inbound.onInboundNext(msg);
    }

    protected final boolean replace(ChannelOperations<?, ?> ops) {
        return this.channel.attr(OPERATIONS_KEY).compareAndSet(this, ops);
    }

    protected void onInboundCancel() {
    }

    protected void onInboundComplete() {
        this.inbound.onInboundComplete();
    }

    protected void onInboundClose() {
        this.onHandlerTerminate();
    }

    protected void onOutboundComplete() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] {} User Handler requesting close connection", this.formatName(), this.channel());
        }
        this.markPersistent(false);
        this.onHandlerTerminate();
    }

    protected void onOutboundError(Throwable err) {
        this.markPersistent(false);
        this.onHandlerTerminate();
    }

    protected final void applyHandler() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] {} handler is being applied: {}", this.formatName(), this.channel(), this.handler);
        }
        try {
            Mono.fromDirect(this.handler.apply(this, this)).subscribe(this);
        }
        catch (Throwable t) {
            log.error("", t);
            this.channel.close();
        }
    }

    protected final void onHandlerTerminate() {
        if (this.replace(null)) {
            if (log.isTraceEnabled()) {
                log.trace("{} Disposing ChannelOperation from a channel", this.channel(), new Exception("ChannelOperation terminal stack"));
            }
            try {
                Operators.terminate(OUTBOUND_CLOSE, this);
                this.onInactive.onComplete();
                this.onInboundComplete();
            }
            finally {
                this.channel.pipeline().fireUserEventTriggered(NettyPipeline.handlerTerminatedEvent());
            }
        }
    }

    protected final void discard() {
        if (log.isDebugEnabled()) {
            log.debug("{} Discarding inbound content", this.channel);
        }
        this.inbound.discard();
    }

    protected final void onInboundError(Throwable err) {
        this.inbound.onInboundError(err);
    }

    protected final ContextHandler<?> parentContext() {
        return this.context;
    }

    protected final String formatName() {
        return this.getClass().getSimpleName().replace("Operations", "");
    }

    @Override
    public Context currentContext() {
        return this.context.sink.currentContext();
    }

    @FunctionalInterface
    public static interface OnNew<CHANNEL extends Channel> {
        public ChannelOperations<?, ?> create(CHANNEL var1, ContextHandler<?> var2, Object var3);
    }
}

