package org.apache.flink.statefun.flink.core.reqreply;

import com.google.protobuf.ByteString;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.statefun.flink.core.TestUtils;
import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.common.PolyglotUtil;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.class */
public class RequestReplyFunctionTest {
    // Function type used to build the address that FakeContext.self() reports.
    private static final FunctionType FN_TYPE = new FunctionType("foo", "bar");
    // Test double that records the last request handed to it instead of calling a remote function.
    private final FakeClient client = new FakeClient();
    // Test double that records egress/delayed sends and whether awaitAsyncOperationComplete() was called.
    private final FakeContext context = new FakeContext();
    // Function under test, created with one pre-registered state named "session" and wired to the fake client.
    // NOTE(review): the literal 10 is presumably the maximum batch size -- confirm against RequestReplyFunction.
    // Must be declared after `client`, because the initializer reads this.client.
    private final RequestReplyFunction functionUnderTest = new RequestReplyFunction(testInitialRegisteredState("session", "com.foo.bar/myType"), 10, this.client);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest$BacklogTrackingMetrics.class */
    /**
     * {@link FunctionTypeMetrics} test double that keeps a running backlog count.
     *
     * <p>Only the two backlog callbacks change state; every other metrics callback is a no-op.
     */
    public static final class BacklogTrackingMetrics implements FunctionTypeMetrics {
        /** Messages appended to the backlog minus messages consumed from it. */
        private int numBacklog = 0;

        private BacklogTrackingMetrics() {}

        /** Returns the current backlog count. */
        public int numBacklog() {
            return numBacklog;
        }

        /** Adds {@code count} to the backlog count. */
        public void appendBacklogMessages(int count) {
            numBacklog = numBacklog + count;
        }

        /** Subtracts {@code count} from the backlog count. */
        public void consumeBacklogMessages(int count) {
            numBacklog = numBacklog - count;
        }

        // ---- callbacks below are intentionally ignored by this test double ----

        public void remoteInvocationFailures() {}

        public void remoteInvocationLatency(long elapsed) {}

        public void asyncOperationRegistered() {}

        public void asyncOperationCompleted() {}

        public void incomingMessage() {}

        public void outgoingRemoteMessage() {}

        public void outgoingEgressMessage() {}

        public void outgoingLocalMessage() {}

        public void blockedAddress() {}

        public void unblockedAddress() {}
    }

    /* loaded from: input_file:org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest$FakeClient.class */
    /**
     * {@link RequestReplyClient} test double.
     *
     * <p>Each {@link #call} stores the request in {@link #wasSentToFunction} and answers with an
     * already-completed future built from {@link #fromFunction}. The {@code captured*} accessors
     * read from the most recently stored request, so they must only be used after at least one
     * call has been made.
     */
    private static final class FakeClient implements RequestReplyClient {
        /** The request passed to the most recent {@link #call}; {@code null} before the first call. */
        ToFunction wasSentToFunction;
        /** Supplies the canned response; initially the default (empty) {@link FromFunction}. */
        Supplier<FromFunction> fromFunction;

        private FakeClient() {
            this.fromFunction = FromFunction::getDefaultInstance;
        }

        /**
         * Records {@code toFunction} and returns a completed future.
         *
         * @return a future completed with the supplier's value, or completed exceptionally with
         *     whatever the supplier threw
         */
        public CompletableFuture<FromFunction> call(ToFunctionRequestSummary toFunctionRequestSummary, RemoteInvocationMetrics remoteInvocationMetrics, ToFunction toFunction) {
            this.wasSentToFunction = toFunction;
            try {
                return CompletableFuture.completedFuture(this.fromFunction.get());
            } catch (Throwable th) {
                // Deliberately broad: a failing supplier is reported through the future
                // rather than thrown at the caller.
                CompletableFuture<FromFunction> completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(th);
                return completableFuture;
            }
        }

        /** Returns the {@code i}-th invocation of the last recorded request. */
        ToFunction.Invocation capturedInvocation(int i) {
            return this.wasSentToFunction.getInvocation().getInvocations(i);
        }

        /** Returns the value of the {@code i}-th state entry of the last recorded request. */
        TypedValue capturedState(int i) {
            return this.wasSentToFunction.getInvocation().getState(i).getStateValue();
        }

