/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tajo.rpc;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.NettyRpcController;
import org.apache.tajo.rpc.NettyServerBase;
import org.apache.tajo.rpc.ProtoPipelineFactory;
import org.apache.tajo.rpc.RemoteCallException;
import org.apache.tajo.rpc.RpcProtos;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class AsyncRpcServer
extends NettyServerBase {
    private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class);
    private final Service service;
    private final ChannelPipelineFactory pipeline;

    public AsyncRpcServer(Class<?> protocol, Object instance, InetSocketAddress bindAddress, int workerNum) throws Exception {
        super(protocol.getSimpleName(), bindAddress);
        String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
        Class<?> serviceClass = Class.forName(serviceClassName);
        Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface");
        Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
        this.service = (Service)method.invoke(null, instance);
        ServerHandler handler = new ServerHandler();
        this.pipeline = new ProtoPipelineFactory((ChannelUpstreamHandler)handler, (MessageLite)RpcProtos.RpcRequest.getDefaultInstance());
        super.init(this.pipeline, workerNum);
    }

    private class ServerHandler
    extends SimpleChannelUpstreamHandler {
        private ServerHandler() {
        }

        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) throws Exception {
            AsyncRpcServer.this.accepted.add((Object)evt.getChannel());
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)String.format(AsyncRpcServer.this.serviceName + " accepted number of connections (%d)", AsyncRpcServer.this.accepted.size()));
            }
            super.channelOpen(ctx, evt);
        }

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            final RpcProtos.RpcRequest request = (RpcProtos.RpcRequest)e.getMessage();
            String methodName = request.getMethodName();
            Descriptors.MethodDescriptor methodDescriptor = AsyncRpcServer.this.service.getDescriptorForType().findMethodByName(methodName);
            if (methodDescriptor == null) {
                throw new RemoteCallException(request.getId(), (Throwable)new NoSuchMethodException(methodName));
            }
            Message paramProto = null;
            if (request.hasRequestMessage()) {
                try {
                    paramProto = AsyncRpcServer.this.service.getRequestPrototype(methodDescriptor).newBuilderForType().mergeFrom(request.getRequestMessage()).build();
                }
                catch (Throwable t) {
                    throw new RemoteCallException(request.getId(), methodDescriptor, t);
                }
            }
            final Channel channel = e.getChannel();
            final NettyRpcController controller = new NettyRpcController();
            RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>(){

                public void run(Message returnValue) {
                    RpcProtos.RpcResponse.Builder builder = RpcProtos.RpcResponse.newBuilder().setId(request.getId());
                    if (returnValue != null) {
                        builder.setResponseMessage(returnValue.toByteString());
                    }
                    if (controller.failed()) {
                        builder.setErrorMessage(controller.errorText());
                    }
                    channel.write((Object)builder.build());
                }
            };
            AsyncRpcServer.this.service.callMethod(methodDescriptor, (RpcController)controller, paramProto, (RpcCallback)callback);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            if (e.getCause() instanceof RemoteCallException) {
                RemoteCallException callException = (RemoteCallException)e.getCause();
                e.getChannel().write((Object)callException.getResponse());
            } else {
                LOG.error((Object)e.getCause());
            }
        }
    }
}

