package link.thingscloud.netty.remoting.impl.netty;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import link.thingscloud.netty.remoting.api.AsyncHandler;
import link.thingscloud.netty.remoting.api.RemotingEndPoint;
import link.thingscloud.netty.remoting.api.RemotingService;
import link.thingscloud.netty.remoting.api.RequestProcessor;
import link.thingscloud.netty.remoting.api.channel.ChannelEventListener;
import link.thingscloud.netty.remoting.api.channel.ChannelEventListenerGroup;
import link.thingscloud.netty.remoting.api.channel.RemotingChannel;
import link.thingscloud.netty.remoting.api.command.RemotingCommand;
import link.thingscloud.netty.remoting.api.command.RemotingCommandFactory;
import link.thingscloud.netty.remoting.api.command.TrafficType;
import link.thingscloud.netty.remoting.api.exception.RemotingAccessException;
import link.thingscloud.netty.remoting.api.exception.RemotingRuntimeException;
import link.thingscloud.netty.remoting.api.exception.RemotingTimeoutException;
import link.thingscloud.netty.remoting.api.exception.SemaphoreExhaustedException;
import link.thingscloud.netty.remoting.api.interceptor.Interceptor;
import link.thingscloud.netty.remoting.api.interceptor.InterceptorGroup;
import link.thingscloud.netty.remoting.api.interceptor.RequestContext;
import link.thingscloud.netty.remoting.api.interceptor.ResponseContext;
import link.thingscloud.netty.remoting.common.ResponseFuture;
import link.thingscloud.netty.remoting.common.SemaphoreReleaseOnlyOnce;
import link.thingscloud.netty.remoting.config.RemotingConfig;
import link.thingscloud.netty.remoting.impl.command.CodecHelper;
import link.thingscloud.netty.remoting.impl.command.RemotingCommandFactoryImpl;
import link.thingscloud.netty.remoting.impl.command.RemotingSysResponseCode;
import link.thingscloud.netty.remoting.internal.RemotingUtil;
import link.thingscloud.netty.remoting.internal.ThreadHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:link/thingscloud/netty/remoting/impl/netty/AbstractRemotingServiceImpl.class */
/**
 * Shared request/response plumbing for remoting endpoints that exchange
 * {@link RemotingCommand}s over a Netty {@link Channel} as WebSocket frames.
 *
 * <p>The class keeps a table of in-flight requests keyed by request id, a table of
 * {@link RequestProcessor}s keyed by command code, and two executors: one that runs
 * request processors and one that runs {@link AsyncHandler} callbacks. A single-threaded
 * housekeeping scheduler periodically expires in-flight requests whose timeout has elapsed.
 */
public abstract class AbstractRemotingServiceImpl implements RemotingService {
    private static final Logger log = LoggerFactory.getLogger(AbstractRemotingServiceImpl.class);

    /** Bounds the number of one-way requests whose write has not completed yet. */
    private final Semaphore semaphoreOneway;
    /** Bounds the number of asynchronous requests that are still in flight. */
    private final Semaphore semaphoreAsync;
    /** Runs the request processors for incoming requests. */
    protected final ExecutorService publicExecutor;
    /** Runs the async-handler callbacks of completed, failed or expired requests. */
    private final ExecutorService asyncHandlerExecutor;
    /** {@code true} to send binary WebSocket frames, {@code false} to send text frames. */
    private final boolean binary;
    /** In-flight requests keyed by request id. */
    private final Map<Integer, ResponseFuture> ackTables = new ConcurrentHashMap<>(256);
    /** Request processors keyed by command code. */
    private final Map<Integer, RequestProcessor> processorTables = new ConcurrentHashMap<>(256);
    private final InterceptorGroup interceptorGroup = new InterceptorGroup();
    protected final ChannelEventListenerGroup listenerGroup = new ChannelEventListenerGroup();
    protected ScheduledExecutorService houseKeepingService = ThreadHelper.newSingleThreadScheduledExecutor("HouseKeepingService", true);
    private final RemotingCommandFactory remotingCommandFactory = new RemotingCommandFactoryImpl();

    /**
     * Creates the semaphores and executors from the given configuration.
     *
     * @param remotingConfig source of the frame mode, semaphore permits and executor sizes
     */
    public AbstractRemotingServiceImpl(RemotingConfig remotingConfig) {
        this.binary = remotingConfig.isBinary();
        this.semaphoreOneway = new Semaphore(remotingConfig.getOnewayInvokeSemaphore(), true);
        this.semaphoreAsync = new Semaphore(remotingConfig.getAsyncInvokeSemaphore(), true);
        this.publicExecutor = ThreadHelper.newFixedThreadPool(remotingConfig.getPublicExecutorThreads(), 10000, "Remoting-PublicExecutor", true);
        this.asyncHandlerExecutor = ThreadHelper.newFixedThreadPool(remotingConfig.getAsyncHandlerExecutorThreads(), 10000, "Remoting-AsyncExecutor", true);
    }

