package org.apache.flink.statefun.flink.core.nettyclient;

import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.ReadOnlyHttpHeaders;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException;
import org.apache.flink.statefun.flink.core.nettyclient.exceptions.ShutdownException;
import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/nettyclient/NettyRequest.class */
/**
 * A single {@link ToFunction} request sent through a {@link NettyClientService}, possibly over
 * several attempts.
 *
 * <p>Each attempt acquires a channel from the client, writes the request, and is finished either
 * by {@link #complete(FromFunction)} or by {@link #completeAttemptExceptionally(Throwable)}. A
 * failed attempt is retried after an exponentially growing delay for as long as the client is not
 * shut down, the request budget is not exhausted and the failure is retryable. The final outcome
 * is published through the future returned by {@link #start()} / {@link #result()}.
 */
final class NettyRequest {
    private static final Logger LOG = LoggerFactory.getLogger(NettyRequest.class);

    /** Atomic accessor for {@link #attemptChannel}; the field name must match exactly. */
    private static final AtomicReferenceFieldUpdater<NettyRequest, Channel> ATTEMPT_CHANNEL_CAS =
            AtomicReferenceFieldUpdater.newUpdater(NettyRequest.class, Channel.class, "attemptChannel");

    /** No retry is scheduled once the remaining budget is at or below this value (1 ms). */
    private static final long MIN_RETRY_BUDGET_NANOS = 1_000_000L;

    /** Backoff delay used when no attempt has been counted yet (2 ms). */
    private static final long INITIAL_BACKOFF_NANOS = 2_000_000L;

    /**
     * Upper bound for the backoff exponent. {@code INITIAL_BACKOFF_NANOS << 30} still fits
     * comfortably in a {@code long}, so the computed delay can never wrap around.
     */
    private static final int MAX_BACKOFF_SHIFT = 30;

    private final NettyClientService client;
    private final RemoteInvocationMetrics metrics;
    private final ToFunctionRequestSummary reqSummary;
    private final ToFunction toFunction;

    /** Client clock reading taken at construction; the request budget is measured from here. */
    private final long requestCreatedNanos;

    private final CompletableFuture<FromFunction> result = new CompletableFuture<>();

    /** Client clock reading taken when the current attempt started; reset to 0 between attempts. */
    private long attemptStartedNanos;

    /** Incremented every time an attempt is cleaned up, successful or not. */
    private int numberOfAttempts;

    /** Handle of the pending backoff timer, if a retry is currently scheduled. */
    @Nullable
    private Closeable retryTask;

    /** Channel held by the in-flight attempt; accessed through {@link #ATTEMPT_CHANNEL_CAS}. */
    @Nullable
    private volatile Channel attemptChannel;

    /**
     * Creates a request that has not been started yet.
     *
     * @param nettyClientService client used for channels, timers, the clock and configuration
     * @param remoteInvocationMetrics sink for latency and failure metrics
     * @param toFunctionRequestSummary summary appended to log messages
     * @param toFunction the payload to deliver
     * @throws NullPointerException if any argument is {@code null}
     */
    @OnFlinkThread
    public NettyRequest(NettyClientService nettyClientService, RemoteInvocationMetrics remoteInvocationMetrics, ToFunctionRequestSummary toFunctionRequestSummary, ToFunction toFunction) {
        this.client = Objects.requireNonNull(nettyClientService);
        this.reqSummary = Objects.requireNonNull(toFunctionRequestSummary);
        this.metrics = Objects.requireNonNull(remoteInvocationMetrics);
        this.toFunction = Objects.requireNonNull(toFunction);
        this.requestCreatedNanos = nettyClientService.systemNanoTime();
    }

    /**
     * Hands the first attempt over to the client's event loop.
     *
     * @return the future that is completed with the final outcome of this request
     */
    @OnFlinkThread
    public CompletableFuture<FromFunction> start() {
        this.client.runOnEventLoop(this::startAttempt);
        return this.result;
    }

    /**
     * Finishes the current attempt and completes the request successfully.
     *
     * @param fromFunction the reply to publish through the result future
     */
    @OnChannelThread
    public void complete(FromFunction fromFunction) {
        cleanUpAttemptQuietly();
        onFinalCompleted(fromFunction, null);
    }