        /** Returns the distinct state names attached to the last recorded request. */
        Set<String> capturedStateNames() {
            // No raw (Set) cast: the stream pipeline already yields a Set<String>.
            return this.wasSentToFunction.getInvocation().getStateList().stream()
                    .map(state -> state.getStateName())
                    .collect(Collectors.toSet());
        }

        /** Returns how many invocations the last recorded request carries. */
        public int capturedInvocationBatchSize() {
            return this.wasSentToFunction.getInvocation().getInvocationsCount();
        }
    }

    /* loaded from: input_file:org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest$FakeContext.class */
    /**
     * {@link InternalContext} test double.
     *
     * <p>Records egress sends and delayed sends in lists, remembers whether
     * {@link #awaitAsyncOperationComplete()} was called, and exposes a
     * {@link BacklogTrackingMetrics} instance. Address-to-address sends and async operation
     * registrations are ignored.
     */
    private static final class FakeContext implements InternalContext {
        private final BacklogTrackingMetrics fakeMetrics;
        /** Value returned by {@link #caller()}; tests assign it directly. */
        Address caller;
        /** Set to {@code true} by {@link #awaitAsyncOperationComplete()}; tests may reset it. */
        boolean needsWaiting;
        /** One entry (egress identifier, message) per egress send, in call order. */
        List<Map.Entry<EgressIdentifier<?>, ?>> egresses;
        /** One entry (delay, message) per delayed send, in call order. */
        List<Map.Entry<Duration, ?>> delayed;

        private FakeContext() {
            this.fakeMetrics = new BacklogTrackingMetrics();
            this.egresses = new ArrayList<>();
            this.delayed = new ArrayList<>();
        }

        public void awaitAsyncOperationComplete() {
            this.needsWaiting = true;
        }

        /**
         * Returns the backlog-tracking metrics of this context.
         *
         * <p>Restores the method name that the decompiler replaced with
         * {@code m15functionTypeMetrics} when it merged the bridge method.
         */
        public BacklogTrackingMetrics functionTypeMetrics() {
            return this.fakeMetrics;
        }

        /**
         * Decompiler-generated alias of {@link #functionTypeMetrics()}, kept so that existing
         * callers of the renamed method keep compiling.
         */
        /* renamed from: functionTypeMetrics, reason: merged with bridge method [inline-methods] */
        public BacklogTrackingMetrics m15functionTypeMetrics() {
            return functionTypeMetrics();
        }

        /** Returns a fresh address made of {@code FN_TYPE} and the id {@code "0"}. */
        public Address self() {
            return new Address(RequestReplyFunctionTest.FN_TYPE, "0");
        }

        public Address caller() {
            return this.caller;
        }

        /** Ignored: address-to-address sends are not recorded. */
        public void send(Address address, Object obj) {
        }

        /** Records the egress identifier together with the message. */
        public <T> void send(EgressIdentifier<T> egressIdentifier, T t) {
            this.egresses.add(new AbstractMap.SimpleImmutableEntry<>(egressIdentifier, t));
        }

        /** Records the delay together with the message; the target address is not recorded. */
        public void sendAfter(Duration duration, Address address, Object obj) {
            this.delayed.add(new AbstractMap.SimpleImmutableEntry<>(duration, obj));
        }

        /** Ignored: async operations are not tracked. */
        public <M, T> void registerAsyncOperation(M m, CompletableFuture<T> completableFuture) {
        }
    }

    /** Smoke test: one invoked message is sent to the client as a batch of size one. */
    @Test
    public void example() {
        functionUnderTest.invoke(context, TypedValue.getDefaultInstance());

        final ToFunction sent = client.wasSentToFunction;
        Assert.assertTrue(sent.hasInvocation());
        Assert.assertThat(client.capturedInvocationBatchSize(), CoreMatchers.is(1));
    }

    /** The caller reported by the context must appear on the invocation sent to the client. */
    @Test
    public void callerIsSet() {
        context.caller = TestUtils.FUNCTION_1_ADDR;

        functionUnderTest.invoke(context, TypedValue.getDefaultInstance());

        final ToFunction.Invocation firstInvocation = client.capturedInvocation(0);
        Assert.assertThat(
                PolyglotUtil.polyglotAddressToSdkAddress(firstInvocation.getCaller()),
                CoreMatchers.is(TestUtils.FUNCTION_1_ADDR));
    }

