package com.googlecode.protobuf.pro.duplex.client;

import com.google.protobuf.ExtensionRegistry;
import com.googlecode.protobuf.pro.duplex.PeerInfo;
import com.googlecode.protobuf.pro.duplex.RpcClient;
import com.googlecode.protobuf.pro.duplex.RpcClientChannel;
import com.googlecode.protobuf.pro.duplex.RpcSSLContext;
import com.googlecode.protobuf.pro.duplex.RpcServer;
import com.googlecode.protobuf.pro.duplex.RpcServiceRegistry;
import com.googlecode.protobuf.pro.duplex.execute.RpcServerCallExecutor;
import com.googlecode.protobuf.pro.duplex.handler.ClientConnectResponseHandler;
import com.googlecode.protobuf.pro.duplex.handler.Handler;
import com.googlecode.protobuf.pro.duplex.handler.RpcClientHandler;
import com.googlecode.protobuf.pro.duplex.handler.RpcServerHandler;
import com.googlecode.protobuf.pro.duplex.listener.TcpConnectionEventListener;
import com.googlecode.protobuf.pro.duplex.logging.CategoryPerServiceLogger;
import com.googlecode.protobuf.pro.duplex.logging.RpcLogger;
import com.googlecode.protobuf.pro.duplex.server.RpcClientRegistry;
import com.googlecode.protobuf.pro.duplex.wire.DuplexProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.compression.JZlibDecoder;
import io.netty.handler.codec.compression.JZlibEncoder;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/googlecode/protobuf/pro/duplex/client/DuplexTcpClientPipelineFactory.class */
public class DuplexTcpClientPipelineFactory extends ChannelInitializer<Channel> {
    private static Logger log = LoggerFactory.getLogger(DuplexTcpClientPipelineFactory.class);
    private PeerInfo clientInfo;
    private ExtensionRegistry extensionRegistry;
    private RpcSSLContext sslContext;
    private List<TcpConnectionEventListener> connectionEventListeners = new ArrayList();
    private boolean compression = false;
    private AtomicInteger correlationId = new AtomicInteger(1);
    private final RpcClientRegistry rpcClientRegistry = new RpcClientRegistry();
    private final RpcServiceRegistry rpcServiceRegistry = new RpcServiceRegistry();
    private RpcServerCallExecutor rpcServerCallExecutor = null;
    private RpcLogger logger = new CategoryPerServiceLogger();
    private long connectResponseTimeoutMillis = ClientConnectResponseHandler.DEFAULT_CONNECT_RESPONSE_TIMEOUT_MS;

    public DuplexTcpClientPipelineFactory(PeerInfo peerInfo) {
        this.clientInfo = peerInfo;
    }

    public RpcClient peerWith(PeerInfo peerInfo, Bootstrap bootstrap) throws IOException {
        return peerWith(new InetSocketAddress(peerInfo.getHostName(), peerInfo.getPort()), bootstrap);
    }

    public RpcClient peerWith(String str, int i, Bootstrap bootstrap) throws IOException {
        return peerWith(new InetSocketAddress(str, i), bootstrap);
    }

