package nats.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import nats.NatsException;
import nats.client.ConnectionStateListener;
import nats.client.ServerList;
import nats.codec.AbstractClientInboundMessageHandlerAdapter;
import nats.codec.ClientCodec;
import nats.codec.ClientConnectFrame;
import nats.codec.ClientPublishFrame;
import nats.codec.ClientSubscribeFrame;
import nats.codec.ClientUnsubscribeFrame;
import nats.codec.ConnectBody;
import nats.codec.ServerErrorFrame;
import nats.codec.ServerInfoFrame;
import nats.codec.ServerOkFrame;
import nats.codec.ServerPongFrame;
import nats.codec.ServerPublishFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:nats/client/NatsImpl.class */
public class NatsImpl implements Nats {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(NatsImpl.class);

    /** Event loop group used to connect and to schedule reconnect attempts and delayed replies. */
    private final EventLoopGroup eventLoopGroup;

    /** True when this instance created {@link #eventLoopGroup} itself and therefore shuts it down on close. */
    private final boolean shutDownEventLoop;

    /** Channel of the most recent successful connection; {@code null} until a connect succeeds. Accessed under {@link #lock}. */
    private Channel channel;

    /** Whether a new connection attempt is scheduled after a failure or disconnect. */
    private final boolean automaticReconnect;

    /** Delay, in milliseconds, before a scheduled reconnect attempt runs. */
    private final long reconnectTimeWait;

    /** Flag forwarded to the server in the connect body. */
    private final boolean pedantic;

    /** Maximum frame size handed to the {@link ClientCodec}. */
    private final int maxFrameSize;

    /** Executor on which message handlers and connection state listeners are invoked. */
    private final Executor executor;

    /** Set once {@link #close()} has been called. Accessed under {@link #lock}. */
    private boolean closed = false;

    /** Monitor guarding the mutable connection state below. */
    private final Object lock = new Object();

    /** True once the connect frame has been written on the current channel. Accessed under {@link #lock}. */
    private boolean serverReady = false;

    /** Servers this client may connect to. */
    private final ServerList serverList = new ServerList();

    /** Frames published while the server was not ready, kept in submission order. */
    private final Queue<ClientPublishFrame> publishQueue = new LinkedList<ClientPublishFrame>();

    /** Active subscriptions keyed by subscription id. */
    private final Map<String, NatsSubscription> subscriptions = new HashMap<String, NatsSubscription>();

    /** Listeners notified on every connection state change. */
    final List<ConnectionStateListener> listeners = new ArrayList<ConnectionStateListener>();

    /** Source of numeric subscription ids. */
    private final AtomicInteger subscriptionId = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: nats.client.NatsImpl$5, reason: invalid class name */
    /* loaded from: input_file:nats/client/NatsImpl$5.class */
    /**
     * Subscription implementation tied to the enclosing {@link NatsImpl}: closing it removes it from the
     * connection's subscription table, and messages that carry a reply subject can be answered through
     * the connection.
     */
    public class AnonymousClass5 extends NatsSubscription {
        /**
         * Forwards all arguments unchanged to {@link NatsSubscription}.
         *
         * @param str               first argument forwarded to the superclass (the subject, judging by the call sites)
         * @param str2              second argument forwarded to the superclass (presumably the queue group)
         * @param num               third argument forwarded to the superclass (presumably the message limit)
         * @param str3              the subscription id
         * @param messageHandlerArr handlers forwarded to the superclass
         */
        AnonymousClass5(String str, String str2, Integer num, String str3, MessageHandler... messageHandlerArr) {
            super(str, str2, num, str3, messageHandlerArr);
        }

        /**
         * Closes the subscription, unregisters it from the connection and, when the server is ready,
         * writes an unsubscribe frame for its id.
         */
        @Override // nats.client.DefaultSubscription, nats.client.Subscription, java.lang.AutoCloseable
        public void close() {
            super.close();
            synchronized (NatsImpl.this.lock) {
                NatsImpl.this.subscriptions.remove(this.id);
                // When the server is not ready nothing is written; the subscription is simply dropped locally.
                if (NatsImpl.this.serverReady) {
                    NatsImpl.this.channel.write(new ClientUnsubscribeFrame(this.id));
                }
            }
        }

