package org.apache.flink.statefun.flink.core.reqreply;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.common.PolyglotUtil;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.types.Either;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.class */
public final class RequestReplyFunction implements StatefulFunction {
    public static final Logger LOG = LoggerFactory.getLogger(RequestReplyFunction.class);
    private final FunctionType functionType;
    private final RequestReplyClient client;
    private final int maxNumBatchRequests;
    private boolean isFirstRequestSent;

    @Persisted
    private final PersistedValue<Integer> requestState;

    @Persisted
    private final PersistedAppendingBuffer<ToFunction.Invocation> batch;

    @Persisted
    private final PersistedRemoteFunctionValues managedStates;

    public RequestReplyFunction(FunctionType functionType, int i, RequestReplyClient requestReplyClient) {
        this(functionType, new PersistedRemoteFunctionValues(), i, requestReplyClient, false);
    }

    @VisibleForTesting
    RequestReplyFunction(FunctionType functionType, PersistedRemoteFunctionValues persistedRemoteFunctionValues, int i, RequestReplyClient requestReplyClient, boolean z) {
        this.requestState = PersistedValue.of("request-state", Integer.class);
        this.batch = PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class);
        this.functionType = (FunctionType) Objects.requireNonNull(functionType);
        this.managedStates = (PersistedRemoteFunctionValues) Objects.requireNonNull(persistedRemoteFunctionValues);
        this.maxNumBatchRequests = i;
        this.client = (RequestReplyClient) Objects.requireNonNull(requestReplyClient);
        this.isFirstRequestSent = z;
    }

    public void invoke(Context context, Object obj) {
        InternalContext internalContext = (InternalContext) context;
        if (obj instanceof AsyncOperationResult) {
            onAsyncResult(internalContext, (AsyncOperationResult) obj);
        } else {
            onRequest(internalContext, (TypedValue) obj);
        }
    }

    private void onRequest(InternalContext internalContext, TypedValue typedValue) {
        ToFunction.Invocation.Builder singeInvocationBuilder = singeInvocationBuilder(internalContext, typedValue);
        int intValue = ((Integer) this.requestState.getOrDefault(-1)).intValue();
        if (intValue < 0) {
            this.requestState.set(0);
            sendToFunction(internalContext, singeInvocationBuilder);
            return;
        }
        this.batch.append(singeInvocationBuilder.m1138build());
        int i = intValue + 1;
        this.requestState.set(Integer.valueOf(i));
        internalContext.functionTypeMetrics().appendBacklogMessages(1);
        if (isMaxNumBatchRequestsExceeded(i)) {
            internalContext.awaitAsyncOperationComplete();
        }
    }

    private void onAsyncResult(InternalContext internalContext, AsyncOperationResult<ToFunction, FromFunction> asyncOperationResult) {
        if (asyncOperationResult.unknown()) {
            sendToFunction(internalContext, createRetryBatch((ToFunction) asyncOperationResult.metadata()));
            return;
        }
        if (asyncOperationResult.failure()) {
            throw new IllegalStateException("Failure forwarding a message to a remote function " + internalContext.self(), asyncOperationResult.throwable());
        }
        Either<FromFunction.InvocationResponse, FromFunction.IncompleteInvocationContext> unpackResponse = unpackResponse((FromFunction) asyncOperationResult.value());
        if (unpackResponse.isRight()) {
            handleIncompleteInvocationContextResponse(internalContext, (FromFunction.IncompleteInvocationContext) unpackResponse.right(), (ToFunction) asyncOperationResult.metadata());
        } else {
            handleInvocationResultResponse(internalContext, (FromFunction.InvocationResponse) unpackResponse.left());
        }
    }

    private static Either<FromFunction.InvocationResponse, FromFunction.IncompleteInvocationContext> unpackResponse(FromFunction fromFunction) {
        return fromFunction.hasIncompleteInvocationContext() ? Either.Right(fromFunction.getIncompleteInvocationContext()) : fromFunction.hasInvocationResult() ? Either.Left(fromFunction.getInvocationResult()) : Either.Left(FromFunction.InvocationResponse.getDefaultInstance());
    }

    private void handleIncompleteInvocationContextResponse(InternalContext internalContext, FromFunction.IncompleteInvocationContext incompleteInvocationContext, ToFunction toFunction) {
        this.managedStates.registerStates(incompleteInvocationContext.getMissingValuesList());
        sendToFunction(internalContext, createRetryBatch(toFunction));
    }

    private void handleInvocationResultResponse(InternalContext internalContext, FromFunction.InvocationResponse invocationResponse) {
        handleOutgoingMessages(internalContext, invocationResponse);
        handleOutgoingDelayedMessages(internalContext, invocationResponse);
        handleEgressMessages(internalContext, invocationResponse);
        this.managedStates.updateStateValues(invocationResponse.getStateMutationsList());
        int intValue = ((Integer) this.requestState.getOrDefault(-1)).intValue();
        if (intValue < 0) {
            throw new IllegalStateException("Got an unexpected async result");
        }
        if (intValue == 0) {
            this.requestState.clear();
            return;
        }
        ToFunction.InvocationBatchRequest.Builder nextBatch = getNextBatch();
        this.requestState.set(0);
        this.batch.clear();
        internalContext.functionTypeMetrics().consumeBacklogMessages(intValue);
        sendToFunction(internalContext, nextBatch);
    }

    private ToFunction.InvocationBatchRequest.Builder getNextBatch() {
        ToFunction.InvocationBatchRequest.Builder newBuilder = ToFunction.InvocationBatchRequest.newBuilder();
        newBuilder.addAllInvocations(this.batch.view());
        return newBuilder;
    }

    private ToFunction.InvocationBatchRequest.Builder createRetryBatch(ToFunction toFunction) {
        ToFunction.InvocationBatchRequest.Builder newBuilder = ToFunction.InvocationBatchRequest.newBuilder();
        newBuilder.addAllInvocations(toFunction.getInvocation().getInvocationsList());
        return newBuilder;
    }

    private void handleEgressMessages(Context context, FromFunction.InvocationResponse invocationResponse) {
        for (FromFunction.EgressMessage egressMessage : invocationResponse.getOutgoingEgressesList()) {
            context.send(new EgressIdentifier(egressMessage.getEgressNamespace(), egressMessage.getEgressType(), TypedValue.class), egressMessage.getArgument());
        }
    }

    private void handleOutgoingMessages(Context context, FromFunction.InvocationResponse invocationResponse) {
        for (FromFunction.Invocation invocation : invocationResponse.getOutgoingMessagesList()) {
            context.send(PolyglotUtil.polyglotAddressToSdkAddress(invocation.getTarget()), invocation.getArgument());
        }
    }

    private void handleOutgoingDelayedMessages(Context context, FromFunction.InvocationResponse invocationResponse) {
        for (FromFunction.DelayedInvocation delayedInvocation : invocationResponse.getDelayedInvocationsList()) {
            if (delayedInvocation.getIsCancellationRequest()) {
                handleDelayedMessageCancellation(context, delayedInvocation);
            } else {
                handleDelayedMessageSending(context, delayedInvocation);
            }
        }
    }

    private void handleDelayedMessageSending(Context context, FromFunction.DelayedInvocation delayedInvocation) {
        context.sendAfter(Duration.ofMillis(delayedInvocation.getDelayInMs()), PolyglotUtil.polyglotAddressToSdkAddress(delayedInvocation.getTarget()), delayedInvocation.getArgument());
    }

    private void handleDelayedMessageCancellation(Context context, FromFunction.DelayedInvocation delayedInvocation) {
        String cancellationToken = delayedInvocation.getCancellationToken();
        if (cancellationToken.isEmpty()) {
            throw new IllegalArgumentException("Can not handle a cancellation request without a cancellation token.");
        }
        context.cancelDelayedMessage(cancellationToken);
    }

    private static ToFunction.Invocation.Builder singeInvocationBuilder(Context context, TypedValue typedValue) {
        ToFunction.Invocation.Builder newBuilder = ToFunction.Invocation.newBuilder();
        if (context.caller() != null) {
            newBuilder.setCaller(PolyglotUtil.sdkAddressToPolyglotAddress(context.caller()));
        }
        newBuilder.setArgument(typedValue);
        return newBuilder;
    }

    private void sendToFunction(InternalContext internalContext, ToFunction.Invocation.Builder builder) {
        ToFunction.InvocationBatchRequest.Builder newBuilder = ToFunction.InvocationBatchRequest.newBuilder();
        newBuilder.addInvocations(builder);
        sendToFunction(internalContext, newBuilder);
    }

    private void sendToFunction(InternalContext internalContext, ToFunction.InvocationBatchRequest.Builder builder) {
        builder.setTarget(PolyglotUtil.sdkAddressToPolyglotAddress(internalContext.self()));
        this.managedStates.attachStateValues(builder);
        sendToFunction(internalContext, ToFunction.newBuilder().setInvocation(builder).m1091build());
    }

    private void sendToFunction(InternalContext internalContext, ToFunction toFunction) {
        CompletableFuture<FromFunction> call = this.client.call(new ToFunctionRequestSummary(internalContext.self(), toFunction.getSerializedSize(), toFunction.getInvocation().getStateCount(), toFunction.getInvocation().getInvocationsCount()), internalContext.functionTypeMetrics(), toFunction);
        if (this.isFirstRequestSent) {
            internalContext.registerAsyncOperation(toFunction, call);
            return;
        }
        LOG.info("Bootstrapping function {}. Blocking processing until first request is completed. Successive requests will be performed asynchronously.", this.functionType);
        this.isFirstRequestSent = true;
        onAsyncResult(internalContext, joinResponse(call, toFunction));
    }

    private boolean isMaxNumBatchRequestsExceeded(int i) {
        return this.maxNumBatchRequests > 0 && i >= this.maxNumBatchRequests;
    }

    private AsyncOperationResult<ToFunction, FromFunction> joinResponse(CompletableFuture<FromFunction> completableFuture, ToFunction toFunction) {
        try {
            return new AsyncOperationResult<>(toFunction, AsyncOperationResult.Status.SUCCESS, completableFuture.join(), (Throwable) null);
        } catch (Exception e) {
            return new AsyncOperationResult<>(toFunction, AsyncOperationResult.Status.FAILURE, (Object) null, e.getCause());
        }
    }
}