    /**
     * Registers a processor for a command code. The first processor registered for a
     * code wins; later registrations for the same code are ignored.
     */
    @Override // link.thingscloud.netty.remoting.api.RemotingService
    public void registerRequestProcessor(int requestCode, RequestProcessor requestProcessor) {
        this.processorTables.putIfAbsent(requestCode, requestProcessor);
    }

    /** Adds a listener to this service's channel event listener group. */
    @Override // link.thingscloud.netty.remoting.api.RemotingService
    public void registerChannelEventListener(ChannelEventListener channelEventListener) {
        this.listenerGroup.registerChannelEventListener(channelEventListener);
    }

    /** Returns the command factory used by this service. */
    @Override // link.thingscloud.netty.remoting.api.RemotingService
    public RemotingCommandFactory commandFactory() {
        return this.remotingCommandFactory;
    }

    /** Starts the periodic housekeeping scan. */
    @Override // link.thingscloud.netty.remoting.api.RemotingService
    public void stop() {
        ThreadHelper.shutdownGracefully(this.houseKeepingService, 3000L, TimeUnit.MILLISECONDS);
        ThreadHelper.shutdownGracefully(this.publicExecutor, 2000L, TimeUnit.MILLISECONDS);
        ThreadHelper.shutdownGracefully(this.asyncHandlerExecutor, 2000L, TimeUnit.MILLISECONDS);
    }

    /** Starts the periodic housekeeping scan. */
    @Override // link.thingscloud.netty.remoting.api.RemotingService
    public void start() {
        startUpHouseKeepingService();
    }

    /** Adds an interceptor to this service's interceptor group. */
    @Override // link.thingscloud.netty.remoting.api.RemotingService
    public void registerInterceptor(Interceptor interceptor) {
        this.interceptorGroup.registerInterceptor(interceptor);
    }

    /**
     * Schedules {@link #scanResponseTable()} to run every second after an initial
     * three-second delay.
     */
    protected void startUpHouseKeepingService() {
        this.houseKeepingService.scheduleAtFixedRate(() -> {
            try {
                scanResponseTable();
            } catch (RuntimeException e) {
                // An exception escaping a fixed-rate task suppresses every later run,
                // so log it here and keep the scan alive.
                log.warn("Scan response table exception", e);
            }
        }, 3000L, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Removes every in-flight request whose timeout has elapsed, marks it with a
     * {@link RemotingTimeoutException} and dispatches its async handler.
     */
    void scanResponseTable() {
        for (Map.Entry<Integer, ResponseFuture> entry : this.ackTables.entrySet()) {
            ResponseFuture candidate = entry.getValue();
            // Compare elapsed time against the timeout instead of adding the timeout to
            // the start time, so that a very large timeout cannot overflow.
            long elapsedMillis = System.currentTimeMillis() - candidate.getBeginTimestamp();
            if (elapsedMillis < candidate.getTimeoutMillis()) {
                continue;
            }
            // Removal decides ownership: a response arriving concurrently may already
            // have taken the future out of the table.
            ResponseFuture expired = this.ackTables.remove(entry.getKey());
            if (expired != null) {
                log.warn("Removes timeout request {} ", expired.getRequestCommand());
                expired.setCause(new RemotingTimeoutException(String.format("Request to %s timeout", expired.getRemoteAddr()), expired.getTimeoutMillis()));
                executeAsyncHandler(expired);
            }
        }
    }

    /**
     * Sends a synchronous request and blocks until the response arrives or the timeout elapses.
     *
     * @param channel         channel to write the request to
     * @param remotingCommand request; its traffic type is set to {@code REQUEST_SYNC}
     * @param j               maximum time to wait for the response, in milliseconds
     * @return the response command
     */
    public RemotingCommand invokeWithInterceptor(Channel channel, RemotingCommand remotingCommand, long j) {
        remotingCommand.setTrafficType(TrafficType.REQUEST_SYNC);
        String remoteAddr = RemotingUtil.extractRemoteAddress(channel);
        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, remotingCommand));
        return invoke0(remoteAddr, channel, remotingCommand, j);
    }

