package org.apache.flink.statefun.flink.core.nettyclient;

import java.io.Closeable;
import java.net.URI;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoop;
import org.apache.flink.shaded.netty4.io.netty.channel.pool.ChannelHealthChecker;
import org.apache.flink.shaded.netty4.io.netty.channel.pool.FixedChannelPool;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFuture;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/nettyclient/NettyClient.class */
final class NettyClient implements RequestReplyClient, NettyClientService {
    private final NettySharedResources shared;
    private final FixedChannelPool pool;
    private final Endpoint endpoint;
    private final ReadOnlyHttpHeaders headers;
    private final long totalRequestBudgetInNanos;
    private final EventLoop eventLoop;

    public static NettyClient from(NettySharedResources nettySharedResources, NettyRequestReplySpec nettyRequestReplySpec, URI uri) {
        Endpoint endpoint = new Endpoint(uri);
        long nanos = nettyRequestReplySpec.callTimeout.toNanos();
        ReadOnlyHttpHeaders defaultHeadersFor = NettyHeaders.defaultHeadersFor(endpoint.serviceAddress());
        Bootstrap mo563clone = nettySharedResources.bootstrap().mo563clone();
        mo563clone.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf((int) nettyRequestReplySpec.connectTimeout.toMillis()));
        mo563clone.option(ChannelOption.SO_KEEPALIVE, true);
        mo563clone.remoteAddress(endpoint.serviceAddress());
        FixedChannelPool fixedChannelPool = new FixedChannelPool(mo563clone, new HttpConnectionPoolManager(endpoint.useTls() ? nettySharedResources.sslContext() : null, nettyRequestReplySpec, endpoint.serviceAddress().getHostString(), endpoint.serviceAddress().getPort()), ChannelHealthChecker.ACTIVE, FixedChannelPool.AcquireTimeoutAction.FAIL, nettyRequestReplySpec.connectTimeout.toMillis(), nettyRequestReplySpec.connectionPoolMaxSize, Integer.MAX_VALUE, true, true);
        fixedChannelPool.getClass();
        nettySharedResources.registerClosable(fixedChannelPool::closeAsync);
        return new NettyClient(nettySharedResources, mo563clone.config2().group().next(), fixedChannelPool, endpoint, defaultHeadersFor, nanos);
    }

    private NettyClient(NettySharedResources nettySharedResources, EventLoop eventLoop, FixedChannelPool fixedChannelPool, Endpoint endpoint, ReadOnlyHttpHeaders readOnlyHttpHeaders, long j) {
        this.shared = (NettySharedResources) Objects.requireNonNull(nettySharedResources);
        this.eventLoop = (EventLoop) Objects.requireNonNull(eventLoop);
        this.pool = (FixedChannelPool) Objects.requireNonNull(fixedChannelPool);
        this.endpoint = (Endpoint) Objects.requireNonNull(endpoint);
        this.headers = (ReadOnlyHttpHeaders) Objects.requireNonNull(readOnlyHttpHeaders);
        this.totalRequestBudgetInNanos = j;
    }

    @Override // org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient
    public CompletableFuture<FromFunction> call(ToFunctionRequestSummary toFunctionRequestSummary, RemoteInvocationMetrics remoteInvocationMetrics, ToFunction toFunction) {
        return new NettyRequest(this, remoteInvocationMetrics, toFunctionRequestSummary, toFunction).start();
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public void acquireChannel(BiConsumer<Channel, Throwable> biConsumer) {
        this.pool.acquire().addListener2(future -> {
            Throwable cause = future.cause();
            if (cause != null) {
                biConsumer.accept(null, cause);
            } else {
                biConsumer.accept((Channel) future.getNow(), null);
            }
        });
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public void releaseChannel(Channel channel) {
        EventLoop eventLoop = channel.eventLoop();
        if (eventLoop.inEventLoop()) {
            releaseChannel0(channel);
        } else {
            eventLoop.execute(() -> {
                releaseChannel0(channel);
            });
        }
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public String queryPath() {
        return this.endpoint.queryPath();
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public ReadOnlyHttpHeaders headers() {
        return this.headers;
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public long totalRequestBudgetInNanos() {
        return this.totalRequestBudgetInNanos;
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public Closeable newTimeout(Runnable runnable, long j) {
        ScheduledFuture<?> schedule = this.eventLoop.schedule(runnable, j, TimeUnit.NANOSECONDS);
        return () -> {
            schedule.cancel(false);
        };
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public void runOnEventLoop(Runnable runnable) {
        Objects.requireNonNull(runnable);
        if (this.eventLoop.inEventLoop()) {
            runnable.run();
        } else {
            this.eventLoop.execute(runnable);
        }
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public boolean isShutdown() {
        return this.shared.isShutdown();
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public long systemNanoTime() {
        return System.nanoTime();
    }

    @Override // org.apache.flink.statefun.flink.core.nettyclient.NettyClientService
    public <T> void writeAndFlush(T t, Channel channel, BiConsumer<Void, Throwable> biConsumer) {
        channel.writeAndFlush(t).addListener2(future -> {
            biConsumer.accept(null, future.cause());
        });
    }

    private void releaseChannel0(Channel channel) {
        if (!channel.isActive()) {
            this.pool.release(channel);
        } else if (channel.attr(ChannelAttributes.EXPIRED).get() != Boolean.TRUE) {
            this.pool.release(channel);
        } else {
            channel.close().addListener2(future -> {
                this.pool.release(channel);
            });
        }
    }
}
