package org.apache.flink.statefun.flink.core.functions;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.flink.core.TestUtils;
import org.apache.flink.statefun.flink.core.generated.EnvelopeAddress;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.metrics.Metrics;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/LocalStatefulFunctionGroupTest.class */
/**
 * Unit tests for {@link LocalFunctionGroup}.
 *
 * <p>The group under test is wired to hand-written fakes defined in this class: a
 * {@link FakeFunctionRepository} that always hands out the same {@link FakeFunction}, and a
 * {@link FakeContext} that forwards applied messages to that function. The tests enqueue
 * envelopes and observe the result of {@code processNextEnvelope()} and the messages recorded by
 * the fake function.
 */
public class LocalStatefulFunctionGroupTest {
    private static final FunctionType FUNCTION_TYPE = new FunctionType("test", "a");
    private static final Address FUNCTION_1_ADDR = new Address(FUNCTION_TYPE, "a-1");
    private static final Address FUNCTION_2_ADDR = new Address(FUNCTION_TYPE, "a-2");
    private static final EnvelopeAddress DUMMY_PAYLOAD = EnvelopeAddress.getDefaultInstance();

    // Declaration order matters: each initializer below uses the fields declared above it.
    private final FakeContext context = new FakeContext();
    private final FakeFunction function = new FakeFunction();
    private final FakeFunctionRepository fakeRepository = new FakeFunctionRepository(function);
    private final LocalFunctionGroup functionGroupUnderTest = new LocalFunctionGroup(fakeRepository, context);

    /**
     * Minimal {@link ApplyingContext} for tests.
     *
     * <p>It remembers the message most recently passed to {@link #apply(LiveFunction, Message)}
     * and answers {@link #self()} and {@link #caller()} from it. Every send, delayed-send,
     * cancellation and async-registration method is a no-op, and {@link #metrics()} is
     * unsupported.
     */
    static final class FakeContext implements ApplyingContext {
        /** The message currently being applied; {@code null} until the first {@code apply} call. */
        Message in;

        /** Returns the target address of the message currently being applied. */
        public Address self() {
            return in.target();
        }

        /** Returns the source address of the message currently being applied. */
        public Address caller() {
            return in.source();
        }

        /** No-op: outgoing messages are discarded. */
        public void send(Address to, Object message) {
        }

        /** No-op: egress messages are discarded. */
        public <T> void send(EgressIdentifier<T> egress, T message) {
        }

        /** No-op: delayed messages are discarded. */
        public void sendAfter(Duration delay, Address to, Object message) {
        }

        /** No-op: delayed messages are discarded and the token is ignored. */
        public void sendAfter(Duration delay, Address to, Object message, String cancellationToken) {
        }

        /** No-op: there are no delayed messages to cancel. */
        public void cancelDelayedMessage(String cancellationToken) {
        }

        /** No-op: the future is neither tracked nor completed. */
        public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> future) {
        }

        /**
         * Not supported by this fake.
         *
         * @throws UnsupportedOperationException always
         */
        public Metrics metrics() {
            throw new UnsupportedOperationException();
        }

        /**
         * Records {@code message} as the current input and delivers it to {@code liveFunction},
         * passing this context along.
         *
         * @param liveFunction the function that receives the message
         * @param message the message to deliver
         */
        public void apply(LiveFunction liveFunction, Message message) {
            // Store first so that self()/caller() resolve against this message during receive().
            in = message;
            liveFunction.receive(this, message);
        }
    }

    /** A {@link LiveFunction} that records every message it receives, in arrival order. */
    static final class FakeFunction implements LiveFunction {
        /** Messages passed to {@link #receive(Context, Message)}, oldest first. */
        final List<Message> receivedMessages = new ArrayList<>();

        /** Appends {@code message} to {@link #receivedMessages}; the context is ignored. */
        public void receive(Context context, Message message) {
            receivedMessages.add(message);
        }

        /**
         * Not supported by this fake.
         *
         * @throws UnsupportedOperationException always
         */
        public FunctionTypeMetrics metrics() {
            throw new UnsupportedOperationException();
        }
    }

    /** A {@link FunctionRepository} that returns one fixed function for every function type. */
    static final class FakeFunctionRepository implements FunctionRepository {
        private final LiveFunction function;

        /**
         * @param fakeFunction the function returned by every {@link #get(FunctionType)} call
         */
        FakeFunctionRepository(FakeFunction fakeFunction) {
            this.function = fakeFunction;
        }

        /** Returns the function given at construction time; {@code functionType} is ignored. */
        public LiveFunction get(FunctionType functionType) {
            return function;
        }
    }

    /** With nothing enqueued, {@code processNextEnvelope()} is expected to return {@code false}. */
    @Test
    public void sanity() {
        Assert.assertThat(functionGroupUnderTest.processNextEnvelope(), CoreMatchers.is(false));
    }

    /**
     * After enqueueing a single envelope, the first {@code processNextEnvelope()} call is expected
     * to return {@code true} and the following one {@code false}.
     */
    @Test
    public void addingMessageWouldBeProcessedLater() {
        Message envelope = TestUtils.ENVELOPE_FACTORY.from(FUNCTION_1_ADDR, FUNCTION_2_ADDR, DUMMY_PAYLOAD);
        functionGroupUnderTest.enqueue(envelope);

        Assert.assertThat(functionGroupUnderTest.processNextEnvelope(), CoreMatchers.is(true));
        Assert.assertThat(functionGroupUnderTest.processNextEnvelope(), CoreMatchers.is(false));
    }

    /**
     * After enqueueing and processing one envelope, the first message recorded by the fake
     * function is expected to carry the same target address as the enqueued envelope.
     */
    @Test
    public void aFunctionWouldReceiveAMessageAddressedToIt() {
        Message envelope = TestUtils.ENVELOPE_FACTORY.from(FUNCTION_1_ADDR, FUNCTION_2_ADDR, DUMMY_PAYLOAD);
        functionGroupUnderTest.enqueue(envelope);
        functionGroupUnderTest.processNextEnvelope();

        Message received = function.receivedMessages.get(0);
        Assert.assertThat(received.target(), CoreMatchers.is(envelope.target()));
    }
}
