/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluss.kafka;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.fluss.kafka.KafkaRequest;
import org.apache.fluss.rpc.netty.server.RequestChannel;
import org.apache.fluss.rpc.netty.server.RpcRequest;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.fluss.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleState;
import org.apache.fluss.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
import org.apache.fluss.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.fluss.utils.MathUtils;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestAndSize;
import org.apache.kafka.common.requests.RequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty inbound handler that parses inbound buffers as Kafka protocol requests, hands them to a
 * {@link RequestChannel} and writes the responses back in the order the requests were received.
 *
 * <p>Every parsed request is appended to an in-flight queue. A response is only written once the
 * request at the head of the queue has completed, so a slow request delays the responses of the
 * requests queued behind it.
 *
 * <p>The handler is created with auto-release disabled, so the inbound buffer is released
 * explicitly: by this handler on failure, or through {@link KafkaRequest#releaseBuffer()} when a
 * request is dropped without a response.
 */
public class KafkaCommandDecoder extends SimpleChannelInboundHandler<ByteBuf> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommandDecoder.class);

    /** Request channels that parsed requests are dispatched to. */
    private final RequestChannel[] requestChannels;

    /** Cached length of {@link #requestChannels}. */
    private final int numChannels;

    /** Requests whose response has not been handled yet, oldest first. */
    private final ConcurrentLinkedDeque<KafkaRequest> inflightResponses = new ConcurrentLinkedDeque<>();

    /** Cleared once the connection is closed; responses are no longer written afterwards. */
    protected final AtomicBoolean isActive = new AtomicBoolean(true);

    /** Context captured in {@link #channelActive}; {@code null} until the channel became active. */
    protected volatile ChannelHandlerContext ctx;

    /** Remote address captured in {@link #channelActive}; used for logging only. */
    protected SocketAddress remoteAddress;

    /**
     * Creates a decoder that dispatches requests to the given channels.
     *
     * @param requestChannels the request channels to distribute requests over
     */
    public KafkaCommandDecoder(RequestChannel[] requestChannels) {
        // false = do not auto-release inbound buffers; they are released explicitly by this class.
        super(false);
        this.requestChannels = requestChannels;
        this.numChannels = requestChannels.length;
    }

    /**
     * Parses one inbound buffer into a {@link KafkaRequest}, queues it for in-order response
     * delivery and dispatches it to a request channel selected from the channel id hash.
     *
     * <p>Any failure is logged, completes the request future exceptionally and releases the
     * buffer; nothing is rethrown.
     *
     * @param ctx the context of the channel the buffer was read from
     * @param buffer the inbound buffer holding one request
     */
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        CompletableFuture<AbstractResponse> future = new CompletableFuture<>();
        boolean needRelease = false;
        try {
            KafkaRequest request = parseRequest(ctx, future, buffer);
            // Queue before dispatching so the response keeps its position in the response order.
            this.inflightResponses.addLast(request);
            // Responses are always flushed from the channel's own executor.
            future.whenCompleteAsync((response, error) -> this.sendResponse(ctx), ctx.executor());
            int hash = MathUtils.murmurHash(ctx.channel().id().asLongText().hashCode());
            // floorMod keeps the index within bounds even if the hash were negative.
            int channelIndex = Math.floorMod(hash, this.numChannels);
            this.requestChannels[channelIndex].putRequest((RpcRequest) request);
            // Checked after queueing so a request that races with close() is still failed.
            if (!this.isActive.get()) {
                LOG.warn("Received a request on an inactive channel: {}", this.remoteAddress);
                request.fail(new LeaderNotAvailableException("Channel is inactive"));
                // NOTE(review): the buffer is released below although the request was already
                // handed to a request channel and sendResponse() may call releaseBuffer() for it
                // as well - confirm KafkaRequest tolerates this.
                needRelease = true;
            }
        } catch (Throwable t) {
            needRelease = true;
            LOG.error("Error handling request", t);
            future.completeExceptionally(t);
        } finally {
            if (needRelease) {
                ReferenceCountUtil.release(buffer);
            }
        }
    }

    /**
     * Records the context and remote address of the new connection and marks the handler active.
     *
     * @param ctx the context of the channel that became active
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
        this.remoteAddress = ctx.channel().remoteAddress();
        this.isActive.set(true);
        LOG.info("New connection from {}", ctx.channel().remoteAddress());
    }

    /**
     * Marks the handler inactive and cancels the requests that are still in flight, so that no
     * response is written for them once the connection is gone.
     *
     * @param ctx the context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        LOG.info("Connection closed from {}", ctx.channel().remoteAddress());
        // Skipped when close() already ran (it clears the flag and cancels the requests itself).
        if (this.isActive.compareAndSet(true, false)) {
            this.cancelInflightRequests();
        }
    }

    /**
     * Closes the channel when it has been idle in both directions; every other event is forwarded
     * to the next handler in the pipeline.
     *
     * @param ctx the context of the channel the event was triggered on
     * @param evt the user event
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) {
            LOG.warn("Connection {} is idle, closing...", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        // Not ours to handle: keep the event travelling through the pipeline.
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Drains the in-flight queue from the head for as long as the head request is complete.
     *
     * <p>A completed request is answered by writing its response buffer, unless it was cancelled,
     * is a produce request with {@code acks == 0}, or the handler is no longer active; in those
     * cases only its buffer is released.
     *
     * @param ctx the context used to write the responses
     */
    private void sendResponse(ChannelHandlerContext ctx) {
        KafkaRequest head;
        while ((head = this.inflightResponses.peekFirst()) != null) {
            boolean done = head.future().isDone();
            boolean cancelled = head.cancelled();
            if (!done) {
                // The oldest request is still pending; later responses must wait for it.
                break;
            }
            this.inflightResponses.pollFirst();
            boolean fireAndForget = ApiKeys.PRODUCE.equals(head.apiKey())
                    && ((ProduceRequest) head.request()).acks() == 0;
            if (fireAndForget || cancelled || !this.isActive.get()) {
                head.releaseBuffer();
            } else {
                ctx.writeAndFlush(head.responseBuffer());
            }
        }
    }

    /**
     * Marks the handler inactive, closes the channel if one was captured and cancels all requests
     * that are still in flight.
     */
    protected void close() {
        this.isActive.set(false);
        // ctx is only assigned in channelActive, so it may still be null here.
        ChannelHandlerContext context = this.ctx;
        if (context != null) {
            context.close();
        }
        LOG.warn("Close channel {} with {} pending requests.", this.remoteAddress, this.inflightResponses.size());
        this.cancelInflightRequests();
    }

    /** Cancels every request currently in the in-flight queue; the queue itself is not modified. */
    private void cancelInflightRequests() {
        for (KafkaRequest pending : this.inflightResponses) {
            pending.cancel();
        }
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx the context of the channel the exception was raised on
     * @param cause the exception that was raised
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOG.error("Exception caught on channel {}", this.remoteAddress, cause);
        this.close();
    }

    /**
     * Parses the request header and body from the given buffer.
     *
     * <p>An {@code ApiVersions} request with an unsupported version is not parsed further; an
     * empty {@code ApiVersions} request is built for the requested version instead.
     *
     * @param ctx the context of the channel the buffer was read from
     * @param future the future that is completed with the response of the request
     * @param buffer the buffer holding the serialized request
     * @return the parsed request, retaining a reference to {@code buffer}
     */
    private static KafkaRequest parseRequest(ChannelHandlerContext ctx, CompletableFuture<AbstractResponse> future, ByteBuf buffer) {
        ByteBuffer nioBuffer = buffer.nioBuffer();
        RequestHeader header = RequestHeader.parse(nioBuffer);
        if (isUnsupportedApiVersionRequest(header)) {
            ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest.Builder(header.apiVersion()).build();
            return new KafkaRequest(ApiKeys.API_VERSIONS, header.apiVersion(), header, apiVersionsRequest, buffer, ctx, future);
        }
        RequestAndSize parsed = AbstractRequest.parseRequest(header.apiKey(), header.apiVersion(), nioBuffer);
        return new KafkaRequest(header.apiKey(), header.apiVersion(), header, parsed.request, buffer, ctx, future);
    }

    /**
     * Tells whether the header belongs to an {@code ApiVersions} request whose version is not
     * supported.
     *
     * @param header the parsed request header
     * @return {@code true} if the request is an {@code ApiVersions} request of an unsupported version
     */
    private static boolean isUnsupportedApiVersionRequest(RequestHeader header) {
        return header.apiKey() == ApiKeys.API_VERSIONS && !ApiKeys.API_VERSIONS.isVersionSupported(header.apiVersion());
    }
}