    /** The invoked message must be forwarded unchanged as the invocation argument. */
    @Test
    public void messageIsSet() {
        final TypedValue.Builder messageBuilder = TypedValue.newBuilder();
        messageBuilder.setTypename("io.statefun.foo/bar");
        messageBuilder.setHasValue(true);
        messageBuilder.setValue(ByteString.copyFromUtf8("Hello!"));
        final TypedValue message = messageBuilder.build();

        functionUnderTest.invoke(context, message);

        final ToFunction.Invocation firstInvocation = client.capturedInvocation(0);
        Assert.assertThat(firstInvocation.getArgument(), CoreMatchers.is(message));
    }

    /**
     * Three messages followed by one successful async result: the request captured afterwards
     * must carry the two messages that arrived after the first one.
     */
    @Test
    public void batchIsAccumulatedWhileARequestIsInFlight() {
        // First message.
        functionUnderTest.invoke(context, TypedValue.getDefaultInstance());

        // Two more messages before any response is delivered.
        for (int n = 0; n < 2; n++) {
            functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
        }

        // Deliver a successful response.
        functionUnderTest.invoke(context, successfulAsyncOperation());

        Assert.assertThat(client.capturedInvocationBatchSize(), CoreMatchers.is(2));
    }

    /**
     * With a function constructed with the value 2, four messages without any response must
     * make the context wait for an async operation to complete.
     */
    @Test
    public void reachingABatchLimitTriggersBackpressure() {
        final RequestReplyFunction limitedFunction = new RequestReplyFunction(2, client);

        for (int n = 0; n < 4; n++) {
            limitedFunction.invoke(context, TypedValue.getDefaultInstance());
        }

        Assert.assertThat(context.needsWaiting, CoreMatchers.is(true));
    }

    /**
     * After four unanswered messages, a successful response followed by one more message must
     * not request waiting again.
     */
    @Test
    public void returnedMessageReleaseBackpressure() {
        final RequestReplyFunction limitedFunction = new RequestReplyFunction(2, client);

        // Same four unanswered messages as in reachingABatchLimitTriggersBackpressure.
        for (int n = 0; n < 4; n++) {
            limitedFunction.invoke(context, TypedValue.getDefaultInstance());
        }

        // Clear the flag so the assertion only observes what happens from here on.
        context.needsWaiting = false;

        limitedFunction.invoke(context, successfulAsyncOperation());
        limitedFunction.invoke(context, TypedValue.getDefaultInstance());

        Assert.assertThat(context.needsWaiting, CoreMatchers.is(false));
    }

    /**
     * A MODIFY mutation for state "session" returned in a response must be visible in the state
     * attached to the next request.
     */
    @Test
    public void stateIsModified() {
        // Send a first message.
        functionUnderTest.invoke(context, TypedValue.getDefaultInstance());

        // Response that modifies the "session" state to "hello".
        final TypedValue newStateValue =
                TypedValue.newBuilder()
                        .setTypename("com.foo.bar/myType")
                        .setHasValue(true)
                        .setValue(ByteString.copyFromUtf8("hello"))
                        .build();
        final FromFunction.PersistedValueMutation mutation =
                FromFunction.PersistedValueMutation.newBuilder()
                        .setStateValue(newStateValue)
                        .setMutationType(FromFunction.PersistedValueMutation.MutationType.MODIFY)
                        .setStateName("session")
                        .build();
        final FromFunction.InvocationResponse invocationResult =
                FromFunction.InvocationResponse.newBuilder().addStateMutations(mutation).build();
        final FromFunction response =
                FromFunction.newBuilder().setInvocationResult(invocationResult).build();
        functionUnderTest.invoke(context, successfulAsyncOperation(response));

        // A further message produces a new request whose state is inspected below.
        functionUnderTest.invoke(context, TypedValue.getDefaultInstance());

        final ByteString expected = ByteString.copyFromUtf8("hello");
        Assert.assertThat(client.capturedState(0).getValue(), CoreMatchers.is(expected));
    }

