package org.apache.paimon.service.network;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.service.network.messages.MessageBody;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.messages.MessageType;
import org.apache.paimon.service.network.stats.ServiceRequestStats;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base Netty inbound handler that decodes incoming messages, hands valid requests to the
 * server's query executor and writes the (possibly chunked) response or a failure message back
 * to the channel.
 *
 * <p>Subclasses provide the actual request processing via {@link #handleRequest(long,
 * MessageBody)} and resource cleanup via {@link #shutdown()}.
 *
 * @param <REQ> type of the incoming requests
 * @param <RESP> type of the outgoing responses
 */
@ChannelHandler.Sharable
public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extends MessageBody>
        extends ChannelInboundHandlerAdapter {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractServerHandler.class);

    /** Marker used while the request ID of an incoming message has not been read yet. */
    private static final long UNKNOWN_REQUEST_ID = -1L;

    /** The server this handler belongs to; used for its name and its query executor. */
    private final NetworkServer<REQ, RESP> server;

    /** Serializer used to decode the request body. */
    private final MessageSerializer<REQ, RESP> serializer;

    /** Executor on which {@link AsyncRequestTask}s are run. */
    protected final ExecutorService queryExecutor;

    /** Sink for connection and request statistics. */
    private final ServiceRequestStats stats;

    /**
     * Task that asks the handler to process a single request and writes the outcome back to the
     * channel the request came from.
     *
     * @param <REQ> type of the request
     * @param <RESP> type of the response
     */
    public static class AsyncRequestTask<REQ extends MessageBody, RESP extends MessageBody>
            implements Runnable {

        private final AbstractServerHandler<REQ, RESP> handler;
        private final ChannelHandlerContext ctx;
        private final long requestId;
        private final REQ request;
        private final ServiceRequestStats stats;

        /** Creation time of this task, used to measure the request duration. */
        private final long creationNanos = System.nanoTime();

        /** Reports the outcome of writing a successful response to the statistics. */
        private class RequestWriteListener implements ChannelFutureListener {

            @Override
            public void operationComplete(ChannelFuture channelFuture) {
                long durationMillis =
                        TimeUnit.MILLISECONDS.convert(
                                System.nanoTime() - creationNanos, TimeUnit.NANOSECONDS);
                if (channelFuture.isSuccess()) {
                    LOG.debug(
                            "Request {} was successfully answered after {} ms.",
                            request,
                            durationMillis);
                    stats.reportSuccessfulRequest(durationMillis);
                } else {
                    // The trailing throwable is logged by SLF4J as the exception of the entry.
                    LOG.debug(
                            "Request {} failed after {} ms",
                            request,
                            durationMillis,
                            channelFuture.cause());
                    stats.reportFailedRequest();
                }
            }
        }

        /**
         * Creates the task.
         *
         * @param abstractServerHandler handler that processes the request, must not be null
         * @param channelHandlerContext context of the channel to answer on, must not be null
         * @param j ID of the request
         * @param req the deserialized request, must not be null
         * @param serviceRequestStats statistics sink, must not be null
         */
        AsyncRequestTask(
                AbstractServerHandler<REQ, RESP> abstractServerHandler,
                ChannelHandlerContext channelHandlerContext,
                long j,
                REQ req,
                ServiceRequestStats serviceRequestStats) {
            this.handler = Preconditions.checkNotNull(abstractServerHandler);
            this.ctx = Preconditions.checkNotNull(channelHandlerContext);
            this.requestId = j;
            this.request = Preconditions.checkNotNull(req);
            this.stats = Preconditions.checkNotNull(serviceRequestStats);
        }

        /**
         * Processes the request if the channel is still active; otherwise does nothing, since
         * there is nobody left to answer.
         */
        @Override
        public void run() {
            if (!ctx.channel().isActive()) {
                return;
            }
            handler.handleRequest(requestId, request).whenComplete(this::respond);
        }

        /**
         * Writes the result of the request to the channel: the serialized response on success,
         * a request failure message otherwise.
         *
         * @param response the response produced by the handler, or null
         * @param failure the failure the handler completed with, or null
         */
        private void respond(RESP response, Throwable failure) {
            try {
                if (failure != null) {
                    throw unwrap(failure);
                }
                if (response == null) {
                    throw new BadRequestException(
                            handler.getServerName(),
                            "NULL returned for request with ID " + requestId + ".");
                }

                ByteBuf serialized =
                        MessageSerializer.serializeResponse(ctx.alloc(), requestId, response);
                int highWaterMark = ctx.channel().config().getWriteBufferHighWaterMark();

                // Responses larger than the write buffer high water mark are sent in chunks.
                ChannelFuture write =
                        serialized.readableBytes() <= highWaterMark
                                ? ctx.writeAndFlush(serialized)
                                : ctx.writeAndFlush(
                                        new ChunkedByteBuf(serialized, highWaterMark));
                write.addListener(new RequestWriteListener());
            } catch (BadRequestException e) {
                LOG.debug("Bad request (request ID = {})", requestId, e);
                sendRequestFailure(e);
            } catch (Throwable t) {
                LOG.error("Error while handling request with ID {}", requestId, t);
                sendRequestFailure(
                        new RuntimeException(
                                "Failed request "
                                        + requestId
                                        + "."
                                        + System.lineSeparator()
                                        + " Caused by: "
                                        + ExceptionUtils.stringifyException(t)));
            }
        }

        /**
         * Returns the cause of a {@link CompletionException} if it has one, otherwise the
         * throwable itself, so that a cause-less wrapper never results in {@code throw null}.
         */
        private static Throwable unwrap(Throwable failure) {
            if (failure instanceof CompletionException && failure.getCause() != null) {
                return failure.getCause();
            }
            return failure;
        }

        /** Counts the request as failed and sends {@code cause} back as a request failure. */
        private void sendRequestFailure(Exception cause) {
            try {
                stats.reportFailedRequest();
                ctx.writeAndFlush(
                        MessageSerializer.serializeRequestFailure(ctx.alloc(), requestId, cause));
            } catch (IOException e) {
                LOG.error("Failed to respond with the error after failed request", e);
            }
        }

        @Override
        public String toString() {
            return "AsyncRequestTask{requestId=" + requestId + ", request=" + request + '}';
        }
    }

    /**
     * Creates the handler.
     *
     * @param networkServer the server this handler belongs to, must not be null
     * @param messageSerializer serializer for the request bodies, must not be null
     * @param serviceRequestStats statistics sink, must not be null
     */
    public AbstractServerHandler(
            NetworkServer<REQ, RESP> networkServer,
            MessageSerializer<REQ, RESP> messageSerializer,
            ServiceRequestStats serviceRequestStats) {
        this.server = Preconditions.checkNotNull(networkServer);
        this.serializer = Preconditions.checkNotNull(messageSerializer);
        this.queryExecutor = networkServer.getQueryExecutor();
        this.stats = Preconditions.checkNotNull(serviceRequestStats);
    }

    /** Returns the name of the server this handler belongs to. */
    public String getServerName() {
        return server.getServerName();
    }

    /** Records a newly active connection in the statistics. */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        stats.reportActiveConnection();
    }

    /** Records a connection that became inactive in the statistics. */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        stats.reportInactiveConnection();
    }

    /**
     * Decodes an incoming message and, if it is a request, submits an {@link AsyncRequestTask}
     * for it to the query executor. Any other message type, and any error while decoding or
     * submitting, is answered with a failure message. The incoming buffer is always released.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj the incoming message, expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj)
            throws Exception {
        // Tracked outside the try block so that the error path knows how far decoding got.
        REQ request = null;
        long requestId = UNKNOWN_REQUEST_ID;

        try {
            ByteBuf buf = (ByteBuf) obj;
            MessageType msgType = MessageSerializer.deserializeHeader(buf);

            requestId = MessageSerializer.getRequestId(buf);
            LOG.trace("Handling request with ID {}", requestId);

            if (msgType == MessageType.REQUEST) {
                request = serializer.deserializeRequest(buf);
                stats.reportRequest();
                queryExecutor.submit(
                        new AsyncRequestTask<>(
                                this, channelHandlerContext, requestId, request, stats));
            } else {
                String errMsg =
                        "Unexpected message type "
                                + msgType
                                + ". Expected "
                                + MessageType.REQUEST
                                + ".";
                ByteBuf failure =
                        MessageSerializer.serializeServerFailure(
                                channelHandlerContext.alloc(),
                                new IllegalArgumentException(errMsg));
                LOG.debug(errMsg);
                channelHandlerContext.writeAndFlush(failure);
            }
        } catch (Throwable t) {
            Object loggedId =
                    requestId == UNKNOWN_REQUEST_ID ? "unknown" : Long.valueOf(requestId);
            LOG.error("Error while handling request with ID [{}]", loggedId, t);

            String stackTrace = ExceptionUtils.stringifyException(t);
            ByteBuf failure;
            if (request != null) {
                // The request was fully decoded, so the failure can be attributed to its ID.
                failure =
                        MessageSerializer.serializeRequestFailure(
                                channelHandlerContext.alloc(),
                                requestId,
                                new RuntimeException(
                                        "Failed request with ID "
                                                + requestId
                                                + ". Caused by: "
                                                + stackTrace));
                stats.reportFailedRequest();
            } else {
                failure =
                        MessageSerializer.serializeServerFailure(
                                channelHandlerContext.alloc(),
                                new RuntimeException(
                                        "Failed incoming message. Caused by: " + stackTrace));
            }
            channelHandlerContext.writeAndFlush(failure);
        } finally {
            // Release the incoming buffer exactly once, whatever happened above.
            ReferenceCountUtil.release(obj);
        }
    }

    /**
     * Answers an exception raised in the pipeline with a server failure message and closes the
     * channel once that message has been written.
     *
     * @param channelHandlerContext context of the affected channel
     * @param th the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th)
            throws Exception {
        String errMsg =
                "Exception in server pipeline. Caused by: " + ExceptionUtils.stringifyException(th);
        ByteBuf failure =
                MessageSerializer.serializeServerFailure(
                        channelHandlerContext.alloc(), new RuntimeException(errMsg));
        LOG.debug(errMsg);
        channelHandlerContext.writeAndFlush(failure).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Processes a single request.
     *
     * @param j ID of the request
     * @param req the request to process
     * @return a future holding the response; a null response is answered as a bad request, an
     *     exceptional completion as a request failure
     */
    public abstract CompletableFuture<RESP> handleRequest(long j, REQ req);

    /**
     * Shuts down this handler.
     *
     * @return a future for the shutdown
     */
    public abstract CompletableFuture<Void> shutdown();
}