    public RpcClient peerWith(InetSocketAddress inetSocketAddress, Bootstrap bootstrap) throws IOException {
        if (inetSocketAddress == null) {
            throw new NullPointerException("remotedAddress");
        }
        ChannelFuture awaitUninterruptibly = bootstrap.connect(inetSocketAddress).awaitUninterruptibly();
        if (!awaitUninterruptibly.isSuccess()) {
            throw new IOException("Failed to connect to " + inetSocketAddress, awaitUninterruptibly.cause());
        }
        Channel channel = awaitUninterruptibly.channel();
        DuplexProtocol.ConnectRequest m200build = DuplexProtocol.ConnectRequest.newBuilder().setClientHostName(this.clientInfo.getHostName()).setClientPort(this.clientInfo.getPort()).setClientPID(this.clientInfo.getPid()).setCorrelationId(this.correlationId.incrementAndGet()).setCompress(isCompression()).m200build();
        DuplexProtocol.WirePayload m452build = DuplexProtocol.WirePayload.newBuilder().setConnectRequest(m200build).m452build();
        if (log.isDebugEnabled()) {
            log.debug("Sending [" + m200build.getCorrelationId() + "]ConnectRequest.");
        }
        channel.writeAndFlush(m452build);
        ClientConnectResponseHandler clientConnectResponseHandler = channel.pipeline().get(Handler.CLIENT_CONNECT);
        if (clientConnectResponseHandler == null) {
            throw new IllegalStateException("No connectReponse handler in channel pipeline.");
        }
        DuplexProtocol.ConnectResponse connectResponse = clientConnectResponseHandler.getConnectResponse(this.connectResponseTimeoutMillis);
        if (connectResponse == null) {
            awaitUninterruptibly.channel().close().awaitUninterruptibly();
            throw new IOException("No Channel response received before " + this.connectResponseTimeoutMillis + " millis timeout.");
        }
        if (connectResponse.hasErrorCode()) {
            awaitUninterruptibly.channel().close().awaitUninterruptibly();
            throw new IOException("DuplexTcpServer CONNECT_RESPONSE indicated error " + connectResponse.getErrorCode());
        }
        if (!connectResponse.hasCorrelationId()) {
            awaitUninterruptibly.channel().close().awaitUninterruptibly();
            throw new IOException("DuplexTcpServer CONNECT_RESPONSE missing correlationId.");
        }
        if (connectResponse.getCorrelationId() != m200build.getCorrelationId()) {
            awaitUninterruptibly.channel().close().awaitUninterruptibly();
            throw new IOException("DuplexTcpServer CONNECT_RESPONSE correlationId mismatch. TcpClient sent " + m200build.getCorrelationId() + " received " + connectResponse.getCorrelationId() + " from TcpServer.");
        }
        RpcClient rpcClient = new RpcClient(channel, this.clientInfo, connectResponse.hasServerPID() ? new PeerInfo(inetSocketAddress.getHostName(), inetSocketAddress.getPort(), connectResponse.getServerPID()) : new PeerInfo(inetSocketAddress.getHostName(), inetSocketAddress.getPort()), connectResponse.getCompress(), getRpcLogger());
        completePipeline(rpcClient).notifyOpened();
        if (!getRpcClientRegistry().registerRpcClient(rpcClient)) {
            log.warn("Client RpcClient already registered. Bug??");
        }
        return rpcClient;
    }

    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        RpcSSLContext sslContext = getSslContext();
        if (sslContext != null) {
            pipeline.addLast(Handler.SSL, new SslHandler(sslContext.createClientEngine()));
        }
        pipeline.addLast(Handler.FRAME_DECODER, new ProtobufVarint32FrameDecoder());
        pipeline.addLast(Handler.PROTOBUF_DECODER, new ProtobufDecoder(DuplexProtocol.WirePayload.getDefaultInstance(), getExtensionRegistry()));
        pipeline.addLast(Handler.FRAME_ENCODER, new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(Handler.PROTOBUF_ENCODER, new ProtobufEncoder());
        pipeline.addLast(Handler.CLIENT_CONNECT, new ClientConnectResponseHandler());
    }