    /** A delayed invocation in the response must be handed to the context with its delay. */
    @Test
    public void delayedMessages() {
        functionUnderTest.invoke(context, TypedValue.getDefaultInstance());

        final FromFunction.DelayedInvocation delayedInvocation =
                FromFunction.DelayedInvocation.newBuilder()
                        .setArgument(TypedValue.getDefaultInstance())
                        .setDelayInMs(1L)
                        .build();
        final FromFunction.InvocationResponse invocationResult =
                FromFunction.InvocationResponse.newBuilder()
                        .addDelayedInvocations(delayedInvocation)
                        .build();
        final FromFunction response =
                FromFunction.newBuilder().setInvocationResult(invocationResult).build();
        functionUnderTest.invoke(context, successfulAsyncOperation(response));

        Assert.assertFalse(context.delayed.isEmpty());
        Assert.assertEquals(Duration.ofMillis(1L), context.delayed.get(0).getKey());
    }

    /**
     * An egress message in the response must be forwarded to the context under an
     * {@link EgressIdentifier} built from the message's namespace and type.
     */
    @Test
    public void egressIsSent() {
        this.functionUnderTest.invoke(this.context, TypedValue.getDefaultInstance());

        FromFunction response = FromFunction.newBuilder()
                .setInvocationResult(FromFunction.InvocationResponse.newBuilder()
                        .addOutgoingEgresses(FromFunction.EgressMessage.newBuilder()
                                .setArgument(TypedValue.getDefaultInstance())
                                .setEgressNamespace("org.foo")
                                .setEgressType("bar")))
                .build();
        this.functionUnderTest.invoke(this.context, successfulAsyncOperation(response));

        Assert.assertFalse(this.context.egresses.isEmpty());
        // Parameterized instead of the raw EgressIdentifier the decompiler emitted.
        EgressIdentifier<TypedValue> expected = new EgressIdentifier<>("org.foo", "bar", TypedValue.class);
        Assert.assertEquals(expected, this.context.egresses.get(0).getKey());
    }

    /**
     * When the response reports a missing state value ("new-state"), the same message must be
     * sent again, this time with both "session" and "new-state" attached.
     */
    @Test
    public void retryBatchOnIncompleteInvocationContextResponse() {
        final TypedValue argument =
                TypedValue.newBuilder()
                        .setTypename("io.statefun.foo/bar")
                        .setValue(ByteString.copyFromUtf8("Hello!"))
                        .build();
        functionUnderTest.invoke(context, argument);

        // Response declaring that the state "new-state" was missing from the request.
        final FromFunction.ExpirationSpec expiration =
                FromFunction.ExpirationSpec.newBuilder()
                        .setMode(FromFunction.ExpirationSpec.ExpireMode.AFTER_INVOKE)
                        .setExpireAfterMillis(5000L)
                        .build();
        final FromFunction.PersistedValueSpec missingState =
                FromFunction.PersistedValueSpec.newBuilder()
                        .setStateName("new-state")
                        .setExpirationSpec(expiration)
                        .build();
        final FromFunction.IncompleteInvocationContext incompleteContext =
                FromFunction.IncompleteInvocationContext.newBuilder()
                        .addMissingValues(missingState)
                        .build();
        final FromFunction response =
                FromFunction.newBuilder().setIncompleteInvocationContext(incompleteContext).build();

        // The original request is passed along as the async operation's metadata.
        final ToFunction originalRequest = client.wasSentToFunction;
        functionUnderTest.invoke(context, successfulAsyncOperation(originalRequest, response));

        Assert.assertTrue(client.wasSentToFunction.hasInvocation());
        Assert.assertThat(client.capturedInvocationBatchSize(), CoreMatchers.is(1));
        Assert.assertThat(client.capturedInvocation(0).getArgument(), CoreMatchers.is(argument));
        Assert.assertThat(client.capturedStateNames().size(), CoreMatchers.is(2));
        Assert.assertThat(client.capturedStateNames(), CoreMatchers.hasItems("session", "new-state"));
    }

    /** Of three messages invoked without a response, two must be counted as backlog. */
    @Test
    public void backlogMetricsIncreasedOnInvoke() {
        for (int n = 0; n < 3; n++) {
            functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
        }

        final int backlog = context.m15functionTypeMetrics().numBacklog();
        Assert.assertThat(backlog, CoreMatchers.is(2));
    }

