package io.openmessaging.storage.dledger;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerRequestCode;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesRequest;
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.HeartBeatRequest;
import io.openmessaging.storage.dledger.protocol.HeartBeatResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
import io.openmessaging.storage.dledger.protocol.VoteResponse;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/dledger-0.2.2.jar:io/openmessaging/storage/dledger/DLedgerRpcNettyService.class */
/**
 * {@link DLedgerRpcService} implementation built on the RocketMQ remoting (Netty) client and server.
 *
 * <p>Outgoing protocol calls serialize the request to JSON, send it to the peer resolved through
 * {@link MemberState#getPeerAddr}, and expose the decoded reply as a {@link CompletableFuture}.
 * Incoming commands are dispatched by {@link #processRequest} to the wrapped {@link DLedgerServer},
 * and the reply is written back on {@code futureExecutor} once the server's future completes.
 */
public class DLedgerRpcNettyService extends DLedgerRpcService {
    private static final Logger logger = LoggerFactory.getLogger(DLedgerRpcNettyService.class);

    /** Timeout, in milliseconds, handed to every outgoing remoting invocation. */
    private static final long REQUEST_TIMEOUT_MILLIS = 3000L;

    private final NettyRemotingServer remotingServer;
    private final NettyRemotingClient remotingClient;
    private MemberState memberState;
    private DLedgerServer dLedgerServer;

    /** Runs the completion callbacks that write responses for incoming requests. */
    private final ExecutorService futureExecutor =
        Executors.newFixedThreadPool(4, namedThreadFactory("FutureExecutor_"));
    /** Runs the sending side of outgoing vote requests. */
    private final ExecutorService voteInvokeExecutor =
        Executors.newCachedThreadPool(namedThreadFactory("voteInvokeExecutor_"));
    /** Runs the sending side of outgoing heartbeat requests. */
    private final ExecutorService heartBeatInvokeExecutor =
        Executors.newCachedThreadPool(namedThreadFactory("heartBeatInvokeExecutor_"));

    /**
     * Creates the remoting server and client and registers one shared processor for every
     * supported {@link DLedgerRequestCode}. Nothing is started until {@link #startup()}.
     *
     * @param dLedgerServer server that handles incoming requests; its member state supplies the
     *                      {@code host:port} self address from which the listen port is taken
     * @throws NumberFormatException if the port part of the self address is not an integer
     */
    public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
        this.dLedgerServer = dLedgerServer;
        this.memberState = dLedgerServer.getMemberState();

        NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
                return DLedgerRpcNettyService.this.processRequest(ctx, request);
            }

            @Override
            public boolean rejectRequest() {
                return false;
            }
        };

        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // The self address has the form "host:port"; only the port is needed for listening.
        String listenPort = this.memberState.getSelfAddr().split(":")[1];
        nettyServerConfig.setListenPort(Integer.parseInt(listenPort));
        this.remotingServer = new NettyRemotingServer(nettyServerConfig, null);

        DLedgerRequestCode[] handledCodes = {
            DLedgerRequestCode.METADATA,
            DLedgerRequestCode.APPEND,
            DLedgerRequestCode.GET,
            DLedgerRequestCode.PULL,
            DLedgerRequestCode.PUSH,
            DLedgerRequestCode.VOTE,
            DLedgerRequestCode.HEART_BEAT,
            DLedgerRequestCode.LEADERSHIP_TRANSFER
        };
        for (DLedgerRequestCode handledCode : handledCodes) {
            this.remotingServer.registerProcessor(handledCode.getCode(), protocolProcessor, null);
        }

        this.remotingClient = new NettyRemotingClient(new NettyClientConfig(), null);
    }

    /**
     * Builds a thread factory whose threads are named {@code prefix + n}, with {@code n} counting
     * up from 1 per factory.
     */
    private static ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger threadIndex = new AtomicInteger(0);
        return runnable -> new Thread(runnable, prefix + threadIndex.incrementAndGet());
    }

    /** Resolves the network address of the peer identified by the message's remote id. */
    private String getPeerAddr(RequestOrResponse requestOrResponse) {
        return this.memberState.getPeerAddr(requestOrResponse.getRemoteId());
    }

    /**
     * Decodes the JSON body of {@code wrapperResponse} into {@code responseType} and completes
     * {@code future} with it. A decoding failure completes the future exceptionally instead of
     * escaping the remoting callback, so the future is never left pending.
     */
    private static <T> void completeFromBody(CompletableFuture<T> future, RemotingCommand wrapperResponse, Class<T> responseType) {
        try {
            T decoded = JSON.parseObject(wrapperResponse.getBody(), responseType);
            future.complete(decoded);
        } catch (Throwable th) {
            logger.error("Failed to decode {} from response {}", responseType.getSimpleName(), wrapperResponse, th);
            future.completeExceptionally(th);
        }
    }

    /**
     * Sends a heartbeat to the peer named in the request. The send runs on
     * {@code heartBeatInvokeExecutor}.
     *
     * @param heartBeatRequest request to send
     * @return future completed with the peer's reply, or with a {@code NETWORK_ERROR} response
     *         when no reply arrives or the send fails
     */
    @Override
    public CompletableFuture<HeartBeatResponse> heartBeat(HeartBeatRequest heartBeatRequest) throws Exception {
        CompletableFuture<HeartBeatResponse> future = new CompletableFuture<>();
        this.heartBeatInvokeExecutor.execute(() -> {
            try {
                RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.HEART_BEAT.getCode(), null);
                wrapperRequest.setBody(JSON.toJSONBytes(heartBeatRequest));
                this.remotingClient.invokeAsync(getPeerAddr(heartBeatRequest), wrapperRequest, REQUEST_TIMEOUT_MILLIS, responseFuture -> {
                    RemotingCommand wrapperResponse = responseFuture.getResponseCommand();
                    if (wrapperResponse != null) {
                        completeFromBody(future, wrapperResponse, HeartBeatResponse.class);
                    } else {
                        logger.error("HeartBeat request time out, {}", heartBeatRequest.baseInfo());
                        future.complete(new HeartBeatResponse().code(DLedgerResponseCode.NETWORK_ERROR.getCode()));
                    }
                });
            } catch (Throwable th) {
                logger.error("Send heartBeat request failed, {}", heartBeatRequest.baseInfo(), th);
                future.complete(new HeartBeatResponse().code(DLedgerResponseCode.NETWORK_ERROR.getCode()));
            }
        });
        return future;
    }

    /**
     * Sends a vote request to the peer named in the request. The send runs on
     * {@code voteInvokeExecutor}.
     *
     * @param voteRequest request to send
     * @return future completed with the peer's reply, or with a default-constructed
     *         {@link VoteResponse} when no reply arrives or the send fails
     */
    @Override
    public CompletableFuture<VoteResponse> vote(VoteRequest voteRequest) throws Exception {
        CompletableFuture<VoteResponse> future = new CompletableFuture<>();
        this.voteInvokeExecutor.execute(() -> {
            try {
                RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.VOTE.getCode(), null);
                wrapperRequest.setBody(JSON.toJSONBytes(voteRequest));
                this.remotingClient.invokeAsync(getPeerAddr(voteRequest), wrapperRequest, REQUEST_TIMEOUT_MILLIS, responseFuture -> {
                    RemotingCommand wrapperResponse = responseFuture.getResponseCommand();
                    if (wrapperResponse != null) {
                        completeFromBody(future, wrapperResponse, VoteResponse.class);
                    } else {
                        logger.error("Vote request time out, {}", voteRequest.baseInfo());
                        future.complete(new VoteResponse());
                    }
                });
            } catch (Throwable th) {
                logger.error("Send vote request failed, {}", voteRequest.baseInfo(), th);
                future.complete(new VoteResponse());
            }
        });
        return future;
    }

    /**
     * Not supported as an outgoing call by this service.
     *
     * @return an already-completed future carrying an {@code UNSUPPORTED} response
     */
    @Override
    public CompletableFuture<GetEntriesResponse> get(GetEntriesRequest getEntriesRequest) throws Exception {
        GetEntriesResponse unsupported = new GetEntriesResponse();
        unsupported.setCode(DLedgerResponseCode.UNSUPPORTED.getCode());
        return CompletableFuture.completedFuture(unsupported);
    }

    /**
     * Sends an append request to the peer named in the request, on the calling thread.
     *
     * @param appendEntryRequest request to send
     * @return future completed with the peer's reply, or with a {@code NETWORK_ERROR} response
     *         carrying the request's base info when no reply arrives or the send fails
     */
    @Override
    public CompletableFuture<AppendEntryResponse> append(AppendEntryRequest appendEntryRequest) throws Exception {
        CompletableFuture<AppendEntryResponse> future = new CompletableFuture<>();
        try {
            RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.APPEND.getCode(), null);
            wrapperRequest.setBody(JSON.toJSONBytes(appendEntryRequest));
            this.remotingClient.invokeAsync(getPeerAddr(appendEntryRequest), wrapperRequest, REQUEST_TIMEOUT_MILLIS, responseFuture -> {
                RemotingCommand wrapperResponse = responseFuture.getResponseCommand();
                if (wrapperResponse != null) {
                    completeFromBody(future, wrapperResponse, AppendEntryResponse.class);
                } else {
                    AppendEntryResponse noReply = new AppendEntryResponse();
                    noReply.copyBaseInfo(appendEntryRequest);
                    noReply.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
                    future.complete(noReply);
                }
            });
        } catch (Throwable th) {
            logger.error("Send append request failed, {}", appendEntryRequest.baseInfo(), th);
            AppendEntryResponse sendFailed = new AppendEntryResponse();
            sendFailed.copyBaseInfo(appendEntryRequest);
            sendFailed.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
            future.complete(sendFailed);
        }
        return future;
    }

    /**
     * Not supported as an outgoing call by this service.
     *
     * @return an already-completed future carrying an {@code UNSUPPORTED} response
     */
    @Override
    public CompletableFuture<MetadataResponse> metadata(MetadataRequest metadataRequest) throws Exception {
        MetadataResponse unsupported = new MetadataResponse();
        unsupported.setCode(DLedgerResponseCode.UNSUPPORTED.getCode());
        return CompletableFuture.completedFuture(unsupported);
    }

    /**
     * Sends a pull request synchronously and wraps the decoded reply in a completed future.
     * Unlike the other outgoing calls, any failure of the synchronous invocation or of the
     * decoding propagates to the caller as an exception.
     *
     * @param pullEntriesRequest request to send
     * @return an already-completed future carrying the peer's reply
     */
    @Override
    public CompletableFuture<PullEntriesResponse> pull(PullEntriesRequest pullEntriesRequest) throws Exception {
        RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.PULL.getCode(), null);
        wrapperRequest.setBody(JSON.toJSONBytes(pullEntriesRequest));
        RemotingCommand wrapperResponse = this.remotingClient.invokeSync(getPeerAddr(pullEntriesRequest), wrapperRequest, REQUEST_TIMEOUT_MILLIS);
        PullEntriesResponse pulled = JSON.parseObject(wrapperResponse.getBody(), PullEntriesResponse.class);
        return CompletableFuture.completedFuture(pulled);
    }

    /**
     * Sends a push request to the peer named in the request, on the calling thread.
     *
     * @param pushEntryRequest request to send
     * @return future completed with the peer's reply, or with a {@code NETWORK_ERROR} response
     *         carrying the request's base info when no reply arrives or the send fails
     */
    @Override
    public CompletableFuture<PushEntryResponse> push(PushEntryRequest pushEntryRequest) throws Exception {
        CompletableFuture<PushEntryResponse> future = new CompletableFuture<>();
        try {
            RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.PUSH.getCode(), null);
            wrapperRequest.setBody(JSON.toJSONBytes(pushEntryRequest));
            this.remotingClient.invokeAsync(getPeerAddr(pushEntryRequest), wrapperRequest, REQUEST_TIMEOUT_MILLIS, responseFuture -> {
                RemotingCommand wrapperResponse = responseFuture.getResponseCommand();
                if (wrapperResponse != null) {
                    completeFromBody(future, wrapperResponse, PushEntryResponse.class);
                } else {
                    PushEntryResponse noReply = new PushEntryResponse();
                    noReply.copyBaseInfo(pushEntryRequest);
                    noReply.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
                    future.complete(noReply);
                }
            });
        } catch (Throwable th) {
            logger.error("Send push request failed, {}", pushEntryRequest.baseInfo(), th);
            PushEntryResponse sendFailed = new PushEntryResponse();
            sendFailed.copyBaseInfo(pushEntryRequest);
            sendFailed.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
            future.complete(sendFailed);
        }
        return future;
    }

    /**
     * Sends a leadership-transfer request to the peer named in the request, on the calling thread.
     *
     * @param leadershipTransferRequest request to send
     * @return future completed with the peer's reply, or with a {@code NETWORK_ERROR} response
     *         carrying the request's base info when no reply arrives or the send fails
     */
    @Override
    public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(LeadershipTransferRequest leadershipTransferRequest) throws Exception {
        CompletableFuture<LeadershipTransferResponse> future = new CompletableFuture<>();
        try {
            RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null);
            wrapperRequest.setBody(JSON.toJSONBytes(leadershipTransferRequest));
            this.remotingClient.invokeAsync(getPeerAddr(leadershipTransferRequest), wrapperRequest, REQUEST_TIMEOUT_MILLIS, responseFuture -> {
                RemotingCommand wrapperResponse = responseFuture.getResponseCommand();
                if (wrapperResponse != null) {
                    completeFromBody(future, wrapperResponse, LeadershipTransferResponse.class);
                } else {
                    LeadershipTransferResponse noReply = new LeadershipTransferResponse();
                    noReply.copyBaseInfo(leadershipTransferRequest);
                    noReply.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
                    future.complete(noReply);
                }
            });
        } catch (Throwable th) {
            logger.error("Send leadershipTransfer request failed, {}", leadershipTransferRequest.baseInfo(), th);
            LeadershipTransferResponse sendFailed = new LeadershipTransferResponse();
            sendFailed.copyBaseInfo(leadershipTransferRequest);
            sendFailed.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
            future.complete(sendFailed);
        }
        return future;
    }

    /**
     * Writes the outcome of a handled request back to the channel. If the handler failed
     * ({@code th} is non-null) or building/writing the reply throws, the failure is logged and
     * nothing is written.
     */
    private void writeResponse(RequestOrResponse requestOrResponse, Throwable th, RemotingCommand remotingCommand, ChannelHandlerContext channelHandlerContext) {
        // Kept outside the try block so the error log can show whatever response was built.
        RemotingCommand response = null;
        try {
            if (th != null) {
                throw th;
            }
            response = handleResponse(requestOrResponse, remotingCommand);
            response.markResponseType();
            channelHandlerContext.writeAndFlush(response);
        } catch (Throwable failure) {
            logger.error("Process request over, but fire response failed, request:[{}] response:[{}]", remotingCommand, response, failure);
        }
    }

    /** Arranges for the handler's result to be written back on {@code futureExecutor}. */
    private <T extends RequestOrResponse> void replyWhenDone(CompletableFuture<T> pending, RemotingCommand request, ChannelHandlerContext ctx) {
        pending.whenCompleteAsync((response, th) -> writeResponse(response, th, request, ctx), this.futureExecutor);
    }

    /**
     * Dispatches an incoming command to the matching {@code handleXxx} method according to its
     * request code. The reply is written asynchronously when the handler's future completes, so
     * this method always returns {@code null}. Commands with an unrecognised code are logged and
     * receive no reply.
     *
     * @param channelHandlerContext channel the reply is written to
     * @param remotingCommand       incoming command whose body is the JSON-encoded request
     * @return always {@code null}
     * @throws Exception if decoding the body or invoking the handler fails synchronously
     */
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws Exception {
        DLedgerRequestCode requestCode = DLedgerRequestCode.valueOf(remotingCommand.getCode());
        switch (requestCode) {
            case METADATA: {
                MetadataRequest metadataRequest = JSON.parseObject(remotingCommand.getBody(), MetadataRequest.class);
                replyWhenDone(handleMetadata(metadataRequest), remotingCommand, channelHandlerContext);
                return null;
            }
            case APPEND: {
                AppendEntryRequest appendEntryRequest = JSON.parseObject(remotingCommand.getBody(), AppendEntryRequest.class);
                replyWhenDone(handleAppend(appendEntryRequest), remotingCommand, channelHandlerContext);
                return null;
            }
            case GET: {
                GetEntriesRequest getEntriesRequest = JSON.parseObject(remotingCommand.getBody(), GetEntriesRequest.class);
                replyWhenDone(handleGet(getEntriesRequest), remotingCommand, channelHandlerContext);
                return null;
            }
            case PULL: {
                PullEntriesRequest pullEntriesRequest = JSON.parseObject(remotingCommand.getBody(), PullEntriesRequest.class);
                replyWhenDone(handlePull(pullEntriesRequest), remotingCommand, channelHandlerContext);
                return null;
            }
            case PUSH: {
                PushEntryRequest pushEntryRequest = JSON.parseObject(remotingCommand.getBody(), PushEntryRequest.class);
                replyWhenDone(handlePush(pushEntryRequest), remotingCommand, channelHandlerContext);
                return null;
            }
            case VOTE: {
                VoteRequest voteRequest = JSON.parseObject(remotingCommand.getBody(), VoteRequest.class);
                replyWhenDone(handleVote(voteRequest), remotingCommand, channelHandlerContext);
                return null;
            }
            case HEART_BEAT: {
                HeartBeatRequest heartBeatRequest = JSON.parseObject(remotingCommand.getBody(), HeartBeatRequest.class);
                replyWhenDone(handleHeartBeat(heartBeatRequest), remotingCommand, channelHandlerContext);
                return null;
            }
            case LEADERSHIP_TRANSFER: {
                long startMillis = System.currentTimeMillis();
                LeadershipTransferRequest leadershipTransferRequest = JSON.parseObject(remotingCommand.getBody(), LeadershipTransferRequest.class);
                handleLeadershipTransfer(leadershipTransferRequest).whenCompleteAsync((transferResponse, th) -> {
                    writeResponse(transferResponse, th, remotingCommand, channelHandlerContext);
                    logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms", remotingCommand, transferResponse, DLedgerUtils.elapsed(startMillis));
                }, this.futureExecutor);
                return null;
            }
            default:
                logger.error("Unknown request code {} from {}", remotingCommand.getCode(), remotingCommand);
                return null;
        }
    }

    /** Delegates an incoming leadership-transfer request to the wrapped {@link DLedgerServer}. */
    @Override
    public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(LeadershipTransferRequest leadershipTransferRequest) throws Exception {
        return this.dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest);
    }

    /** Delegates an incoming heartbeat to the wrapped {@link DLedgerServer}. */
    @Override
    public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest heartBeatRequest) throws Exception {
        return this.dLedgerServer.handleHeartBeat(heartBeatRequest);
    }

    /**
     * Delegates an incoming vote request to the wrapped {@link DLedgerServer}.
     *
     * <p>Note: this blocks the calling thread until the server's future completes and returns an
     * already-completed future; a failure of that future is thrown from this method.
     */
    @Override
    public CompletableFuture<VoteResponse> handleVote(VoteRequest voteRequest) throws Exception {
        VoteResponse voteResponse = this.dLedgerServer.handleVote(voteRequest).get();
        return CompletableFuture.completedFuture(voteResponse);
    }

    /** Delegates an incoming append request to the wrapped {@link DLedgerServer}. */
    @Override
    public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest appendEntryRequest) throws Exception {
        return this.dLedgerServer.handleAppend(appendEntryRequest);
    }

    /** Delegates an incoming get request to the wrapped {@link DLedgerServer}. */
    @Override
    public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest getEntriesRequest) throws Exception {
        return this.dLedgerServer.handleGet(getEntriesRequest);
    }

    /** Delegates an incoming metadata request to the wrapped {@link DLedgerServer}. */
    @Override
    public CompletableFuture<MetadataResponse> handleMetadata(MetadataRequest metadataRequest) throws Exception {
        return this.dLedgerServer.handleMetadata(metadataRequest);
    }

    /** Delegates an incoming pull request to the wrapped {@link DLedgerServer}. */
    @Override
    public CompletableFuture<PullEntriesResponse> handlePull(PullEntriesRequest pullEntriesRequest) throws Exception {
        return this.dLedgerServer.handlePull(pullEntriesRequest);
    }

    /** Delegates an incoming push request to the wrapped {@link DLedgerServer}. */
    @Override
    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest pushEntryRequest) throws Exception {
        return this.dLedgerServer.handlePush(pushEntryRequest);
    }

    /**
     * Wraps a protocol-level response in a remoting response command.
     *
     * @param requestOrResponse protocol response to serialize as the JSON body
     * @param remotingCommand   the request being answered; its opaque id is copied to the reply
     * @return a response command with code {@code SUCCESS}, the JSON body and the request's opaque
     */
    public RemotingCommand handleResponse(RequestOrResponse requestOrResponse, RemotingCommand remotingCommand) {
        RemotingCommand wrapperResponse = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null);
        wrapperResponse.setBody(JSON.toJSONBytes(requestOrResponse));
        wrapperResponse.setOpaque(remotingCommand.getOpaque());
        return wrapperResponse;
    }

    /** Starts the remoting server, then the remoting client. */
    @Override
    public void startup() {
        this.remotingServer.start();
        this.remotingClient.start();
    }

    /**
     * Shuts down the remoting server and client, then the internal executors so that their
     * worker threads do not outlive this service. Tasks already submitted are allowed to finish.
     */
    @Override
    public void shutdown() {
        try {
            this.remotingServer.shutdown();
            this.remotingClient.shutdown();
        } finally {
            this.heartBeatInvokeExecutor.shutdown();
            this.voteInvokeExecutor.shutdown();
            this.futureExecutor.shutdown();
        }
    }

    public MemberState getMemberState() {
        return this.memberState;
    }

    public void setMemberState(MemberState memberState) {
        this.memberState = memberState;
    }

    public DLedgerServer getdLedgerServer() {
        return this.dLedgerServer;
    }

    public void setdLedgerServer(DLedgerServer dLedgerServer) {
        this.dLedgerServer = dLedgerServer;
    }
}
