package org.apache.hugegraph.computer.core.network.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.network.MessageHandler;
import org.apache.hugegraph.computer.core.network.TransportUtil;
import org.apache.hugegraph.computer.core.network.buffer.FileRegionBuffer;
import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer;
import org.apache.hugegraph.computer.core.network.message.AckMessage;
import org.apache.hugegraph.computer.core.network.message.DataMessage;
import org.apache.hugegraph.computer.core.network.message.FinishMessage;
import org.apache.hugegraph.computer.core.network.message.StartMessage;
import org.apache.hugegraph.computer.core.network.session.ServerSession;

/* loaded from: input_file:org/apache/hugegraph/computer/core/network/netty/NettyServerHandler.class */
/**
 * Server-side channel handler of the transport layer.
 *
 * <p>For every inbound message it updates the {@link ServerSession}, hands received data to the
 * {@link MessageHandler}, and answers the peer with {@link AckMessage}s. After the start message
 * has been acknowledged, a periodic task on the channel's event loop checks whether a finish or
 * data acknowledgement is due and sends it.
 *
 * <p>NOTE(review): {@code respondAckTask} is not synchronized; this assumes all callbacks of this
 * handler run on the channel's event loop, as is usual for Netty handlers — confirm.
 */
public class NettyServerHandler extends AbstractNettyHandler {
    /** Initial delay, in milliseconds, before the periodic ack check runs for the first time. */
    private static final long INITIAL_DELAY = 0;

    /**
     * Max-bytes-per-read value restored on the channel once a file-region transfer has completed.
     * NOTE(review): presumably the fixed frame-header length of the wire protocol — confirm
     * against the message codec and reference its constant instead if one exists.
     */
    private static final int RESTORED_MAX_BYTES_PER_READ = 16;

    private final MessageHandler handler;
    private final ServerSession serverSession;
    private final ChannelFutureListenerOnWrite listenerOnWrite;

    /** Handle of the periodic ack task; {@code null} while no task is scheduled. */
    private ScheduledFuture<?> respondAckTask;

    /**
     * Creates a handler bound to one server session.
     *
     * @param serverSession  session whose state is driven by the received messages
     * @param messageHandler application callback that receives data and lifecycle events
     */
    public NettyServerHandler(ServerSession serverSession, MessageHandler messageHandler) {
        this.serverSession = serverSession;
        this.handler = messageHandler;
        this.listenerOnWrite = new ChannelFutureListenerOnWrite(this.handler);
    }

    /** Records the received start message in the session and acknowledges it. */
    @Override
    protected void processStartMessage(ChannelHandlerContext channelHandlerContext, Channel channel,
                                       StartMessage startMessage) {
        this.serverSession.onRecvStateStart();
        ackStartMessage(channelHandlerContext);
    }

    /**
     * Records the received finish message in the session and acknowledges it right away when the
     * session reports that the ack is already due; otherwise the periodic ack task sends it later.
     */
    @Override
    protected void processFinishMessage(ChannelHandlerContext channelHandlerContext, Channel channel,
                                        FinishMessage finishMessage) {
        boolean ackNow = this.serverSession.onRecvStateFinish(finishMessage.requestId());
        if (ackNow) {
            ackFinishMessage(channelHandlerContext, this.serverSession.finishId());
        }
    }

    /**
     * Records the received data message in the session and delivers its body to the handler.
     * A {@link FileRegionBuffer} body is transferred asynchronously, see
     * {@link #processFileRegionBuffer}; any other body is handled inline. The body is released
     * in both cases before this method returns.
     */
    @Override
    protected void processDataMessage(ChannelHandlerContext channelHandlerContext, Channel channel,
                                      DataMessage dataMessage) {
        NetworkBuffer body = dataMessage.body();
        try {
            int requestId = dataMessage.requestId();
            this.serverSession.onRecvData(requestId);
            if (body instanceof FileRegionBuffer) {
                processFileRegionBuffer(channelHandlerContext, channel, dataMessage, (FileRegionBuffer) body);
            } else {
                this.handler.handle(dataMessage.type(), dataMessage.partition(), dataMessage.body());
                this.serverSession.onHandledData(requestId);
            }
        } finally {
            body.release();
        }
    }

    /**
     * Transfers a file-region body from the channel to the output path generated by the handler
     * and, once the transfer future completes, either delivers the message to the handler or
     * reports the failure through {@link #exceptionCaught}.
     *
     * <p>The channel's max-bytes-per-read is raised to the buffer length for the transfer and is
     * always restored afterwards, and the message is always released, even when handling fails.
     */
    private void processFileRegionBuffer(ChannelHandlerContext channelHandlerContext, Channel channel,
                                         DataMessage dataMessage, FileRegionBuffer fileRegionBuffer) {
        TransportUtil.setMaxBytesPerRead(channel, fileRegionBuffer.length());
        // The explicit ChannelFutureListener target type is required: without it the lambda
        // parameter is inferred as a plain Future, which has no channel() accessor.
        fileRegionBuffer.transformFromChannel((SocketChannel) channel,
                                              this.handler.genOutputPath(dataMessage.type(),
                                                                         dataMessage.partition()))
                        .addListener((ChannelFutureListener) future -> {
                            try {
                                if (future.isSuccess()) {
                                    this.handler.handle(dataMessage.type(), dataMessage.partition(),
                                                        dataMessage.body());
                                    this.serverSession.onHandledData(dataMessage.requestId());
                                } else {
                                    exceptionCaught(channelHandlerContext, future.cause());
                                }
                            } catch (Throwable th) {
                                exceptionCaught(channelHandlerContext, th);
                            } finally {
                                // Must run on every path, otherwise a failing handler leaves the
                                // channel with the enlarged read limit and an unreleased message.
                                restoreReadLimitAndRelease(channelHandlerContext, future.channel(), dataMessage);
                            }
                        });
    }