    /**
     * Finishes the current attempt with a failure. Either a retry gets scheduled, or, if retry
     * handling itself throws, the request is failed with that throwable.
     *
     * @param th the reason the attempt failed
     */
    @OnClientThread
    @OnChannelThread
    public void completeAttemptExceptionally(Throwable th) {
        cleanUpAttemptQuietly();
        try {
            onAttemptCompletedExceptionally(th);
        } catch (Throwable terminal) {
            // Anything thrown by the retry decision (shutdown, timeout, non-retryable cause,
            // failed precondition) is terminal for the whole request.
            onFinalCompleted(null, terminal);
        }
    }

    /** Runs the per-attempt cleanup; a cleanup failure is logged and otherwise ignored. */
    private void cleanUpAttemptQuietly() {
        try {
            onAttemptCompleted();
        } catch (Throwable cleanupFailure) {
            LOG.warn("Attempt cleanup failed", cleanupFailure);
        }
    }

    /** Records the attempt start time and asks the client for a channel. */
    @OnClientThread
    private void startAttempt() {
        try {
            this.attemptStartedNanos = this.client.systemNanoTime();
            this.client.acquireChannel(this::onChannelAcquisitionComplete);
        } catch (Throwable th) {
            completeAttemptExceptionally(th);
        }
    }

    /**
     * Callback for channel acquisition: on success the channel is claimed for this attempt and
     * the request is written to it.
     */
    @OnChannelThread
    private void onChannelAcquisitionComplete(Channel channel, Throwable th) {
        if (th != null) {
            completeAttemptExceptionally(th);
            return;
        }
        if (ATTEMPT_CHANNEL_CAS.compareAndSet(this, null, channel)) {
            this.client.writeAndFlush(this, channel, this::onFirstWriteCompleted);
            return;
        }
        // A channel is already registered for this request, which is not supposed to happen.
        // NOTE(review): neither the freshly acquired channel nor the one already held is
        // released on this path -- confirm whether the client needs an explicit release here.
        LOG.warn("BUG: Trying to acquire a new Netty channel, while still holding an existing one. Failing this request, but continuing processing others.");
        onFinalCompleted(null, new IllegalStateException("Unexpected request state, failing this request, but will try others."));
    }

    /** Callback for the write: only a failed write needs handling here. */
    @OnChannelThread
    private void onFirstWriteCompleted(Void ignored, Throwable th) {
        if (th != null) {
            completeAttemptExceptionally(th);
        }
    }