    protected RpcClientHandler completePipeline(RpcClient rpcClient) {
        ChannelPipeline pipeline = rpcClient.getChannel().pipeline();
        if (rpcClient.isCompression()) {
            pipeline.addBefore(Handler.FRAME_DECODER, Handler.DECOMPRESSOR, new JZlibEncoder(ZlibWrapper.GZIP));
            pipeline.addAfter(Handler.DECOMPRESSOR, Handler.COMPRESSOR, new JZlibDecoder(ZlibWrapper.GZIP));
        }
        RpcClientHandler rpcClientHandler = new RpcClientHandler(rpcClient, new TcpConnectionEventListener() { // from class: com.googlecode.protobuf.pro.duplex.client.DuplexTcpClientPipelineFactory.1
            @Override // com.googlecode.protobuf.pro.duplex.listener.TcpConnectionEventListener
            public void connectionClosed(RpcClientChannel rpcClientChannel) {
                Iterator it = DuplexTcpClientPipelineFactory.this.getListenersCopy().iterator();
                while (it.hasNext()) {
                    ((TcpConnectionEventListener) it.next()).connectionClosed(rpcClientChannel);
                }
            }

            @Override // com.googlecode.protobuf.pro.duplex.listener.TcpConnectionEventListener
            public void connectionOpened(RpcClientChannel rpcClientChannel) {
                Iterator it = DuplexTcpClientPipelineFactory.this.getListenersCopy().iterator();
                while (it.hasNext()) {
                    ((TcpConnectionEventListener) it.next()).connectionOpened(rpcClientChannel);
                }
            }
        });
        pipeline.replace(Handler.CLIENT_CONNECT, Handler.RPC_CLIENT, rpcClientHandler);
        pipeline.addAfter(Handler.RPC_CLIENT, Handler.RPC_SERVER, new RpcServerHandler(new RpcServer(rpcClient, this.rpcServiceRegistry, this.rpcServerCallExecutor, this.logger), this.rpcClientRegistry));
        return rpcClientHandler;
    }

    public String toString() {
        return "DuplexTcpClientPipelineFactory:" + this.clientInfo;
    }

    public void registerConnectionEventListener(TcpConnectionEventListener tcpConnectionEventListener) {
        getConnectionEventListeners().add(tcpConnectionEventListener);
    }

    public void removeConnectionEventListener(TcpConnectionEventListener tcpConnectionEventListener) {
        getConnectionEventListeners().remove(tcpConnectionEventListener);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<TcpConnectionEventListener> getListenersCopy() {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(getConnectionEventListeners());
        return Collections.unmodifiableList(arrayList);
    }

    public List<TcpConnectionEventListener> getConnectionEventListeners() {
        return this.connectionEventListeners == null ? new ArrayList(0) : this.connectionEventListeners;
    }

    public void setConnectionEventListeners(List<TcpConnectionEventListener> list) {
        this.connectionEventListeners = list;
    }

    public PeerInfo getClientInfo() {
        return this.clientInfo;
    }

    public void setClientInfo(PeerInfo peerInfo) {
        this.clientInfo = peerInfo;
    }

    public RpcServiceRegistry getRpcServiceRegistry() {
        return this.rpcServiceRegistry;
    }

    public RpcClientRegistry getRpcClientRegistry() {
        return this.rpcClientRegistry;
    }

    public RpcServerCallExecutor getRpcServerCallExecutor() {
        return this.rpcServerCallExecutor;
    }

    public void setRpcServerCallExecutor(RpcServerCallExecutor rpcServerCallExecutor) {
        this.rpcServerCallExecutor = rpcServerCallExecutor;
    }

    public RpcLogger getRpcLogger() {
        return this.logger;
    }

    public void setRpcLogger(RpcLogger rpcLogger) {
        this.logger = rpcLogger;
    }

    public RpcSSLContext getSslContext() {
        return this.sslContext;
    }

    public void setSslContext(RpcSSLContext rpcSSLContext) {
        this.sslContext = rpcSSLContext;
    }

    public boolean isCompression() {
        return this.compression;
    }

    public void setCompression(boolean z) {
        this.compression = z;
    }

    public ExtensionRegistry getExtensionRegistry() {
        return this.extensionRegistry;
    }

    public void setExtensionRegistry(ExtensionRegistry extensionRegistry) {
        this.extensionRegistry = extensionRegistry;
    }

    public long getConnectResponseTimeoutMillis() {
        return this.connectResponseTimeoutMillis;
    }

    public void setConnectResponseTimeoutMillis(long j) {
        this.connectResponseTimeoutMillis = j;
    }
}
