package org.apache.tajo.rpc;

import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.UnresolvedAddressException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.MonitorStateEvent;
import org.apache.tajo.rpc.RpcProtos;

/* loaded from: input_file:org/apache/tajo/rpc/NettyClientBase.class */
public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable {
    /** Logger shared by this class and its inner handler/listener classes. */
    public static final Log LOG = LogFactory.getLog(NettyClientBase.class);

    /** Connection key supplied at construction; its {@code addr} is the connect target. */
    private final RpcConnectionKey key;

    /** Retry limit parsed from the {@code tajo.rpc.client.retry-num} property. */
    private final int maxRetryNum;

    /** Connect timeout in milliseconds, handed to {@code ChannelOption.CONNECT_TIMEOUT_MILLIS} in {@code init}. */
    private final long connTimeoutMillis;

    /** Set to {@code true} by the inbound handler when a {@code MonitorClientHandler} is found in the pipeline. */
    private boolean enableMonitor;

    /** Channel event subscribers, at most one per connection key. */
    private final ConcurrentMap<RpcConnectionKey, ChannelEventListener> channelEventListeners =
            new ConcurrentHashMap<RpcConnectionKey, ChannelEventListener>();

    /** Callbacks of in-flight requests, keyed by request id. */
    private final ConcurrentMap<Integer, T> requests = new ConcurrentHashMap<Integer, T>();

    /** Bootstrap template created by {@code init}; cloned for every connect attempt. */
    private Bootstrap bootstrap;

    /** Future of the most recent connect attempt; {@code null} until the first attempt is made. */
    private volatile ChannelFuture channelFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.tajo.rpc.NettyClientBase$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/tajo/rpc/NettyClientBase$1.class */
    /**
     * Listener attached by {@code invoke} to the write promise of a single RPC request.
     *
     * <p>When the write succeeds, the callback is registered under the request id. When it fails,
     * the listener either reports the failure for that request or schedules a reconnect followed
     * by a re-invocation with an incremented retry counter.
     */
    public class AnonymousClass1 implements GenericFutureListener<ChannelFuture> {
        // Request whose write this listener is observing.
        final /* synthetic */ RpcProtos.RpcRequest val$rpcRequest;
        // Callback to register for the request (the `T` passed to invoke).
        final /* synthetic */ Object val$callback;
        // Retry counter value that was passed to invoke for this attempt.
        final /* synthetic */ int val$retry;

        AnonymousClass1(RpcProtos.RpcRequest rpcRequest, Object obj, int i) {
            this.val$rpcRequest = rpcRequest;
            this.val$callback = obj;
            this.val$retry = i;
        }

        /**
         * Handles completion of the write.
         *
         * @param channelFuture the completed write future
         */
        /* JADX WARN: Multi-variable type inference failed */
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                // Write succeeded: register the callback under the request id.
                NettyClientBase.this.getHandler().registerCallback(this.val$rpcRequest.getId(), this.val$callback);
                return;
            }
            if (channelFuture.channel().isActive() || this.val$retry >= NettyClientBase.this.maxRetryNum) {
                // No reconnect: the channel is still active, or the retry limit has been reached.
                // The callback is registered first so that the RecoverableException path in
                // exceptionCaught can look it up by sequence id and report the failure to it.
                NettyClientBase.this.getHandler().registerCallback(this.val$rpcRequest.getId(), this.val$callback);
                NettyClientBase.this.getHandler().exceptionCaught(NettyClientBase.this.getChannel().pipeline().lastContext(), new RecoverableException(this.val$rpcRequest.getId(), channelFuture.cause()));
            } else {
                NettyClientBase.LOG.warn(channelFuture.cause() + " Try to reconnect :" + NettyClientBase.this.getKey().addr);
                // Schedule the reconnect on the failed channel's event loop after a 1000 ms pause.
                channelFuture.channel().eventLoop().schedule(new Runnable() { // from class: org.apache.tajo.rpc.NettyClientBase.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        NettyClientBase.this.doConnect(NettyClientBase.this.getKey().addr).addListener(new GenericFutureListener<ChannelFuture>() { // from class: org.apache.tajo.rpc.NettyClientBase.1.1.1
                            /* JADX WARN: Multi-variable type inference failed */
                            public void operationComplete(ChannelFuture channelFuture2) throws Exception {
                                // Runs when the connect attempt completes; the result of the
                                // connect is not inspected here before the request is re-sent.
                                NettyClientBase.this.invoke(AnonymousClass1.this.val$rpcRequest, AnonymousClass1.this.val$callback, AnonymousClass1.this.val$retry + 1);
                            }
                        });
                    }
                }, 1000L, TimeUnit.MILLISECONDS);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/tajo/rpc/NettyClientBase$NettyChannelInboundHandler.class */
    /**
     * Inbound handler that matches {@code RpcResponse} messages to the callbacks stored in the
     * enclosing client's request map, forwards channel registration events to subscribers, and
     * reports failures to pending callbacks.
     */
    public abstract class NettyChannelInboundHandler extends SimpleChannelInboundHandler<RpcProtos.RpcResponse> {
        /** Creates the handler. */
        public NettyChannelInboundHandler() {
        }

        /**
         * Registers the callback for a request id.
         *
         * @param i the request (sequence) id
         * @param t the callback to associate with the id
         * @throws RemoteException if a callback is already registered under the same id
         */
        protected void registerCallback(int i, T t) {
            final T existing = NettyClientBase.this.requests.putIfAbsent(Integer.valueOf(i), t);
            if (existing != null) {
                throw new RemoteException(NettyClientBase.this.getErrorMessage("Duplicate Sequence Id " + i));
            }
        }

        /**
         * Enables monitor mode when a {@code MonitorClientHandler} is in the pipeline, notifies
         * every subscriber, then delegates to the superclass.
         */
        public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
            if (channelHandlerContext.pipeline().get(MonitorClientHandler.class) != null) {
                NettyClientBase.this.enableMonitor = true;
            }
            for (ChannelEventListener subscriber : NettyClientBase.this.getSubscribers()) {
                subscriber.channelRegistered(channelHandlerContext);
            }
            super.channelRegistered(channelHandlerContext);
        }

        /** Notifies every subscriber of the unregistration, then delegates to the superclass. */
        public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
            for (ChannelEventListener subscriber : NettyClientBase.this.getSubscribers()) {
                subscriber.channelUnregistered(channelHandlerContext);
            }
            super.channelUnregistered(channelHandlerContext);
        }

        /** Delegates to the superclass and logs the established connection at debug level. */
        public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelActive(channelHandlerContext);
            NettyClientBase.LOG.debug("Connection established successfully : " + channelHandlerContext.channel());
        }

        /** Delegates to the superclass, then fails every pending request with a "connection lost" message. */
        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelInactive(channelHandlerContext);
            sendExceptions("Connection lost :" + NettyClientBase.this.getKey().addr);
        }

        /**
         * Removes the callback registered for the response id and passes both to {@link #run}.
         * A response with no registered callback is logged and dropped.
         */
        public final void channelRead0(ChannelHandlerContext channelHandlerContext, RpcProtos.RpcResponse rpcResponse) throws Exception {
            final T callback = NettyClientBase.this.requests.remove(Integer.valueOf(rpcResponse.getId()));
            if (callback == null) {
                NettyClientBase.LOG.warn("Dangling rpc call");
                return;
            }
            run(rpcResponse, callback);
        }

        /**
         * Delivers a response to the callback that was registered for it.
         *
         * @param rpcResponse the received response
         * @param t the callback removed from the request map for this response
         */
        protected abstract void run(RpcProtos.RpcResponse rpcResponse, T t) throws Exception;

        /**
         * Reports a failure to a callback.
         *
         * @param i the request (sequence) id
         * @param t the callback that was registered for the id
         * @param str the failure message
         */
        protected abstract void handleException(int i, T t, String str);

        /**
         * Logs the failure and propagates it to callbacks.
         *
         * <p>A {@code RecoverableException} fails only the request it names and leaves the channel
         * open. Any other throwable fails every pending request and closes the channel if it is
         * still open.
         */
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            Throwable rootCause = ExceptionUtils.getRootCause(th);
            if (rootCause == null) {
                // commons-lang 2.x returns null when the throwable has no nested cause; fall back
                // to the throwable itself so the log entry and callback message are not empty.
                rootCause = th;
            }
            final String rootMessage = ExceptionUtils.getMessage(rootCause);
            NettyClientBase.LOG.error(NettyClientBase.this.getErrorMessage(rootMessage), rootCause);
            if (th instanceof RecoverableException) {
                sendException((RecoverableException) th);
                return;
            }
            sendExceptions(rootMessage);
            if (channelHandlerContext.channel().isOpen()) {
                channelHandlerContext.close();
            }
        }

        /** Removes every pending request and reports {@code str} to its callback. */
        private void sendExceptions(String str) {
            for (Integer requestId : NettyClientBase.this.requests.keySet()) {
                final T callback = NettyClientBase.this.requests.remove(requestId);
                // The entry may already have been removed (e.g. its response was handled in the
                // meantime); never hand a null callback to handleException.
                if (callback != null) {
                    handleException(requestId.intValue(), callback, str);
                }
            }
        }

        /** Removes the request named by the exception and, if it was pending, reports the root cause message. */
        private void sendException(RecoverableException recoverableException) {
            final T callback = NettyClientBase.this.requests.remove(Integer.valueOf(recoverableException.getSeqId()));
            if (callback != null) {
                handleException(recoverableException.getSeqId(), callback, ExceptionUtils.getRootCauseMessage(recoverableException));
            }
        }

        /**
         * Reacts to idle and monitor events, then delegates to the superclass.
         *
         * <p>With monitoring disabled, a reader-idle event closes the channel when no request is
         * pending. Otherwise, a {@code PING_EXPIRED} monitor event is routed through
         * {@link #exceptionCaught} as a {@code ServiceException}.
         */
        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            if (!NettyClientBase.this.enableMonitor && obj instanceof IdleStateEvent) {
                final boolean readerIdle = ((IdleStateEvent) obj).state() == IdleState.READER_IDLE;
                if (readerIdle && NettyClientBase.this.requests.isEmpty()) {
                    channelHandlerContext.close();
                    NettyClientBase.LOG.info("Idle connection closed successfully :" + channelHandlerContext.channel());
                }
            } else if (obj instanceof MonitorStateEvent
                    && ((MonitorStateEvent) obj).state() == MonitorStateEvent.MonitorState.PING_EXPIRED) {
                exceptionCaught(channelHandlerContext, new ServiceException("Server has not respond: " + channelHandlerContext.channel()));
            }
            super.userEventTriggered(channelHandlerContext, obj);
        }
    }

    public NettyClientBase(RpcConnectionKey rpcConnectionKey, Properties properties) throws ClassNotFoundException, NoSuchMethodException {
        this.key = rpcConnectionKey;
        this.maxRetryNum = Integer.parseInt(properties.getProperty("tajo.rpc.client.retry-num", String.valueOf(0)));
        this.connTimeoutMillis = Integer.parseInt(properties.getProperty("tajo.rpc.client.connection-timeout-ms", String.valueOf(RpcConstants.CLIENT_CONNECTION_TIMEOUT_DEFAULT)));
        Preconditions.checkArgument(this.connTimeoutMillis <= 2147483647L, "Too long connection timeout");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the bootstrap template used for all later connect attempts.
     *
     * @param channelInitializer initializer installed as the bootstrap handler
     * @param eventLoopGroup event loop group the bootstrap is bound to
     */
    public void init(ChannelInitializer<Channel> channelInitializer, EventLoopGroup eventLoopGroup) {
        this.bootstrap = new Bootstrap();
        final Bootstrap template = this.bootstrap;
        template.group(eventLoopGroup);
        template.channel(NioSocketChannel.class);
        template.handler(channelInitializer);
        template.option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR);
        template.option(ChannelOption.SO_REUSEADDR, true);
        template.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf((int) this.connTimeoutMillis));
        template.option(ChannelOption.SO_RCVBUF, 10485760);
        template.option(ChannelOption.TCP_NODELAY, true);
    }

    /**
     * Returns the connection key this client was constructed with.
     *
     * @return the connection key
     */
    public RpcConnectionKey getKey() {
        return key;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Loads the nested service class of the protocol class, named
     * {@code <protocol name>$<protocol simple name>Service}.
     *
     * @return the loaded service class
     * @throws ClassNotFoundException if no class with that name can be loaded
     */
    public final Class<?> getServiceClass() throws ClassNotFoundException {
        final String protocolName = getKey().protocolClass.getName();
        final String protocolSimpleName = getKey().protocolClass.getSimpleName();
        final String serviceClassName = protocolName + "$" + protocolSimpleName + "Service";
        return Class.forName(serviceClassName);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Invokes a static factory method reflectively and returns its result as the stub.
     *
     * @param method the method to invoke with a {@code null} receiver
     * @param obj the single argument passed to {@code method}
     * @param <I> the stub type expected by the caller (unchecked cast)
     * @return the object returned by {@code method}
     * @throws RemoteException wrapping any exception raised by the invocation
     */
    public final <I> I getStub(Method method, Object obj) {
        final Object stub;
        try {
            stub = method.invoke(null, obj);
        } catch (Exception invocationFailure) {
            throw new RemoteException(invocationFailure.getMessage(), invocationFailure);
        }
        return (I) stub;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds an {@code RpcRequest} carrying the id, the method name and, when present, the
     * serialized request message.
     *
     * @param i the request id
     * @param methodDescriptor descriptor whose name becomes the request's method name
     * @param message the request payload, or {@code null} to send no payload
     * @return the built request
     */
    public static RpcProtos.RpcRequest buildRequest(int i, Descriptors.MethodDescriptor methodDescriptor, Message message) {
        final RpcProtos.RpcRequest.Builder request = RpcProtos.RpcRequest.newBuilder();
        request.setId(i);
        request.setMethodName(methodDescriptor.getName());
        if (message != null) {
            request.setRequestMessage(message.toByteString());
        }
        return request.m35build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Writes the request to the current channel and attaches a listener that registers the
     * callback on success or handles failure and retry.
     *
     * <p>Nothing is sent when the channel's event loop is shutting down. The caller must have
     * attempted a connection first: {@code getChannel()} returns {@code null} before that.
     *
     * @param rpcRequest the request to send
     * @param t the callback to associate with the request
     * @param i the number of retries already performed for this request
     */
    public void invoke(RpcProtos.RpcRequest rpcRequest, T t, int i) {
        // Resolve the channel once: a concurrent reconnect replaces the channel future, and a
        // promise created on one channel must not be used to write on another.
        final Channel channel = getChannel();
        if (channel.eventLoop().isShuttingDown()) {
            LOG.warn("RPC is shutting down");
            return;
        }
        final ChannelPromise writePromise = channel.newPromise();
        writePromise.addListener(new AnonymousClass1(rpcRequest, t, i));
        channel.writeAndFlush(rpcRequest, writePromise);
    }

    /**
     * Returns a socket address created from the host name and port of an unresolved address;
     * an already resolved address is returned unchanged.
     *
     * @param inetSocketAddress the address to resolve
     * @return the same instance if already resolved, otherwise the address built by {@code RpcUtils}
     */
    private static InetSocketAddress resolveAddress(InetSocketAddress inetSocketAddress) {
        if (!inetSocketAddress.isUnresolved()) {
            return inetSocketAddress;
        }
        final String host = inetSocketAddress.getHostName();
        return RpcUtils.createSocketAddr(host, inetSocketAddress.getPort());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts a connect attempt on a clone of the bootstrap and records its future as the
     * current channel future.
     *
     * @param socketAddress the remote address to connect to
     * @return the future of the started connect attempt
     */
    public ChannelFuture doConnect(SocketAddress socketAddress) {
        final Bootstrap connector = this.bootstrap.clone();
        final ChannelFuture pending = connector.connect(socketAddress);
        this.channelFuture = pending;
        return pending;
    }

    /**
     * Translates a failed connect future into a {@code ConnectException}.
     *
     * <p>An {@code UnresolvedAddressException} becomes a "Can't resolve host name" error. Every
     * other failure becomes a {@code ConnectTimeoutException} carrying the failure's message.
     * The original failure is attached as the cause so its stack trace is not lost.
     *
     * @param inetSocketAddress the address the connect attempt targeted
     * @param channelFuture the failed connect future
     * @return the exception to throw to the caller
     */
    private ConnectException makeConnectException(InetSocketAddress inetSocketAddress, ChannelFuture channelFuture) {
        final Throwable failure = channelFuture.cause();
        final ConnectException translated;
        if (failure instanceof UnresolvedAddressException) {
            translated = new ConnectException("Can't resolve host name: " + inetSocketAddress.toString());
        } else {
            // Guard against a future that carries no cause instead of failing with an NPE.
            translated = new ConnectTimeoutException(failure == null ? null : failure.getMessage());
        }
        if (failure != null) {
            translated.initCause(failure);
        }
        return translated;
    }

    /**
     * Connects to the address of the connection key, blocking until the attempt completes.
     * Does nothing when the client is already connected.
     *
     * <p>If the first attempt fails and the retry limit is positive, reconnecting is delegated
     * to {@code doReconnect}; otherwise the failure is thrown immediately.
     *
     * @throws ConnectException if the connection cannot be established
     */
    public synchronized void connect() throws ConnectException {
        if (!isConnected()) {
            InetSocketAddress target = this.key.addr;
            if (target.isUnresolved()) {
                target = resolveAddress(target);
            }
            final ChannelFuture firstAttempt = doConnect(target).awaitUninterruptibly();
            if (!firstAttempt.isSuccess()) {
                if (this.maxRetryNum > 0) {
                    // The failed first attempt counts as attempt number one.
                    doReconnect(target, firstAttempt, 1);
                } else {
                    throw makeConnectException(target, firstAttempt);
                }
            }
        }
    }

    /**
     * Retries the connection, pausing 1000 ms before each attempt, until an attempt succeeds,
     * the event loop starts shutting down, or the attempt counter reaches the retry limit.
     *
     * <p>An interrupt received while pausing does not abort the retries; the thread's interrupt
     * status is restored before this method returns or throws.
     *
     * @param inetSocketAddress the address to connect to
     * @param channelFuture the failed future of the preceding attempt
     * @param i the number of attempts already made
     * @throws ConnectException if the retry limit is reached without a successful connection;
     *     built from the most recent failed attempt
     */
    private void doReconnect(InetSocketAddress inetSocketAddress, ChannelFuture channelFuture, int i) throws ConnectException {
        ChannelFuture lastFailure = channelFuture;
        boolean interrupted = false;
        try {
            while (this.maxRetryNum > i) {
                i++;
                if (getChannel().eventLoop().isShuttingDown()) {
                    LOG.warn("RPC is shutting down");
                    return;
                }
                LOG.warn(getErrorMessage(ExceptionUtils.getMessage(lastFailure.cause())) + "\nTry to reconnect : " + getKey().addr);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep retrying; the flag is restored in finally.
                    interrupted = true;
                }
                final ChannelFuture attempt = doConnect(inetSocketAddress).awaitUninterruptibly();
                this.channelFuture = attempt;
                if (attempt.isSuccess()) {
                    return;
                }
                // Report the latest failure rather than the one from the very first attempt.
                lastFailure = attempt;
            }
            LOG.error("Max retry count has been exceeded. attempts=" + i + " caused by: " + lastFailure.cause());
            throw makeConnectException(inetSocketAddress, lastFailure);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the inbound handler used by this client. The write listener created in
     * {@code invoke} uses it to register callbacks and to report failed writes.
     *
     * @return the handler supplied by the concrete client
     */
    protected abstract NettyClientBase<T>.NettyChannelInboundHandler getHandler();

    /**
     * Returns the channel of the most recent connect attempt.
     *
     * @return the channel, or {@code null} if no connect attempt has been made yet
     */
    public Channel getChannel() {
        final ChannelFuture current = this.channelFuture;
        return current == null ? null : current.channel();
    }

    /**
     * Tells whether a channel exists and is currently active.
     *
     * @return {@code true} if there is a channel and it reports itself active
     */
    public boolean isConnected() {
        final Channel current = getChannel();
        if (current == null) {
            return false;
        }
        return current.isActive();
    }

    /**
     * Returns the remote address reported by the current channel.
     *
     * @return the channel's remote address, or {@code null} if there is no channel
     */
    public SocketAddress getRemoteAddress() {
        final Channel current = getChannel();
        return current != null ? current.remoteAddress() : null;
    }

    /**
     * Returns the number of requests whose callbacks are still registered.
     *
     * @return the current size of the request map
     */
    public int getActiveRequests() {
        final int pending = requests.size();
        return pending;
    }

    /**
     * Registers a channel event listener under the given key unless one is already registered.
     *
     * @param rpcConnectionKey the key to register the listener under
     * @param channelEventListener the listener to register
     * @return {@code true} if the listener was added, {@code false} if the key was already taken
     */
    public boolean subscribeEvent(RpcConnectionKey rpcConnectionKey, ChannelEventListener channelEventListener) {
        final ChannelEventListener alreadyRegistered =
                this.channelEventListeners.putIfAbsent(rpcConnectionKey, channelEventListener);
        return alreadyRegistered == null;
    }

    /** Removes every registered channel event listener. */
    public void removeSubscribers() {
        channelEventListeners.clear();
    }

    /**
     * Returns the registered channel event listeners.
     *
     * @return the values view of the listener map
     */
    public Collection<ChannelEventListener> getSubscribers() {
        final Collection<ChannelEventListener> subscribers = this.channelEventListeners.values();
        return subscribers;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Prefixes a message with the protocol class name and remote address of this client.
     *
     * @param str the message to decorate
     * @return a string of the form {@code Exception [<protocol>(<address>)]: <message>}
     */
    public String getErrorMessage(String str) {
        final StringBuilder text = new StringBuilder("Exception [");
        text.append(getKey().protocolClass.getCanonicalName());
        text.append("(");
        text.append(getKey().addr);
        text.append(")]: ");
        text.append(str);
        return text.toString();
    }

    /**
     * Closes the current channel and waits for the close to complete. Does nothing when there
     * is no channel or the channel is no longer open.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        final Channel current = getChannel();
        if (current != null && current.isOpen()) {
            LOG.debug("Proxy will be disconnected from remote " + current.remoteAddress());
            current.close().syncUninterruptibly();
        }
    }
}