    /**
     * Restores the channel's read limit after a file-region transfer and releases the message.
     * Failures of either step are reported through {@link #exceptionCaught} and do not prevent
     * the other step from running.
     */
    private void restoreReadLimitAndRelease(ChannelHandlerContext channelHandlerContext, Channel channel,
                                            DataMessage dataMessage) {
        try {
            TransportUtil.setMaxBytesPerRead(channel, RESTORED_MAX_BYTES_PER_READ);
            channel.unsafe().recvBufAllocHandle().reset(channel.config());
        } catch (Throwable th) {
            exceptionCaught(channelHandlerContext, th);
        }
        try {
            dataMessage.release();
        } catch (Throwable th) {
            exceptionCaught(channelHandlerContext, th);
        }
    }

    /**
     * Always rejects the message: acknowledgements are sent by this handler, never received.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected void processAckMessage(ChannelHandlerContext channelHandlerContext, Channel channel,
                                     AckMessage ackMessage) {
        throw new UnsupportedOperationException("Server does not support processAckMessage()");
    }

    /**
     * Sends the ack for the start message, completes the session's start state, notifies the
     * handler, and schedules the periodic ack task unless one is already scheduled.
     */
    private void ackStartMessage(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.writeAndFlush(new AckMessage(0)).addListener(this.listenerOnWrite);
        this.serverSession.completeStateStart();
        this.handler.onStarted(TransportUtil.remoteConnectionId(channelHandlerContext.channel()));
        if (this.respondAckTask == null) {
            this.respondAckTask = channelHandlerContext.channel().eventLoop().scheduleWithFixedDelay(
                    () -> checkAndRespondAck(channelHandlerContext),
                    INITIAL_DELAY, this.serverSession.minAckInterval(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Sends the ack for the finish message, completes the session's finish state, notifies the
     * handler, and stops the periodic ack task.
     *
     * @param i id carried by the ack message
     */
    private void ackFinishMessage(ChannelHandlerContext channelHandlerContext, int i) {
        channelHandlerContext.writeAndFlush(new AckMessage(i)).addListener(this.listenerOnWrite);
        this.serverSession.completeStateFinish();
        this.handler.onFinished(TransportUtil.remoteConnectionId(channelHandlerContext.channel()));
        cancelRespondAckTask();
    }

    /**
     * Sends a data ack and records in the session that it has been sent.
     *
     * @param i id carried by the ack message
     */
    private void ackDataMessage(ChannelHandlerContext channelHandlerContext, int i) {
        channelHandlerContext.writeAndFlush(new AckMessage(i)).addListener(this.listenerOnWrite);
        this.serverSession.onDataAckSent(i);
    }

    /**
     * Body of the periodic ack task: sends a finish ack if the session asks for one, otherwise a
     * data ack for the highest handled id if the session asks for that.
     */
    private void checkAndRespondAck(ChannelHandlerContext channelHandlerContext) {
        if (this.serverSession.needAckFinish()) {
            ackFinishMessage(channelHandlerContext, this.serverSession.finishId());
        } else if (this.serverSession.needAckData()) {
            ackDataMessage(channelHandlerContext, this.serverSession.maxHandledId());
        }
    }

    /** Cancels the periodic ack task, if one is scheduled, without interrupting a running check. */
    private void cancelRespondAckTask() {
        if (this.respondAckTask != null) {
            this.respondAckTask.cancel(false);
            this.respondAckTask = null;
        }
    }

    /** Notifies the handler that the channel became active, then continues the pipeline. */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.handler.onChannelActive(TransportUtil.remoteConnectionId(channelHandlerContext.channel()));
        super.channelActive(channelHandlerContext);
    }

    /**
     * Stops the periodic ack task, notifies the handler that the channel became inactive, then
     * continues the pipeline.
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        // No ack can reach the peer any more; without this the task would keep firing on the
        // event loop and keep the context reachable when no finish message was ever acked.
        cancelRespondAckTask();
        this.handler.onChannelInactive(TransportUtil.remoteConnectionId(channelHandlerContext.channel()));
        super.channelInactive(channelHandlerContext);
    }

    /**
     * Forwards a failure to the handler together with the remote connection id. A
     * {@link TransportException} is forwarded as is; any other throwable is wrapped in one that
     * keeps it as the cause.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        Channel channel = channelHandlerContext.channel();
        TransportException exception;
        if (th instanceof TransportException) {
            exception = (TransportException) th;
        } else {
            exception = new TransportException("%s when the server receive data from '%s'", th,
                                               th.getMessage(), TransportUtil.remoteAddress(channel));
        }
        this.handler.exceptionCaught(exception, TransportUtil.remoteConnectionId(channel));
    }

    /** Returns the server session driven by this handler. */
    @Override
    public ServerSession session() {
        return this.serverSession;
    }

    /** Returns the application message handler this handler delegates to. */
    @Override
    public MessageHandler transportHandler() {
        return this.handler;
    }
}