    /**
     * Registers the request in the ack table, writes it and waits for the response.
     * Throws the future's cause when the write failed, or a
     * {@link RemotingTimeoutException} when no response arrived in time.
     */
    private RemotingCommand invoke0(String remoteAddr, Channel channel, RemotingCommand request, long timeoutMillis) {
        int requestId = request.getRequestId();
        try {
            ResponseFuture responseFuture = new ResponseFuture(requestId, timeoutMillis);
            responseFuture.setRequestCommand(request);
            responseFuture.setRemoteAddr(remoteAddr);
            this.ackTables.put(requestId, responseFuture);
            writeAndFlush(channel, request, channelFuture -> {
                if (channelFuture.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                }
                responseFuture.setSendRequestOK(false);
                this.ackTables.remove(requestId);
                // Set the cause before publishing the (null) response so the waiting
                // caller observes it.
                responseFuture.setCause(new RemotingAccessException(RemotingUtil.extractRemoteAddress(channel), channelFuture.cause()));
                responseFuture.putResponse(null);
                log.warn("Send request command to {} failed !", remoteAddr);
            });
            RemotingCommand response = responseFuture.waitResponse(timeoutMillis);
            if (response != null) {
                return response;
            }
            if (responseFuture.isSendRequestOK()) {
                // The request was written but nothing came back in time.
                responseFuture.setCause(new RemotingTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis));
            }
            throw responseFuture.getCause();
        } finally {
            this.ackTables.remove(requestId);
        }
    }

    /**
     * Sends an asynchronous request; the outcome is delivered to the given handler.
     *
     * @param channel         channel to write the request to
     * @param remotingCommand request; its traffic type is set to {@code REQUEST_ASYNC}
     * @param asyncHandler    callback attached to the request's response future
     * @param j               request timeout in milliseconds
     */
    public void invokeAsyncWithInterceptor(Channel channel, RemotingCommand remotingCommand, AsyncHandler asyncHandler, long j) {
        remotingCommand.setTrafficType(TrafficType.REQUEST_ASYNC);
        String remoteAddr = RemotingUtil.extractRemoteAddress(channel);
        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, remotingCommand));
        invokeAsync0(remoteAddr, channel, remotingCommand, asyncHandler, j);
    }

    /**
     * Acquires an async permit, registers the request in the ack table and writes it.
     * When no permit is available the handler is failed with a
     * {@link SemaphoreExhaustedException} and nothing is written.
     */
    private void invokeAsync0(String remoteAddr, Channel channel, RemotingCommand request, AsyncHandler asyncHandler, long timeoutMillis) {
        if (!this.semaphoreAsync.tryAcquire()) {
            String message = String.format("No available async semaphore to issue the request request %s", request.toString());
            requestFail(new ResponseFuture(request.getRequestId(), timeoutMillis, asyncHandler, null), new SemaphoreExhaustedException(message));
            log.error(message);
            return;
        }
        int requestId = request.getRequestId();
        ResponseFuture responseFuture = new ResponseFuture(requestId, timeoutMillis, asyncHandler, new SemaphoreReleaseOnlyOnce(this.semaphoreAsync));
        responseFuture.setRequestCommand(request);
        responseFuture.setRemoteAddr(remoteAddr);
        this.ackTables.put(requestId, responseFuture);
        try {
            writeAndFlush(channel, request, channelFuture -> {
                boolean sent = channelFuture.isSuccess();
                responseFuture.setSendRequestOK(sent);
                if (!sent) {
                    requestFail(requestId, new RemotingAccessException(RemotingUtil.extractRemoteAddress(channel), channelFuture.cause()));
                    log.warn("Send request command to channel {} failed.", remoteAddr);
                }
            });
        } catch (Exception e) {
            requestFail(requestId, new RemotingAccessException(RemotingUtil.extractRemoteAddress(channel), e));
            log.error("Send request command to channel {} error !", channel, e);
        }
    }

    /**
     * Sends a request for which no response is expected.
     *
     * @param channel         channel to write the request to
     * @param remotingCommand request; its traffic type is set to {@code REQUEST_ONEWAY}
     */
    public void invokeOnewayWithInterceptor(Channel channel, RemotingCommand remotingCommand) {
        remotingCommand.setTrafficType(TrafficType.REQUEST_ONEWAY);
        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, RemotingUtil.extractRemoteAddress(channel), remotingCommand));
        invokeOneway0(channel, remotingCommand);
    }

    /**
     * Acquires a one-way permit and writes the request; the permit is released once the
     * write completes or fails. When no permit is available the request is dropped and
     * an error is logged.
     */
    private void invokeOneway0(Channel channel, RemotingCommand request) {
        if (!this.semaphoreOneway.tryAcquire()) {
            log.error(String.format("No available oneway semaphore to issue the request %s", request.toString()));
            return;
        }
        SemaphoreReleaseOnlyOnce permit = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            SocketAddress remoteAddress = channel.remoteAddress();
            writeAndFlush(channel, request, channelFuture -> {
                permit.release();
                if (!channelFuture.isSuccess()) {
                    log.warn("Send request command to channel {} failed !", remoteAddress);
                }
            });
        } catch (Exception e) {
            permit.release();
            log.error("Send request command to channel {} error !", channel, e);
        }
    }

    /**
     * Fails the in-flight request with the given id, if it is still registered, and
     * dispatches its async handler.
     */
    private void requestFail(int requestId, RemotingRuntimeException cause) {
        ResponseFuture failed = this.ackTables.remove(requestId);
        if (failed == null) {
            return;
        }
        failed.setSendRequestOK(false);
        // Record the cause before publishing the (null) response, matching the order
        // used by the synchronous send listener.
        failed.setCause(cause);
        failed.putResponse(null);
        executeAsyncHandler(failed);
    }

    /** Fails a future that was never registered in the ack table. */
    private void requestFail(ResponseFuture responseFuture, RemotingRuntimeException cause) {
        responseFuture.setCause(cause);
        executeAsyncHandler(responseFuture);
    }

    /**
     * Dispatches an incoming command by traffic type: the three request types go to
     * {@link #processRequestCommand}, responses go to the response handling path, and
     * anything else is logged. A {@code null} command is ignored.
     */
    public void processMessageReceived(ChannelHandlerContext channelHandlerContext, RemotingChannel remotingChannel, RemotingCommand remotingCommand) throws Exception {
        if (remotingCommand == null) {
            return;
        }
        switch (remotingCommand.getTrafficType()) {
            case REQUEST_ONEWAY:
            case REQUEST_ASYNC:
            case REQUEST_SYNC:
                processRequestCommand(channelHandlerContext, remotingChannel, remotingCommand);
                break;
            case RESPONSE:
                processResponseCommand(channelHandlerContext, remotingChannel, remotingCommand);
                break;
            default:
                log.warn("The traffic type {} is NOT supported!", remotingCommand.getTrafficType());
                break;
        }
    }

    /**
     * Looks up the processor for the request's command code and submits it to the public
     * executor. Replies with {@code REQUEST_CODE_NOT_SUPPORTED} when no processor is
     * registered, and with {@code SYSTEM_BUSY} when the executor rejects the task
     * (one-way requests get no busy reply).
     */
    public void processRequestCommand(ChannelHandlerContext channelHandlerContext, RemotingChannel remotingChannel, RemotingCommand remotingCommand) {
        int cmdCode = remotingCommand.getCmdCode();
        RequestProcessor requestProcessor = this.processorTables.get(cmdCode);
        if (requestProcessor == null) {
            // NOTE(review): unlike the busy and error paths, this reply is sent even for
            // one-way requests - confirm whether that is intended.
            RemotingCommand unsupported = commandFactory().createResponse(remotingCommand);
            unsupported.setOpCode(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
            writeAndFlush(channelHandlerContext.channel(), unsupported);
            log.warn("The command code {} is NOT supported!", cmdCode);
            return;
        }
        try {
            this.publicExecutor.submit(buildProcessorTask(channelHandlerContext, remotingCommand, requestProcessor, remotingChannel));
        } catch (RejectedExecutionException e) {
            log.warn(String.format("Request %s from %s is rejected by server executor %s !", remotingCommand, RemotingUtil.extractRemoteAddress(channelHandlerContext.channel()), this.publicExecutor));
            if (remotingCommand.getTrafficType() != TrafficType.REQUEST_ONEWAY) {
                RemotingCommand busy = this.remotingCommandFactory.createResponse(remotingCommand);
                busy.setOpCode(RemotingSysResponseCode.SYSTEM_BUSY);
                busy.setRemark("SYSTEM_BUSY");
                writeAndFlush(channelHandlerContext.channel(), busy);
            }
        }
    }

    /**
     * Matches a response to its in-flight request. Futures with an async handler are
     * dispatched to it; otherwise the response is published to the waiting caller.
     */
    private void processResponseCommand(ChannelHandlerContext channelHandlerContext, RemotingChannel remotingChannel, RemotingCommand response) {
        int requestId = response.getRequestId();
        ResponseFuture matched = this.ackTables.remove(requestId);
        if (matched == null) {
            log.warn("Response {} from {} doesn't have a matched request!", response, RemotingUtil.extractRemoteAddress(channelHandlerContext.channel()));
            return;
        }
        matched.setResponseCommand(response);
        matched.release();
        this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, RemotingUtil.extractRemoteAddress(channelHandlerContext.channel()), matched.getRequestCommand(), response));
        if (matched.getAsyncHandler() != null) {
            executeAsyncHandler(matched);
            return;
        }
        matched.putResponse(response);
        // NOTE(review): second release on the same future; presumably a no-op after the
        // first one - confirm against ResponseFuture.
        matched.release();
    }

    /**
     * Runs the future's async handler on the async-handler executor, falling back to the
     * calling thread when the task cannot be submitted. The future is released after the
     * handler has run, whether or not it threw.
     */
    private void executeAsyncHandler(ResponseFuture responseFuture) {
        boolean runInThisThread = false;
        ExecutorService executor = this.asyncHandlerExecutor;
        if (executor == null) {
            runInThisThread = true;
        } else {
            try {
                executor.submit(() -> {
                    try {
                        responseFuture.executeAsyncHandler();
                    } catch (Throwable t) {
                        log.warn("Execute async handler in specific executor exception, ", t);
                    } finally {
                        responseFuture.release();
                    }
                });
            } catch (Throwable t) {
                runInThisThread = true;
                log.warn("Execute async handler in executor exception, maybe the executor is busy now", t);
            }
        }
        if (runInThisThread) {
            try {
                responseFuture.executeAsyncHandler();
            } catch (Throwable t) {
                log.warn("Execute async handler in current thread exception", t);
            } finally {
                responseFuture.release();
            }
        }
    }

    /**
     * Builds the task that runs a request through the interceptors and its processor and
     * then writes the result (or an error reply) back to the channel.
     */
    private Runnable buildProcessorTask(ChannelHandlerContext channelHandlerContext, RemotingCommand request, RequestProcessor requestProcessor, RemotingChannel remotingChannel) {
        return () -> {
            try {
                this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE, RemotingUtil.extractRemoteAddress(channelHandlerContext.channel()), request));
                RemotingCommand response = requestProcessor.processRequest(remotingChannel, request);
                this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE, RemotingUtil.extractRemoteAddress(channelHandlerContext.channel()), request, response));
                handleResponse(response, request, channelHandlerContext);
            } catch (Throwable t) {
                log.error(String.format("Process request %s error !", request.toString()), t);
                handleException(t, request, channelHandlerContext);
            }
        };
    }

    /**
     * Writes the processor's response back to the channel. Nothing is written for
     * one-way requests or when the processor returned {@code null}.
     */
    private void handleResponse(RemotingCommand response, RemotingCommand request, ChannelHandlerContext channelHandlerContext) {
        if (response == null || request.getTrafficType() == TrafficType.REQUEST_ONEWAY) {
            return;
        }
        try {
            writeAndFlush(channelHandlerContext.channel(), response);
        } catch (Throwable t) {
            log.error(String.format("Process request %s success, but transfer response %s failed !", request, response), t);
        }
    }

    /**
     * Replies with {@code SYSTEM_ERROR} after a processor failure, except for one-way
     * requests. The failure itself has already been logged by the caller, so
     * {@code th} is not logged again here.
     */
    private void handleException(Throwable th, RemotingCommand request, ChannelHandlerContext channelHandlerContext) {
        if (request.getTrafficType() == TrafficType.REQUEST_ONEWAY) {
            return;
        }
        try {
            RemotingCommand errorResponse = this.remotingCommandFactory.createResponse(request);
            errorResponse.setOpCode(RemotingSysResponseCode.SYSTEM_ERROR);
            errorResponse.setRemark("SYSTEM_ERROR");
            writeAndFlush(channelHandlerContext.channel(), errorResponse);
        } catch (Throwable t) {
            // This runs inside an executor task whose Future nobody inspects, so a
            // failure here would otherwise vanish without a trace.
            log.error(String.format("Process request %s failed, and transfer the error response failed too !", request), t);
        }
    }

    /** Encodes a command as a binary or text WebSocket frame, depending on the configured mode. */
    private Object encodeFrame(Object obj) {
        if (this.binary) {
            return new BinaryWebSocketFrame(Unpooled.wrappedBuffer(CodecHelper.encodeBytes(obj)));
        }
        return new TextWebSocketFrame(CodecHelper.encode(obj));
    }

    /** Writes the encoded command to the channel without observing the outcome. */
    private void writeAndFlush(Channel channel, Object obj) {
        channel.writeAndFlush(encodeFrame(obj));
    }

    /** Writes the encoded command to the channel and notifies the listener when the write completes. */
    private void writeAndFlush(Channel channel, Object obj, ChannelFutureListener channelFutureListener) {
        channel.writeAndFlush(encodeFrame(obj)).addListener(channelFutureListener);
    }
}
