package org.apache.flink.statefun.flink.core.nettyclient;

import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
import org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException;
import org.apache.flink.statefun.flink.core.nettyclient.exceptions.WrongHttpResponse;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.class */
public final class NettyRequestReplyHandler extends ChannelDuplexHandler {
    private final NettyRequestTimeoutTask requestDurationTracker = new NettyRequestTimeoutTask(this);

    @Nullable
    private NettyRequest inflightRequest;

    @Nullable
    private DefaultHttpHeaders cachedHeaders;

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelDuplexHandler, org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandler
    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        if (!(obj instanceof NettyRequest)) {
            super.write(channelHandlerContext, obj, channelPromise);
            return;
        }
        NettyRequest nettyRequest = (NettyRequest) obj;
        if (this.inflightRequest != null) {
            IllegalStateException illegalStateException = new IllegalStateException("A Channel has not finished the previous request.");
            nettyRequest.completeAttemptExceptionally(illegalStateException);
            exceptionCaught(channelHandlerContext, illegalStateException);
            return;
        }
        this.inflightRequest = nettyRequest;
        ByteBuf byteBuf = null;
        try {
            ByteBufAllocator alloc = channelHandlerContext.channel().alloc();
            alloc.getClass();
            byteBuf = NettyProtobuf.serializeProtobuf(alloc::buffer, nettyRequest.toFunction());
            writeHttpRequest(channelHandlerContext, byteBuf, nettyRequest);
            scheduleRequestTimeout(channelHandlerContext, nettyRequest.remainingRequestBudgetNanos());
        } catch (Throwable th) {
            ReferenceCountUtil.safeRelease(byteBuf);
            exceptionCaught(channelHandlerContext, th);
        }
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        FullHttpResponse fullHttpResponse = obj instanceof FullHttpResponse ? (FullHttpResponse) obj : null;
        try {
            try {
                readHttpMessage(fullHttpResponse);
                ReferenceCountUtil.release(fullHttpResponse);
            } catch (Throwable th) {
                exceptionCaught(channelHandlerContext, th);
                ReferenceCountUtil.release(fullHttpResponse);
            }
        } catch (Throwable th2) {
            ReferenceCountUtil.release(fullHttpResponse);
            throw th2;
        }
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        this.requestDurationTracker.cancel();
        if (channelHandlerContext.channel().isActive()) {
            channelHandlerContext.channel().close().addListener2(future -> {
                tryComplete(null, th);
            });
        } else {
            tryComplete(null, th);
        }
    }

    private void writeHttpRequest(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, NettyRequest nettyRequest) {
        channelHandlerContext.writeAndFlush(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, nettyRequest.uri(), byteBuf, headers(nettyRequest, byteBuf), NettyHeaders.EMPTY));
    }

    private DefaultHttpHeaders headers(NettyRequest nettyRequest, ByteBuf byteBuf) {
        DefaultHttpHeaders defaultHttpHeaders;
        if (this.cachedHeaders != null) {
            defaultHttpHeaders = this.cachedHeaders;
        } else {
            defaultHttpHeaders = new DefaultHttpHeaders(false);
            defaultHttpHeaders.add(nettyRequest.headers());
            this.cachedHeaders = defaultHttpHeaders;
        }
        defaultHttpHeaders.remove(HttpHeaderNames.CONTENT_LENGTH);
        defaultHttpHeaders.add(HttpHeaderNames.CONTENT_LENGTH, Integer.valueOf(byteBuf.readableBytes()));
        return defaultHttpHeaders;
    }

    private void readHttpMessage(FullHttpResponse fullHttpResponse) {
        Preconditions.checkState(this.inflightRequest != null, "A read without a request");
        this.requestDurationTracker.cancel();
        Preconditions.checkState(fullHttpResponse != null, "Unexpected message type");
        validateFullHttpResponse(fullHttpResponse);
        tryComplete((FromFunction) NettyProtobuf.deserializeProtobuf(fullHttpResponse.content(), FromFunction.parser()), null);
    }

    public void onReleaseToPool() {
        this.requestDurationTracker.cancel();
        this.inflightRequest = null;
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.requestDurationTracker.cancel();
        tryComplete(null, DisconnectedException.INSTANCE);
        super.channelInactive(channelHandlerContext);
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.requestDurationTracker.cancel();
        super.channelUnregistered(channelHandlerContext);
    }

    private void validateFullHttpResponse(FullHttpResponse fullHttpResponse) {
        int code = fullHttpResponse.status().code();
        if (code < 200 || code >= 300) {
            throw new WrongHttpResponse("Unexpected response code " + code + " (" + fullHttpResponse.status().reasonPhrase() + ") ");
        }
        if (!fullHttpResponse.headers().containsValue(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM, true)) {
            throw new IllegalStateException("Unexpected content type " + fullHttpResponse.headers().get(HttpHeaderNames.CONTENT_TYPE));
        }
        Preconditions.checkState(fullHttpResponse.content() != null, "Unexpected empty HTTP response (no body)");
    }

    private void scheduleRequestTimeout(ChannelHandlerContext channelHandlerContext, long j) {
        this.requestDurationTracker.schedule(channelHandlerContext, Math.max(ThreadLocalRandom.current().nextLong(7000000L, 13000000L), j));
    }

    private void tryComplete(FromFunction fromFunction, Throwable th) {
        NettyRequest nettyRequest = this.inflightRequest;
        if (nettyRequest == null) {
            return;
        }
        this.inflightRequest = null;
        if (th != null) {
            nettyRequest.completeAttemptExceptionally(th);
        } else {
            nettyRequest.complete(fromFunction);
        }
    }
}