        /**
         * Builds the message object for an incoming publication.
         *
         * <p>When {@code str4} is null or blank a plain {@link DefaultMessage} is returned. Otherwise the
         * returned message overrides both {@code reply} methods to publish the reply body to {@code str4}
         * through the enclosing connection.
         *
         * @param str  first argument forwarded to {@link DefaultMessage}
         * @param str2 second argument forwarded to {@link DefaultMessage}
         * @param str3 third argument forwarded to {@link DefaultMessage}
         * @param str4 subject replies are published to; may be null or blank
         * @return the message delivered to the handlers
         */
        @Override // nats.client.DefaultSubscription
        protected Message createMessage(String str, String str2, String str3, final String str4) {
            if (str4 == null || str4.trim().length() == 0) {
                return new DefaultMessage(str, str2, str3, false);
            }
            return new DefaultMessage(str, str2, str3, true) { // from class: nats.client.NatsImpl.5.1
                @Override // nats.client.DefaultMessage, nats.client.Message
                public void reply(String str5) throws UnsupportedOperationException {
                    NatsImpl.this.publish(str4, str5);
                }

                @Override // nats.client.DefaultMessage, nats.client.Message
                public void reply(final String str5, long j, TimeUnit timeUnit) throws UnsupportedOperationException {
                    // Only schedule the publish. The base implementation is deliberately not invoked,
                    // matching reply(String) above.
                    // NOTE(review): DefaultMessage is not visible here; its reply methods presumably
                    // reject the call for messages without a reply subject - confirm.
                    NatsImpl.this.eventLoopGroup.next().schedule(new Runnable() { // from class: nats.client.NatsImpl.5.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            NatsImpl.this.publish(str4, str5);
                        }
                    }, j, timeUnit);
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:nats/client/NatsImpl$NatsChannelInitializer.class */
    /**
     * Configures the pipeline of every new connection: a {@link ClientCodec} followed by the inbound
     * handler that performs the connect handshake, dispatches published messages to subscriptions and
     * reports connection state changes.
     */
    public class NatsChannelInitializer extends ChannelInitializer<SocketChannel> {
        private NatsChannelInitializer() {
        }

        /**
         * Installs the "codec" and "handler" stages on a newly created channel.
         *
         * @param socketChannel the channel being initialized
         */
        public void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast("codec", new ClientCodec(NatsImpl.this.maxFrameSize));
            pipeline.addLast("handler", new AbstractClientInboundMessageHandlerAdapter() { // from class: nats.client.NatsImpl.NatsChannelInitializer.1
                /** Routes a published message to the subscription registered under the frame's id. */
                @Override // nats.codec.AbstractClientInboundMessageHandlerAdapter
                protected void publishedMessage(ChannelHandlerContext channelHandlerContext, ServerPublishFrame serverPublishFrame) {
                    NatsSubscription natsSubscription;
                    // Only the lookup happens under the lock; handlers are dispatched outside of it.
                    synchronized (NatsImpl.this.lock) {
                        natsSubscription = NatsImpl.this.subscriptions.get(serverPublishFrame.getId());
                    }
                    if (natsSubscription == null) {
                        throw new NatsException("Received a body for an unknown subscription.");
                    }
                    natsSubscription.onMessage(serverPublishFrame.getSubject(), serverPublishFrame.getBody(), serverPublishFrame.getReplyTo(), NatsImpl.this.executor);
                }

                /** Pong frames require no action. */
                @Override // nats.codec.AbstractClientInboundMessageHandlerAdapter
                protected void pongResponse(ChannelHandlerContext channelHandlerContext, ServerPongFrame serverPongFrame) {
                }

                /**
                 * Answers the server's info frame with a connect frame carrying the current server's
                 * credentials. Once that write succeeds the connection is marked ready, every known
                 * subscription is written and the frames queued while not ready are flushed.
                 */
                @Override // nats.codec.AbstractClientInboundMessageHandlerAdapter
                protected void serverInfo(ChannelHandlerContext channelHandlerContext, ServerInfoFrame serverInfoFrame) {
                    ServerList.Server currentServer = NatsImpl.this.serverList.getCurrentServer();
                    ConnectBody connectBody = new ConnectBody(currentServer.getUser(), currentServer.getPassword(), NatsImpl.this.pedantic, false);
                    channelHandlerContext.write(new ClientConnectFrame(connectBody)).addListener(new ChannelFutureListener() { // from class: nats.client.NatsImpl.NatsChannelInitializer.1.1
                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            if (!channelFuture.isSuccess()) {
                                // The handshake never reached the server: keep the queued frames for the
                                // next connection and close the channel so channelInactive can schedule a
                                // reconnect.
                                NatsImpl.LOGGER.warn("Failed to write connect frame", channelFuture.cause());
                                channelFuture.channel().close();
                                return;
                            }
                            NatsImpl.LOGGER.debug("Server ready");
                            synchronized (NatsImpl.this.lock) {
                                NatsImpl.this.serverReady = true;
                                for (NatsSubscription natsSubscription : NatsImpl.this.subscriptions.values()) {
                                    NatsImpl.this.writeSubscription(natsSubscription);
                                }
                                // Drain the queue instead of merely iterating it; otherwise every later
                                // reconnect would publish the same frames again.
                                ClientPublishFrame queuedFrame;
                                while ((queuedFrame = NatsImpl.this.publishQueue.poll()) != null) {
                                    channelFuture.channel().write(queuedFrame);
                                }
                            }
                            NatsImpl.this.fireStateChange(ConnectionStateListener.State.SERVERY_READY);
                        }
                    });
                }

                /** OK frames require no action. */
                @Override // nats.codec.AbstractClientInboundMessageHandlerAdapter
                protected void okResponse(ChannelHandlerContext channelHandlerContext, ServerOkFrame serverOkFrame) {
                }

                /** Turns a server error frame into a {@link NatsException}. */
                @Override // nats.codec.AbstractClientInboundMessageHandlerAdapter
                protected void errorResponse(ChannelHandlerContext channelHandlerContext, ServerErrorFrame serverErrorFrame) {
                    throw new NatsException("Server error: " + serverErrorFrame.getErrorMessage());
                }

                /** Notifies listeners that the connection has been established. */
                public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                    NatsImpl.this.fireStateChange(ConnectionStateListener.State.CONNECTED);
                }

                /** Marks the server as not ready, notifies listeners and schedules a reconnect. */
                public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
                    synchronized (NatsImpl.this.lock) {
                        NatsImpl.this.serverReady = false;
                    }
                    NatsImpl.this.fireStateChange(ConnectionStateListener.State.DISCONNECTED);
                    NatsImpl.this.scheduleReconnect();
                }

                /** Logs any exception raised in the pipeline. */
                public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
                    NatsImpl.LOGGER.error("Error", th);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:nats/client/NatsImpl$NatsSubscription.class */
    /**
     * A {@link DefaultSubscription} that additionally remembers the id under which it is registered
     * with the enclosing connection.
     */
    public class NatsSubscription extends DefaultSubscription {
        /** Id used as the key in the connection's subscription table and in subscribe/unsubscribe frames. */
        final String id;

        /**
         * @param str               forwarded to {@link DefaultSubscription} as its first argument
         * @param str2              forwarded to {@link DefaultSubscription} as its second argument
         * @param num               forwarded to {@link DefaultSubscription} as its third argument
         * @param str3              the subscription id kept by this class
         * @param messageHandlerArr handlers forwarded to {@link DefaultSubscription}
         */
        protected NatsSubscription(String str, String str2, Integer num, String str3, MessageHandler... messageHandlerArr) {
            super(str, str2, num, messageHandlerArr);
            id = str3;
        }

        /** @return the subscription id supplied at construction */
        String getId() {
            return id;
        }
    }

    /**
     * Generates a random inbox subject.
     *
     * @return {@code "_INBOX."} followed by the lowercase hexadecimal form of the absolute value of a
     *         random 16-byte integer
     */
    public static String createInbox() {
        final byte[] randomBytes = new byte[16];
        ThreadLocalRandom.current().nextBytes(randomBytes);
        final BigInteger magnitude = new BigInteger(randomBytes).abs();
        final String hexSuffix = magnitude.toString(16);
        return "_INBOX." + hexSuffix;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NatsImpl(NatsConnector natsConnector) {
        this.shutDownEventLoop = natsConnector.eventLoopGroup == null;
        this.eventLoopGroup = this.shutDownEventLoop ? new NioEventLoopGroup() : natsConnector.eventLoopGroup;
        this.serverList.addServers(natsConnector.hosts);
        this.automaticReconnect = natsConnector.automaticReconnect;
        this.reconnectTimeWait = natsConnector.reconnectWaitTime;
        this.pedantic = natsConnector.pedantic;
        this.maxFrameSize = natsConnector.maxFrameSize;
        this.listeners.addAll(natsConnector.listeners);
        this.executor = natsConnector.callbackExecutor;
        connect();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts an asynchronous connection attempt to the next server of the server list; does nothing
     * once this client has been closed.
     *
     * <p>On failure the server is told about the failure and a reconnect is scheduled. On success the
     * new channel is stored, or closed right away if the client was closed in the meantime.
     */
    public void connect() {
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            final ServerList.Server target = this.serverList.nextServer();
            LOGGER.debug("Attempting to connect to {} with user {}", target.getAddress(), target.getUser());

            final ChannelFutureListener connectListener = new ChannelFutureListener() {
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        NatsImpl.LOGGER.debug("Connection to {} successful", target.getAddress());
                        target.connectionSuccess();
                        synchronized (NatsImpl.this.lock) {
                            NatsImpl.this.channel = channelFuture.channel();
                            // close() may have run while the connect was still in flight.
                            if (NatsImpl.this.closed) {
                                NatsImpl.this.channel.close();
                            }
                        }
                    } else {
                        NatsImpl.LOGGER.warn("Connection to {} failed", target.getAddress());
                        target.connectionFailure();
                        NatsImpl.this.scheduleReconnect();
                    }
                }
            };

            new Bootstrap()
                    .group(this.eventLoopGroup)
                    .remoteAddress(target.getAddress())
                    .channel(NioSocketChannel.class)
                    .handler(new NatsChannelInitializer())
                    .connect()
                    .addListener(connectListener);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks the server as not ready and, unless the client is closed or automatic reconnect is
     * disabled, schedules {@link #connect()} on the event loop after the configured wait time
     * (milliseconds).
     */
    public void scheduleReconnect() {
        synchronized (this.lock) {
            this.serverReady = false;
            if (this.closed || !this.automaticReconnect) {
                return;
            }
            final Runnable reconnectTask = new Runnable() {
                @Override
                public void run() {
                    NatsImpl.this.connect();
                }
            };
            this.eventLoopGroup.next().schedule(reconnectTask, this.reconnectTimeWait, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * @return {@code true} when a channel has been established and is currently active
     */
    @Override // nats.client.Nats
    public boolean isConnected() {
        synchronized (this.lock) {
            if (this.channel == null) {
                return false;
            }
            return this.channel.isActive();
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    @Override // nats.client.Nats
    public boolean isClosed() {
        synchronized (this.lock) {
            return this.closed;
        }
    }

    /**
     * Closes the client: flags it closed and not ready, closes the current channel if there is one,
     * shuts down the event loop group when this instance owns it, and closes every registered
     * subscription.
     */
    @Override // nats.client.Nats, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (this.lock) {
            this.closed = true;
            this.serverReady = false;
            if (this.channel != null) {
                this.channel.close();
            }
            if (this.shutDownEventLoop) {
                this.eventLoopGroup.shutdown();
            }
            // Iterate over a snapshot: closing a subscription removes it from the live map.
            final List<NatsSubscription> snapshot = new ArrayList<NatsSubscription>(this.subscriptions.values());
            for (Subscription subscription : snapshot) {
                subscription.close();
            }
        }
    }

    /**
     * Publishes a message with an empty body and no reply subject.
     *
     * @param str the subject to publish to
     */
    @Override // nats.client.Nats
    public void publish(String str) {
        final String emptyBody = "";
        this.publish(str, emptyBody, null);
    }

    /**
     * Publishes a message without a reply subject.
     *
     * @param str  the subject to publish to
     * @param str2 the message body
     */
    @Override // nats.client.Nats
    public void publish(String str, String str2) {
        final String noReplySubject = null;
        this.publish(str, str2, noReplySubject);
    }

    /**
     * Publishes a message, sending it immediately when the server is ready and queueing it otherwise.
     *
     * @param str  the subject to publish to
     * @param str2 the message body
     * @param str3 the reply subject, or {@code null}
     * @throws NatsClosedException if this client has been closed
     */
    @Override // nats.client.Nats
    public void publish(String str, String str2, String str3) {
        assertNatsOpen();
        final ClientPublishFrame frame = new ClientPublishFrame(str, str2, str3);
        publish(frame);
    }

    /**
     * Writes the frame to the channel when the server is ready; otherwise appends it to the publish
     * queue for later delivery.
     *
     * @param clientPublishFrame the frame to send or queue
     */
    private void publish(ClientPublishFrame clientPublishFrame) {
        synchronized (this.lock) {
            if (!this.serverReady) {
                this.publishQueue.add(clientPublishFrame);
                return;
            }
            this.channel.write(clientPublishFrame);
        }
    }

    /**
     * Subscribes to a subject without a queue group or message limit.
     *
     * @param str               the subject to subscribe to
     * @param messageHandlerArr handlers attached to the subscription
     * @return the new subscription
     */
    @Override // nats.client.Nats
    public Subscription subscribe(String str, MessageHandler... messageHandlerArr) {
        final String noQueueGroup = null;
        final Integer noLimit = null;
        return subscribe(str, noQueueGroup, noLimit, messageHandlerArr);
    }

    /**
     * Subscribes to a subject as part of a queue group, without a message limit.
     *
     * @param str               the subject to subscribe to
     * @param str2              the queue group
     * @param messageHandlerArr handlers attached to the subscription
     * @return the new subscription
     */
    @Override // nats.client.Nats
    public Subscription subscribe(String str, String str2, MessageHandler... messageHandlerArr) {
        final Integer noLimit = null;
        return subscribe(str, str2, noLimit, messageHandlerArr);
    }

    /**
     * Subscribes to a subject without a queue group.
     *
     * @param str               the subject to subscribe to
     * @param num               value forwarded as the subscription's third constructor argument
     *                          (presumably the maximum number of messages; may be {@code null})
     * @param messageHandlerArr handlers attached to the subscription
     * @return the new subscription
     */
    @Override // nats.client.Nats
    public Subscription subscribe(String str, Integer num, MessageHandler... messageHandlerArr) {
        final String noQueueGroup = null;
        return subscribe(str, noQueueGroup, num, messageHandlerArr);
    }

    /**
     * Creates a subscription under a freshly generated numeric id, registers it and, when the server
     * is ready, writes the subscribe frame right away.
     *
     * @param str               the subject to subscribe to
     * @param str2              the queue group, or {@code null}
     * @param num               value forwarded to the subscription (presumably the maximum number of
     *                          messages; may be {@code null})
     * @param messageHandlerArr handlers attached to the subscription
     * @return the new subscription
     * @throws NatsClosedException if this client has been closed
     */
    @Override // nats.client.Nats
    public Subscription subscribe(String str, String str2, Integer num, MessageHandler... messageHandlerArr) {
        assertNatsOpen();
        final int nextId = this.subscriptionId.incrementAndGet();
        final String id = String.valueOf(nextId);
        final NatsSubscription subscription = createSubscription(id, str, str2, num, messageHandlerArr);
        synchronized (this.lock) {
            this.subscriptions.put(id, subscription);
            // If the server is not ready yet, the subscription is written after the next handshake.
            if (this.serverReady) {
                writeSubscription(subscription);
            }
        }
        return subscription;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes a subscribe frame (id, subject, queue group) for the subscription when the server is
     * ready; otherwise does nothing.
     *
     * @param natsSubscription the subscription to announce to the server
     */
    public void writeSubscription(NatsSubscription natsSubscription) {
        synchronized (this.lock) {
            if (!this.serverReady) {
                return;
            }
            final ClientSubscribeFrame frame = new ClientSubscribeFrame(
                    natsSubscription.getId(),
                    natsSubscription.getSubject(),
                    natsSubscription.getQueueGroup());
            this.channel.write(frame);
        }
    }

    /**
     * Sends a request with an empty body and no reply limit.
     *
     * @param str               the subject the request is published to
     * @param messageHandlerArr handlers invoked for replies
     * @return a handle for the pending request
     */
    @Override // nats.client.Nats
    public Request request(String str, MessageHandler... messageHandlerArr) {
        final String emptyBody = "";
        final Integer noLimit = null;
        return request(str, emptyBody, noLimit, messageHandlerArr);
    }

    /**
     * Sends a request without a reply limit.
     *
     * @param str               the subject the request is published to
     * @param str2              the request body
     * @param messageHandlerArr handlers invoked for replies
     * @return a handle for the pending request
     */
    @Override // nats.client.Nats
    public Request request(String str, String str2, MessageHandler... messageHandlerArr) {
        final Integer noLimit = null;
        return request(str, str2, noLimit, messageHandlerArr);
    }

    /**
     * Sends a request: subscribes to a newly generated inbox subject, attaches the handlers to that
     * subscription, then publishes the body to {@code str} with the inbox as reply subject.
     *
     * @param str               the subject the request is published to
     * @param str2              the request body
     * @param num               value passed to the inbox subscription and reported by
     *                          {@link Request#getMaxReplies()}; may be {@code null}
     * @param messageHandlerArr handlers invoked for replies
     * @return a handle whose {@code close()} closes the inbox subscription
     * @throws NatsClosedException if this client has been closed
     */
    @Override // nats.client.Nats
    public Request request(final String str, String str2, final Integer num, MessageHandler... messageHandlerArr) {
        assertNatsOpen();
        final String replyInbox = createInbox();
        // The subscription is created without handlers; they are attached one by one below.
        final Subscription replySubscription = subscribe(replyInbox, num, new MessageHandler[0]);
        for (int i = 0; i < messageHandlerArr.length; i++) {
            replySubscription.addMessageHandler(messageHandlerArr[i]);
        }
        final ClientPublishFrame requestFrame = new ClientPublishFrame(str, str2, replyInbox);
        publish(requestFrame);
        return new Request() {
            @Override
            public void close() {
                replySubscription.close();
            }

            @Override
            public String getSubject() {
                return str;
            }

            @Override
            public int getReceivedReplies() {
                return replySubscription.getReceivedMessages();
            }

            @Override
            public Integer getMaxReplies() {
                return num;
            }
        };
    }

    /**
     * Guards operations that need an open client.
     *
     * @throws NatsClosedException if this client has been closed
     */
    private void assertNatsOpen() {
        final boolean alreadyClosed = isClosed();
        if (!alreadyClosed) {
            return;
        }
        throw new NatsClosedException();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Notifies every registered listener of a state change, submitting one task per listener to the
     * callback executor.
     *
     * @param state the new connection state
     */
    public void fireStateChange(final ConnectionStateListener.State state) {
        final Iterator<ConnectionStateListener> remaining = this.listeners.iterator();
        while (remaining.hasNext()) {
            final ConnectionStateListener target = remaining.next();
            final Runnable notification = new Runnable() {
                @Override
                public void run() {
                    target.onConnectionStateChange(NatsImpl.this, state);
                }
            };
            this.executor.execute(notification);
        }
    }

    /**
     * Instantiates the connection-bound subscription. Note the argument order: the id comes first
     * here but is passed as the fourth constructor argument.
     *
     * @param str               the subscription id
     * @param str2              first constructor argument (the subject at the call site)
     * @param str3              second constructor argument (the queue group at the call site)
     * @param num               third constructor argument
     * @param messageHandlerArr handlers attached to the subscription
     * @return the new subscription
     */
    private NatsSubscription createSubscription(String str, String str2, String str3, Integer num, MessageHandler... messageHandlerArr) {
        final NatsSubscription created = new AnonymousClass5(str2, str3, num, str, messageHandlerArr);
        return created;
    }
}
