/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluss.kafka;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.fluss.rpc.netty.server.RpcRequest;
import org.apache.fluss.rpc.protocol.RequestType;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.fluss.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;

/**
 * One parsed Kafka protocol request together with the state needed to answer it.
 *
 * <p>An instance bundles the decoded header and request body, the channel context it
 * arrived on, the inbound buffer (retained once by the constructor), and the future that
 * will eventually carry the response. {@link #responseBuffer()} turns the completed future
 * into a wire-format buffer and gives the retained inbound buffer back.
 *
 * <p>The {@code cancelled} flag is {@code volatile} and buffer release is guarded by an
 * atomic flag, so those two pieces of state may be touched from different threads.
 */
public class KafkaRequest
implements RpcRequest {
    /** Process-wide counter handing out a distinct id to every request instance. */
    private static final AtomicLong ID_GENERATOR = new AtomicLong(0L);

    private final ApiKeys apiKey;
    private final short apiVersion;
    private final long requestId = ID_GENERATOR.getAndIncrement();
    private final RequestHeader header;
    private final AbstractRequest request;
    /** Inbound buffer; this object owns exactly one reference taken in the constructor. */
    private final ByteBuf buffer;
    private final ChannelHandlerContext ctx;
    /** Wall-clock time ({@link System#currentTimeMillis()}) at construction. */
    private final long startTimeMs;
    private final CompletableFuture<AbstractResponse> future;
    private volatile boolean cancelled = false;
    /** Flips to {@code true} the first time the retained reference is given back. */
    private final AtomicBoolean bufferReleased = new AtomicBoolean(false);

    /**
     * Creates a request and takes one additional reference on {@code buffer}.
     *
     * @param apiKey     Kafka API key of the request
     * @param apiVersion API version reported for the request
     * @param header     decoded request header
     * @param request    decoded request body
     * @param buffer     inbound buffer; retained here and released by {@link #releaseBuffer()}
     * @param ctx        channel context, also used to allocate the response buffer
     * @param future     future that is completed with the response (or an error)
     */
    protected KafkaRequest(ApiKeys apiKey, short apiVersion, RequestHeader header, AbstractRequest request, ByteBuf buffer, ChannelHandlerContext ctx, CompletableFuture<AbstractResponse> future) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.header = header;
        this.request = request;
        this.buffer = buffer.retain();
        this.ctx = ctx;
        this.startTimeMs = System.currentTimeMillis();
        this.future = future;
    }

    /** @return always {@link RequestType#KAFKA} */
    public RequestType getRequestType() {
        return RequestType.KAFKA;
    }

    /**
     * Gives back the single reference taken on the inbound buffer by the constructor.
     *
     * <p>Idempotent: only the first call releases. Without the guard a second call (this
     * method is public and is also invoked from {@link #responseBuffer()}) would decrement
     * the reference count again and could drop a reference held by someone else.
     */
    public void releaseBuffer() {
        if (this.bufferReleased.compareAndSet(false, true)) {
            ReferenceCountUtil.safeRelease(this.buffer);
        }
    }

    /** @return the Kafka API key supplied at construction */
    public ApiKeys apiKey() {
        return this.apiKey;
    }

    /** @return the API version supplied at construction */
    public short apiVersion() {
        return this.apiVersion;
    }

    /** @return the id drawn from the process-wide counter when this instance was created */
    public long requestId() {
        return this.requestId;
    }

    /** @return the decoded request header */
    public RequestHeader header() {
        return this.header;
    }

    /**
     * Returns the decoded request body cast to the caller's expected type.
     *
     * @param <T> type the caller expects; the cast is unchecked, so a wrong {@code T}
     *            surfaces as a {@link ClassCastException} at the call site
     * @return the request body
     */
    @SuppressWarnings("unchecked")
    public <T> T request() {
        return (T) this.request;
    }

    /** @return the channel context this request arrived on */
    public ChannelHandlerContext ctx() {
        return this.ctx;
    }

    /** @return construction time in milliseconds since the epoch */
    public long startTimeMs() {
        return this.startTimeMs;
    }

    /** @return the future that carries the response for this request */
    public CompletableFuture<AbstractResponse> future() {
        return this.future;
    }

    /**
     * Completes the response future normally.
     *
     * @param response response to hand to whoever waits on {@link #future()}
     */
    public void complete(AbstractResponse response) {
        this.future.complete(response);
    }

    /**
     * Completes the response future exceptionally.
     *
     * @param t failure to propagate through {@link #future()}
     */
    public void fail(Throwable t) {
        this.future.completeExceptionally(t);
    }

    /** Marks this request as cancelled; only sets the flag, the future is left untouched. */
    public void cancel() {
        this.cancelled = true;
    }

    /** @return {@code true} once {@link #cancel()} has been called */
    public boolean cancelled() {
        return this.cancelled;
    }

    /**
     * Waits for the response future and serializes its outcome.
     *
     * <p>Blocks in {@link CompletableFuture#join()}. If joining or serializing the response
     * throws, the request's own error response for that throwable is serialized instead.
     * The retained inbound buffer is released in every case, including when the fallback
     * serialization itself throws.
     *
     * @return a newly allocated buffer holding response header plus body; the caller owns it
     */
    public ByteBuf responseBuffer() {
        try {
            return this.serialize(this.future.join());
        }
        catch (Throwable t) {
            return this.serialize(this.request.getErrorResponse(t));
        }
        finally {
            this.releaseBuffer();
        }
    }

    /**
     * Writes the response header followed by the response body into a fresh buffer.
     *
     * <p>The buffer is sized exactly from the pre-computed header and body sizes. If writing
     * fails part-way, the buffer is released before the exception propagates so that the
     * allocation is not leaked (the caller may retry with an error response).
     *
     * @param response response whose data is serialized with the request's version
     * @return buffer with writer index at the end of the serialized bytes
     */
    private ByteBuf serialize(AbstractResponse response) {
        ObjectSerializationCache cache = new ObjectSerializationCache();
        ResponseHeader responseHeader = this.header.toResponseHeader();
        short headerVersion = responseHeader.headerVersion();
        short bodyVersion = this.request.version();
        ResponseHeaderData headerData = responseHeader.data();
        ApiMessage body = response.data();
        int totalSize = headerData.size(cache, headerVersion) + body.size(cache, bodyVersion);

        ByteBuf out = this.ctx.alloc().buffer(totalSize);
        boolean written = false;
        try {
            // Expose the full capacity as readable so nioBuffer() covers the whole region
            // the Kafka writers are about to fill.
            out.writerIndex(totalSize);
            ByteBuffer target = out.nioBuffer();
            ByteBufferAccessor writable = new ByteBufferAccessor(target);
            headerData.write(writable, cache, headerVersion);
            body.write(writable, cache, bodyVersion);
            written = true;
            return out;
        }
        finally {
            if (!written) {
                ReferenceCountUtil.safeRelease(out);
            }
        }
    }
}