    /**
     * Per-attempt cleanup: releases the held channel (if any), reports the attempt latency,
     * cancels a pending retry timer and counts the attempt.
     */
    @OnClientThread
    @OnChannelThread
    private void onAttemptCompleted() {
        final Channel held = ATTEMPT_CHANNEL_CAS.getAndSet(this, null);
        if (held != null) {
            this.client.releaseChannel(held);
        }
        // NOTE(review): when this runs without a preceding startAttempt() (the backoff timer
        // failing the request), attemptStartedNanos is 0 and the reported latency is measured
        // from that value rather than from a real attempt start.
        final long elapsedNanos = this.client.systemNanoTime() - this.attemptStartedNanos;
        this.attemptStartedNanos = 0L;
        this.metrics.remoteInvocationLatency(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
        IOUtils.closeQuietly(this.retryTask);
        this.retryTask = null;
        this.numberOfAttempts++;
    }

    /**
     * Decides what happens after a failed attempt and schedules the retry timer if applicable.
     *
     * @param th the failure of the attempt that just ended
     * @throws Throwable {@link ShutdownException} if the client is shut down, {@link
     *     RequestTimeoutException} if no budget is left for another attempt, the first
     *     non-retryable throwable found in the causal chain of {@code th}, or an {@link
     *     IllegalStateException} if a retry timer is unexpectedly still registered
     */
    @OnClientThread
    @OnChannelThread
    private void onAttemptCompletedExceptionally(Throwable th) throws Throwable {
        this.metrics.remoteInvocationFailures();
        // The attempt counter was already incremented by the cleanup, hence the "- 1".
        LOG.warn("Exception caught while trying to deliver a message: (attempt #{}){}", this.numberOfAttempts - 1, this.reqSummary, th);
        if (this.client.isShutdown()) {
            throw ShutdownException.INSTANCE;
        }
        final long delayNanos = delayUntilNextAttempt();
        if (delayNanos < 0) {
            throw RequestTimeoutException.INSTANCE;
        }
        analyzeCausalChain(th);
        LOG.info("Retry #{} {} ,About to sleep for {}", this.numberOfAttempts, this.reqSummary, TimeUnit.NANOSECONDS.toMillis(delayNanos));
        Preconditions.checkState(this.retryTask == null);
        this.retryTask = this.client.newTimeout(this::onAttemptBackoffTimer, delayNanos);
    }

    /** Fired when the backoff delay elapsed: re-checks budget and shutdown, then retries. */
    @OnClientThread
    private void onAttemptBackoffTimer() {
        if (delayUntilNextAttempt() < 0) {
            completeAttemptExceptionally(RequestTimeoutException.INSTANCE);
            return;
        }
        if (this.client.isShutdown()) {
            completeAttemptExceptionally(ShutdownException.INSTANCE);
            return;
        }
        startAttempt();
    }

    /** Publishes the final outcome; a non-null {@code th} takes precedence over the reply. */
    @OnClientThread
    @OnChannelThread
    private void onFinalCompleted(FromFunction fromFunction, Throwable th) {
        if (th == null) {
            this.result.complete(fromFunction);
        } else {
            this.result.completeExceptionally(th);
        }
    }

    /** @return the future carrying the final outcome of this request */
    CompletableFuture<FromFunction> result() {
        return this.result;
    }

    /**
     * @return the client's total request budget minus the time elapsed since this request was
     *     created, in nanoseconds; zero or negative once the budget is used up
     */
    public long remainingRequestBudgetNanos() {
        final long elapsedNanos = this.client.systemNanoTime() - this.requestCreatedNanos;
        return this.client.totalRequestBudgetInNanos() - elapsedNanos;
    }

    /** @return the payload of this request */
    public ToFunction toFunction() {
        return this.toFunction;
    }

    /** @return the query path reported by the client */
    public String uri() {
        return this.client.queryPath();
    }

    /**
     * Walks {@code th} and its causes and rethrows the first one that must not be retried.
     *
     * @throws Throwable the first non-retryable throwable in the chain
     */
    private void analyzeCausalChain(Throwable th) throws Throwable {
        for (Throwable current = th; current != null; current = current.getCause()) {
            if (!isRetryable(current)) {
                throw current;
            }
        }
    }

    /** Everything except shutdown and request-timeout signals is considered retryable. */
    private boolean isRetryable(Throwable th) {
        return !(th instanceof ShutdownException) && !(th instanceof RequestTimeoutException);
    }

    /**
     * Computes how long to wait before the next attempt.
     *
     * <p>The delay starts at {@link #INITIAL_BACKOFF_NANOS}, doubles with every counted attempt
     * and never exceeds the remaining request budget. The arithmetic is done in {@code long} with
     * a capped exponent: an {@code int} product would wrap to a negative value after about a
     * dozen attempts, which callers would mistake for an exhausted budget.
     *
     * @return the delay in nanoseconds, or {@code -1} if at most {@link #MIN_RETRY_BUDGET_NANOS}
     *     of budget is left and no further attempt should be made
     */
    private long delayUntilNextAttempt() {
        final long remainingBudgetNanos = remainingRequestBudgetNanos();
        if (remainingBudgetNanos <= MIN_RETRY_BUDGET_NANOS) {
            return -1L;
        }
        final int shift = Math.min(this.numberOfAttempts, MAX_BACKOFF_SHIFT);
        final long backoffNanos = INITIAL_BACKOFF_NANOS << shift;
        return Math.min(backoffNanos, remainingBudgetNanos);
    }

    /** @return the HTTP headers reported by the client */
    public ReadOnlyHttpHeaders headers() {
        return this.client.headers();
    }
}
