/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tajo.rpc;

import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.ProtoPipelineFactory;
import org.apache.tajo.rpc.RemoteException;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.RpcProtos;
import org.apache.tajo.util.NetUtils;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ConnectTimeoutException;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;

/**
 * Asynchronous protobuf RPC client running on top of a Netty channel.
 *
 * <p>Each outgoing call is tagged with a sequence id taken from {@link #sequence} and its
 * callback is parked in {@link #requests} until either a response carrying the same id
 * arrives or the channel reports an exception.
 */
public class AsyncRpcClient extends NettyClientBase {
    private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);

    private final ChannelUpstreamHandler handler;
    private final ChannelPipelineFactory pipeFactory;
    private final ProxyRpcChannel rpcChannel;
    /** Source of per-call sequence ids. */
    private final AtomicInteger sequence = new AtomicInteger(0);
    /** Callbacks of calls that were sent but not yet answered, keyed by sequence id. */
    private final Map<Integer, ResponseCallback> requests = new ConcurrentHashMap<Integer, ResponseCallback>();
    private final Class<?> protocol;
    /** The static {@code newStub(RpcChannel)} factory of the generated service class. */
    private final Method stubMethod;
    private final RpcConnectionPool.RpcConnectionKey key;

    /**
     * Resolves the generated service class of {@code protocol}, initialises the connection to
     * {@code addr} and prepares the RPC channel used by stubs.
     *
     * @param protocol protocol class; its service class is looked up as
     *                 {@code <protocol name>$<protocol simple name>Service}
     * @param addr     remote address handed to {@code init}
     * @param factory  Netty channel factory handed to {@code init}
     * @param retries  retry count handed to {@code init}
     * @throws ClassNotFoundException  if the service class cannot be loaded
     * @throws NoSuchMethodException   if the service class has no {@code newStub(RpcChannel)}
     * @throws ConnectTimeoutException declared for connection establishment in {@code init}
     */
    AsyncRpcClient(Class<?> protocol, InetSocketAddress addr, ClientSocketChannelFactory factory, int retries) throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
        this.protocol = protocol;
        String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
        Class<?> serviceClass = Class.forName(serviceClassName);
        this.stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
        this.handler = new ClientChannelUpstreamHandler();
        this.pipeFactory = new ProtoPipelineFactory(this.handler, (MessageLite)RpcProtos.RpcResponse.getDefaultInstance());
        super.init(addr, this.pipeFactory, factory, retries);
        // Must come after init(): ProxyRpcChannel looks its handler up in the channel pipeline.
        this.rpcChannel = new ProxyRpcChannel();
        this.key = new RpcConnectionPool.RpcConnectionKey(addr, protocol, true);
    }

    /** Returns the connection key built from the address and protocol given to the constructor. */
    @Override
    public RpcConnectionPool.RpcConnectionKey getKey() {
        return this.key;
    }

    /**
     * Creates a service stub bound to this client's RPC channel by reflectively invoking the
     * generated {@code newStub} factory.
     *
     * @throws RuntimeException wrapping any failure of the reflective call
     */
    @Override
    @SuppressWarnings("unchecked") // the caller chooses T; the stub type is only known at runtime
    public <T> T getStub() {
        try {
            return (T)this.stubMethod.invoke(null, this.rpcChannel);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /** Returns the channel through which stubs issue their calls. */
    public RpcChannel getRpcChannel() {
        return this.rpcChannel;
    }

    /** Prefixes {@code message} with the protocol name and the normalized remote address. */
    private String getErrorMessage(String message) {
        InetSocketAddress remote = (InetSocketAddress)this.getChannel().getRemoteAddress();
        return "Exception [" + this.protocol.getCanonicalName() + "(" + NetUtils.normalizeInetSocketAddress(remote) + ")]: " + message;
    }

    /** Upstream handler that routes responses and channel failures to the pending callbacks. */
    private class ClientChannelUpstreamHandler
    extends SimpleChannelUpstreamHandler {
        private ClientChannelUpstreamHandler() {
        }

        /**
         * Parks {@code callback} until the response for {@code seqId} arrives.
         *
         * @throws RemoteException if a call with the same sequence id is still pending
         */
        synchronized void registerCallback(int seqId, ResponseCallback callback) {
            if (AsyncRpcClient.this.requests.containsKey(seqId)) {
                throw new RemoteException(AsyncRpcClient.this.getErrorMessage("Duplicate Sequence Id " + seqId));
            }
            AsyncRpcClient.this.requests.put(seqId, callback);
        }

        /** Hands an incoming response to the callback registered under the response id. */
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            RpcProtos.RpcResponse response = (RpcProtos.RpcResponse)e.getMessage();
            ResponseCallback callback = AsyncRpcClient.this.requests.remove(response.getId());
            if (callback == null) {
                LOG.warn("Dangling rpc call");
            } else {
                callback.run(response);
            }
        }

        /**
         * Logs the failure and completes every pending call with an error response.
         *
         * <p>Each entry is removed from the pending map before its callback runs, so a callback
         * is invoked at most once and the map does not keep entries for calls that were already
         * failed here.
         */
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            Throwable cause = e.getCause();
            LOG.error(AsyncRpcClient.this.getRemoteAddress() + "," + AsyncRpcClient.this.protocol + "," + cause.getMessage(), cause);

            String errorMessage = e.toString();
            for (Integer id : AsyncRpcClient.this.requests.keySet()) {
                // remove() decides who owns the callback: if a response for this id was
                // handled in the meantime, null comes back and the callback is skipped.
                ResponseCallback pending = AsyncRpcClient.this.requests.remove(id);
                if (pending == null) {
                    continue;
                }
                RpcProtos.RpcResponse failure = RpcProtos.RpcResponse.newBuilder()
                        .setErrorMessage(errorMessage)
                        .setId(id)
                        .build();
                pending.run(failure);
            }

            if (LOG.isDebugEnabled()) {
                LOG.error("" + cause, cause);
            } else {
                LOG.error("RPC Exception:" + cause);
            }
        }
    }

    /** Adapts a raw {@link RpcProtos.RpcResponse} to the typed callback supplied by the caller. */
    private class ResponseCallback
    implements RpcCallback<RpcProtos.RpcResponse> {
        private final RpcController controller;
        private final Message responsePrototype;
        private final RpcCallback<Message> callback;

        /**
         * @param controller        controller to mark as failed on error; may be {@code null}
         * @param responsePrototype prototype used to parse the response payload
         * @param callback          caller's callback receiving the parsed response
         */
        public ResponseCallback(RpcController controller, Message responsePrototype, RpcCallback<Message> callback) {
            this.controller = controller;
            this.responsePrototype = responsePrototype;
            this.callback = callback;
        }

        /**
         * Completes the call.
         *
         * <p>An error response marks the controller (if any) as failed and passes {@code null}
         * to the caller's callback. Otherwise the payload is parsed with the response prototype
         * and passed on; a response without payload yields {@code null}.
         *
         * @throws RemoteException if the payload cannot be parsed; the caller's callback is
         *                         still notified (with {@code null}) before the exception is thrown
         */
        public void run(RpcProtos.RpcResponse rpcResponse) {
            if (rpcResponse.hasErrorMessage()) {
                this.fail(rpcResponse.getErrorMessage());
                return;
            }

            Message responseMessage = null;
            if (rpcResponse.hasResponseMessage()) {
                try {
                    responseMessage = this.responsePrototype.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()).build();
                }
                catch (InvalidProtocolBufferException e) {
                    String reason = AsyncRpcClient.this.getErrorMessage("Cannot parse response: " + e.getMessage());
                    // This call has already left the pending map, so nothing else would ever
                    // complete it; tell the caller before propagating the failure.
                    this.fail(reason);
                    throw new RemoteException(reason, e);
                }
            }
            this.callback.run(responseMessage);
        }

        /** Marks the controller (if any) as failed and completes the caller with {@code null}. */
        private void fail(String reason) {
            if (this.controller != null) {
                this.controller.setFailed(reason);
            }
            this.callback.run(null);
        }
    }

    /** {@link RpcChannel} that turns stub calls into {@link RpcProtos.RpcRequest} messages. */
    private class ProxyRpcChannel
    implements RpcChannel {
        private final ClientChannelUpstreamHandler handler;

        /**
         * Looks the client handler up in the channel's pipeline.
         *
         * @throws IllegalArgumentException if the pipeline has no such handler
         */
        public ProxyRpcChannel() {
            this.handler = AsyncRpcClient.this.getChannel().getPipeline().get(ClientChannelUpstreamHandler.class);
            if (this.handler == null) {
                throw new IllegalArgumentException("Channel does not have proper handler");
            }
        }

        /**
         * Sends {@code param} as a call to {@code method}; {@code done} is invoked with the
         * parsed response, or with {@code null} if the call fails.
         */
        public void callMethod(Descriptors.MethodDescriptor method, RpcController controller, Message param, Message responseType, RpcCallback<Message> done) {
            int nextSeqId = AsyncRpcClient.this.sequence.getAndIncrement();
            Message rpcRequest = this.buildRequest(nextSeqId, method, param);
            // Register before writing so the callback is in place when the response comes back.
            this.handler.registerCallback(nextSeqId, new ResponseCallback(controller, responseType, done));
            AsyncRpcClient.this.getChannel().write(rpcRequest);
        }

        /** Builds the wire request; the payload is omitted when {@code param} is {@code null}. */
        private Message buildRequest(int seqId, Descriptors.MethodDescriptor method, Message param) {
            RpcProtos.RpcRequest.Builder requestBuilder = RpcProtos.RpcRequest.newBuilder().setId(seqId).setMethodName(method.getName());
            if (param != null) {
                requestBuilder.setRequestMessage(param.toByteString());
            }
            return requestBuilder.build();
        }
    }
}

