/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.connection.netty;

import com.mongodb.MongoException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoSocketOpenException;
import com.mongodb.MongoSocketReadTimeoutException;
import com.mongodb.ServerAddress;
import com.mongodb.connection.AsyncCompletionHandler;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.Stream;
import com.mongodb.connection.netty.NettyByteBuf;
import com.mongodb.connection.netty.ReadTimeoutHandler;
import com.mongodb.internal.connection.SslHelper;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.bson.ByteBuf;

final class NettyStream
implements Stream {
    private static final String READ_HANDLER_NAME = "ReadTimeoutHandler";
    private final ServerAddress address;
    private final SocketSettings settings;
    private final SslSettings sslSettings;
    private final EventLoopGroup workerGroup;
    private final Class<? extends SocketChannel> socketChannelClass;
    private final ByteBufAllocator allocator;
    private volatile boolean isClosed;
    private volatile Channel channel;
    private final LinkedList<io.netty.buffer.ByteBuf> pendingInboundBuffers = new LinkedList();
    private volatile PendingReader pendingReader;
    private volatile Throwable pendingException;

    public NettyStream(ServerAddress address, SocketSettings settings, SslSettings sslSettings, EventLoopGroup workerGroup, Class<? extends SocketChannel> socketChannelClass, ByteBufAllocator allocator) {
        this.address = address;
        this.settings = settings;
        this.sslSettings = sslSettings;
        this.workerGroup = workerGroup;
        this.socketChannelClass = socketChannelClass;
        this.allocator = allocator;
    }

    @Override
    public ByteBuf getBuffer(int size2) {
        return new NettyByteBuf(this.allocator.buffer(size2, size2));
    }

    @Override
    public void open() throws IOException {
        FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<Void>();
        this.openAsync(handler);
        handler.get();
    }

    @Override
    public void openAsync(final AsyncCompletionHandler<Void> handler) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.workerGroup);
        bootstrap.channel(this.socketChannelClass);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.settings.getConnectTimeout(TimeUnit.MILLISECONDS));
        bootstrap.option(ChannelOption.TCP_NODELAY, (Object)true);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)this.settings.isKeepAlive());
        if (this.settings.getReceiveBufferSize() > 0) {
            bootstrap.option(ChannelOption.SO_RCVBUF, (Object)this.settings.getReceiveBufferSize());
        }
        if (this.settings.getSendBufferSize() > 0) {
            bootstrap.option(ChannelOption.SO_SNDBUF, (Object)this.settings.getSendBufferSize());
        }
        bootstrap.option(ChannelOption.ALLOCATOR, (Object)this.allocator);
        bootstrap.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            public void initChannel(SocketChannel ch) throws Exception {
                int readTimeout;
                if (NettyStream.this.sslSettings.isEnabled()) {
                    SSLEngine engine = SSLContext.getDefault().createSSLEngine(NettyStream.this.address.getHost(), NettyStream.this.address.getPort());
                    engine.setUseClientMode(true);
                    if (!NettyStream.this.sslSettings.isInvalidHostNameAllowed()) {
                        engine.setSSLParameters(SslHelper.enableHostNameVerification(engine.getSSLParameters()));
                    }
                    ch.pipeline().addFirst("ssl", (ChannelHandler)new SslHandler(engine, false));
                }
                if ((readTimeout = NettyStream.this.settings.getReadTimeout(TimeUnit.MILLISECONDS)) > 0) {
                    ch.pipeline().addLast(NettyStream.READ_HANDLER_NAME, (ChannelHandler)new ReadTimeoutHandler(readTimeout));
                }
                ch.pipeline().addLast(new ChannelHandler[]{new InboundBufferHandler()});
            }
        });
        final ChannelFuture channelFuture = bootstrap.connect(this.address.getHost(), this.address.getPort());
        channelFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    NettyStream.this.channel = channelFuture.channel();
                    handler.completed(null);
                } else {
                    handler.failed(new MongoSocketOpenException("Exception opening socket", NettyStream.this.getAddress(), future.cause()));
                }
            }
        });
    }

    @Override
    public void write(List<ByteBuf> buffers) throws IOException {
        FutureAsyncCompletionHandler<Void> future = new FutureAsyncCompletionHandler<Void>();
        this.writeAsync(buffers, future);
        future.get();
    }

    @Override
    public ByteBuf read(int numBytes) throws IOException {
        FutureAsyncCompletionHandler<ByteBuf> future = new FutureAsyncCompletionHandler<ByteBuf>();
        this.readAsync(numBytes, future);
        return future.get();
    }

    @Override
    public void writeAsync(List<ByteBuf> buffers, final AsyncCompletionHandler<Void> handler) {
        CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer();
        for (ByteBuf cur : buffers) {
            io.netty.buffer.ByteBuf byteBuf = ((NettyByteBuf)cur).asByteBuf();
            composite.addComponent(byteBuf.retain());
            composite.writerIndex(composite.writerIndex() + byteBuf.writerIndex());
        }
        this.channel.writeAndFlush((Object)composite).addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    handler.failed(future.cause());
                } else {
                    handler.completed(null);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void readAsync(int numBytes, AsyncCompletionHandler<ByteBuf> handler) {
        this.scheduleReadTimeout();
        ByteBuf buffer = null;
        Throwable exceptionResult = null;
        NettyStream nettyStream = this;
        synchronized (nettyStream) {
            exceptionResult = this.pendingException;
            if (exceptionResult == null) {
                if (!this.hasBytesAvailable(numBytes)) {
                    this.pendingReader = new PendingReader(numBytes, handler);
                } else {
                    CompositeByteBuf composite = this.allocator.compositeBuffer(this.pendingInboundBuffers.size());
                    int bytesNeeded = numBytes;
                    Iterator iter2 = this.pendingInboundBuffers.iterator();
                    while (iter2.hasNext()) {
                        io.netty.buffer.ByteBuf next2 = (io.netty.buffer.ByteBuf)iter2.next();
                        int bytesNeededFromCurrentBuffer = Math.min(next2.readableBytes(), bytesNeeded);
                        if (bytesNeededFromCurrentBuffer == next2.readableBytes()) {
                            composite.addComponent(next2);
                            iter2.remove();
                        } else {
                            next2.retain();
                            composite.addComponent(next2.readSlice(bytesNeededFromCurrentBuffer));
                        }
                        composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer);
                        if ((bytesNeeded -= bytesNeededFromCurrentBuffer) != 0) continue;
                        break;
                    }
                    buffer = new NettyByteBuf((io.netty.buffer.ByteBuf)composite).flip();
                }
            }
        }
        if (exceptionResult != null) {
            this.disableReadTimeout();
            handler.failed(exceptionResult);
        }
        if (buffer != null) {
            this.disableReadTimeout();
            handler.completed(buffer);
        }
    }

    private boolean hasBytesAvailable(int numBytes) {
        int bytesAvailable = 0;
        for (io.netty.buffer.ByteBuf cur : this.pendingInboundBuffers) {
            if ((bytesAvailable += cur.readableBytes()) < numBytes) continue;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleReadResponse(io.netty.buffer.ByteBuf buffer, Throwable t) {
        PendingReader localPendingReader = null;
        NettyStream nettyStream = this;
        synchronized (nettyStream) {
            if (buffer != null) {
                this.pendingInboundBuffers.add(buffer.retain());
            } else {
                this.pendingException = t;
            }
            if (this.pendingReader != null) {
                localPendingReader = this.pendingReader;
                this.pendingReader = null;
            }
        }
        if (localPendingReader != null) {
            this.readAsync(localPendingReader.numBytes, localPendingReader.handler);
        }
    }

    @Override
    public ServerAddress getAddress() {
        return this.address;
    }

    @Override
    public void close() {
        this.isClosed = true;
        if (this.channel != null) {
            this.channel.close();
            this.channel = null;
        }
        Iterator iterator2 = this.pendingInboundBuffers.iterator();
        while (iterator2.hasNext()) {
            io.netty.buffer.ByteBuf nextByteBuf = (io.netty.buffer.ByteBuf)iterator2.next();
            iterator2.remove();
            nextByteBuf.release();
        }
    }

    @Override
    public boolean isClosed() {
        return this.isClosed;
    }

    public SocketSettings getSettings() {
        return this.settings;
    }

    public SslSettings getSslSettings() {
        return this.sslSettings;
    }

    public EventLoopGroup getWorkerGroup() {
        return this.workerGroup;
    }

    public Class<? extends SocketChannel> getSocketChannelClass() {
        return this.socketChannelClass;
    }

    public ByteBufAllocator getAllocator() {
        return this.allocator;
    }

    private void scheduleReadTimeout() {
        this.adjustTimeout(false);
    }

    private void disableReadTimeout() {
        this.adjustTimeout(true);
    }

    private void adjustTimeout(boolean disable) {
        ChannelHandler timeoutHandler = this.channel.pipeline().get(READ_HANDLER_NAME);
        if (timeoutHandler != null) {
            final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler)timeoutHandler;
            final ChannelHandlerContext handlerContext = this.channel.pipeline().context(timeoutHandler);
            EventExecutor executor = handlerContext.executor();
            if (disable) {
                if (executor.inEventLoop()) {
                    readTimeoutHandler.removeTimeout(handlerContext);
                } else {
                    executor.submit(new Runnable(){

                        @Override
                        public void run() {
                            readTimeoutHandler.removeTimeout(handlerContext);
                        }
                    });
                }
            } else if (executor.inEventLoop()) {
                readTimeoutHandler.scheduleTimeout(handlerContext);
            } else {
                executor.submit(new Runnable(){

                    @Override
                    public void run() {
                        readTimeoutHandler.scheduleTimeout(handlerContext);
                    }
                });
            }
        }
    }

    private static final class FutureAsyncCompletionHandler<T>
    implements AsyncCompletionHandler<T> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile T t;
        private volatile Throwable throwable;

        @Override
        public void completed(T t) {
            this.t = t;
            this.latch.countDown();
        }

        @Override
        public void failed(Throwable t) {
            this.throwable = t;
            this.latch.countDown();
        }

        public T get() throws IOException {
            try {
                this.latch.await();
                if (this.throwable != null) {
                    if (this.throwable instanceof IOException) {
                        throw (IOException)this.throwable;
                    }
                    if (this.throwable instanceof MongoException) {
                        throw (MongoException)this.throwable;
                    }
                    throw new MongoInternalException("Exception thrown from Netty Stream", this.throwable);
                }
                return this.t;
            }
            catch (InterruptedException e) {
                throw new MongoInterruptedException("Interrupted", e);
            }
        }
    }

    private static final class PendingReader {
        private final int numBytes;
        private final AsyncCompletionHandler<ByteBuf> handler;

        private PendingReader(int numBytes, AsyncCompletionHandler<ByteBuf> handler) {
            this.numBytes = numBytes;
            this.handler = handler;
        }
    }

    private class InboundBufferHandler
    extends SimpleChannelInboundHandler<io.netty.buffer.ByteBuf> {
        private InboundBufferHandler() {
        }

        protected void channelRead0(ChannelHandlerContext ctx, io.netty.buffer.ByteBuf buffer) throws Exception {
            NettyStream.this.handleReadResponse(buffer, null);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) {
            if (t instanceof ReadTimeoutException) {
                NettyStream.this.handleReadResponse(null, new MongoSocketReadTimeoutException("Timeout while receiving message", NettyStream.this.address, t));
            } else {
                NettyStream.this.handleReadResponse(null, t);
            }
            ctx.close();
        }
    }
}