    /** A successful response after three messages must bring the backlog count back to zero. */
    @Test
    public void backlogMetricsDecreasedOnNextSuccess() {
        for (int n = 0; n < 3; n++) {
            functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
        }

        context.needsWaiting = false;
        functionUnderTest.invoke(context, successfulAsyncOperation());

        final int backlog = context.m15functionTypeMetrics().numBacklog();
        Assert.assertThat(backlog, CoreMatchers.is(0));
    }

    /**
     * A fresh function instance that receives an UNKNOWN async result carrying the original
     * request must send that request's single invocation again, with no state attached.
     */
    @Test
    public void retryBatchOnUnkownAsyncResponseAfterRestore() {
        final TypedValue argument =
                TypedValue.newBuilder()
                        .setTypename("io.statefun.foo/bar")
                        .setValue(ByteString.copyFromUtf8("Hello!"))
                        .build();
        functionUnderTest.invoke(context, argument);

        // A second instance that never saw the original message.
        final RequestReplyFunction freshFunction = new RequestReplyFunction(2, client);
        final ToFunction originalRequest = client.wasSentToFunction;
        freshFunction.invoke(context, unknownAsyncOperation(originalRequest));

        Assert.assertTrue(client.wasSentToFunction.hasInvocation());
        Assert.assertThat(client.capturedInvocationBatchSize(), CoreMatchers.is(1));
        Assert.assertThat(client.capturedInvocation(0).getArgument(), CoreMatchers.is(argument));
        Assert.assertThat(client.capturedStateNames().size(), CoreMatchers.is(0));
    }

    /**
     * Creates a {@link PersistedRemoteFunctionValues} with exactly one registered state.
     *
     * @param str the state name
     * @param str2 the typename of the state's type
     * @return the values holder with that single state spec registered
     */
    private static PersistedRemoteFunctionValues testInitialRegisteredState(String str, String str2) {
        final FromFunction.PersistedValueSpec.Builder specBuilder = FromFunction.PersistedValueSpec.newBuilder();
        specBuilder.setTypeTypename(str2);
        specBuilder.setStateName(str);

        final PersistedRemoteFunctionValues values = new PersistedRemoteFunctionValues();
        values.registerStates(Collections.singletonList(specBuilder.build()));
        return values;
    }

    /**
     * Builds a SUCCESS async result whose value is the default {@link FromFunction} and whose
     * metadata is a fresh {@code Object}.
     */
    private static AsyncOperationResult<Object, FromFunction> successfulAsyncOperation() {
        final Object metadata = new Object();
        final FromFunction emptyResponse = FromFunction.getDefaultInstance();
        final Throwable noFailure = null;
        return new AsyncOperationResult<>(metadata, AsyncOperationResult.Status.SUCCESS, emptyResponse, noFailure);
    }

    /**
     * Builds a SUCCESS async result carrying the given response, with a fresh {@code Object} as
     * metadata.
     *
     * @param fromFunction the response to wrap
     */
    private static AsyncOperationResult<Object, FromFunction> successfulAsyncOperation(FromFunction fromFunction) {
        final Object metadata = new Object();
        final Throwable noFailure = null;
        return new AsyncOperationResult<>(metadata, AsyncOperationResult.Status.SUCCESS, fromFunction, noFailure);
    }

    /**
     * Builds a SUCCESS async result carrying the given response, with the given request as
     * metadata.
     *
     * @param toFunction the request used as the operation's metadata
     * @param fromFunction the response to wrap
     */
    private static AsyncOperationResult<ToFunction, FromFunction> successfulAsyncOperation(ToFunction toFunction, FromFunction fromFunction) {
        final AsyncOperationResult.Status status = AsyncOperationResult.Status.SUCCESS;
        final Throwable noFailure = null;
        return new AsyncOperationResult<>(toFunction, status, fromFunction, noFailure);
    }

    /**
     * Builds an UNKNOWN async result with the given request as metadata and the default
     * {@link FromFunction} as value.
     *
     * @param toFunction the request used as the operation's metadata
     */
    private static AsyncOperationResult<ToFunction, FromFunction> unknownAsyncOperation(ToFunction toFunction) {
        final AsyncOperationResult.Status status = AsyncOperationResult.Status.UNKNOWN;
        final FromFunction emptyResponse = FromFunction.getDefaultInstance();
        final Throwable noFailure = null;
        return new AsyncOperationResult<>(toFunction, status, emptyResponse, noFailure);
    }
}
