package org.apache.flink.statefun.flink.core.functions;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.statefun.flink.core.TestUtils;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.sdk.Address;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/PendingAsyncOperationsTest.class */
public class PendingAsyncOperationsTest {
    // In-memory stand-in for the state used by PendingAsyncOperations. The tests pass this same
    // instance to the PendingAsyncOperations constructor twice: once as the MapState and once as
    // the Consumer<Address> (MemoryMapState implements both interfaces).
    private final MemoryMapState<Long, Message> miniStateBackend = new MemoryMapState<>();
    // Placeholder message registered with every pending operation; the tests only assert on the
    // keys that reach the state, never on this value.
    private final Message dummyMessage = TestUtils.ENVELOPE_FACTORY.from(1);

    /**
     * In-memory {@link MapState} test double that keeps one independent map per {@link Address}.
     *
     * <p>Every state operation is applied to the map that belongs to the address most recently
     * supplied through {@link #accept(Address)} or {@link #setCurrentAddress(Address)}. The
     * per-address maps are created lazily on first access.
     *
     * @param <K> key type of the per-address maps
     * @param <V> value type of the per-address maps
     */
    public static final class MemoryMapState<K, V> implements MapState<K, V>, Consumer<Address> {
        /** Backing maps, one per address; read directly by the matcher in the enclosing test. */
        final Map<Address, Map<K, V>> states = new HashMap<>();

        /** Address whose map is currently targeted; {@code null} until one has been set. */
        Address address;

        private MemoryMapState() {}

        /**
         * Selects {@code address} as the current address; equivalent to
         * {@link #setCurrentAddress(Address)}.
         */
        @Override // java.util.function.Consumer
        public void accept(Address address) {
            setCurrentAddress(address);
        }

        /** Selects the address whose map all subsequent state operations read and write. */
        public void setCurrentAddress(Address address) {
            this.address = address;
        }

        /**
         * Returns the map of the current address, creating an empty one if none exists yet.
         *
         * @throws AssertionError if assertions are enabled and no address has been set
         */
        public Map<K, V> perCurrentAddressState() {
            assert address != null : "no current address has been set";
            return states.computeIfAbsent(address, unused -> new HashMap<>());
        }

        /** Returns the value stored under {@code k} for the current address, or {@code null}. */
        public V get(K k) {
            return perCurrentAddressState().get(k);
        }

        /** Stores {@code v} under {@code k} for the current address. */
        public void put(K k, V v) {
            perCurrentAddressState().put(k, v);
        }

        /** Copies every entry of {@code map} into the current address' map. */
        public void putAll(Map<K, V> map) {
            perCurrentAddressState().putAll(map);
        }

        /** Removes the entry for {@code k} from the current address' map, if present. */
        public void remove(K k) {
            perCurrentAddressState().remove(k);
        }

        /** Returns whether the current address' map has an entry for {@code k}. */
        public boolean contains(K k) {
            return perCurrentAddressState().containsKey(k);
        }

        /** Returns a live view of the entries of the current address' map. */
        public Iterable<Map.Entry<K, V>> entries() {
            return perCurrentAddressState().entrySet();
        }

        /** Returns a live view of the keys of the current address' map. */
        public Iterable<K> keys() {
            return perCurrentAddressState().keySet();
        }

        /** Returns a live view of the values of the current address' map. */
        public Iterable<V> values() {
            return perCurrentAddressState().values();
        }

        /** Returns an iterator over the entries of the current address' map. */
        public Iterator<Map.Entry<K, V>> iterator() {
            return perCurrentAddressState().entrySet().iterator();
        }

        /** Returns whether the current address' map holds no entries. */
        public boolean isEmpty() {
            return perCurrentAddressState().isEmpty();
        }

        /** Removes all entries of the current address' map; other addresses are untouched. */
        public void clear() {
            perCurrentAddressState().clear();
        }
    }

    /**
     * Happy path: an operation that was added and then flushed shows up in the state of the
     * address it was registered for.
     */
    @Test
    public void exampleUsage() {
        final PendingAsyncOperations operations = new PendingAsyncOperations(miniStateBackend, miniStateBackend);
        final Address target = TestUtils.FUNCTION_1_ADDR;
        miniStateBackend.setCurrentAddress(target);

        operations.add(target, 1L, dummyMessage);
        operations.flush();

        Assert.assertThat(miniStateBackend, matchesAddressState(target, Matchers.hasKey(1L)));
    }

    /**
     * Adding an operation without calling {@code flush()} must leave the state of the address
     * without the corresponding key.
     */
    @Test
    public void itemsAreExplicitlyFlushed() {
        final PendingAsyncOperations operations = new PendingAsyncOperations(miniStateBackend, miniStateBackend);
        final Address target = TestUtils.FUNCTION_1_ADDR;
        miniStateBackend.setCurrentAddress(target);

        // Deliberately no flush() after the add.
        operations.add(target, 1L, dummyMessage);

        Assert.assertThat(miniStateBackend, CoreMatchers.not(matchesAddressState(target, Matchers.hasKey(1L))));
    }

    /**
     * An operation that is added and removed again before {@code flush()} must not appear in the
     * state of its address after the flush.
     */
    @Test
    public void inFlightItemsDoNotFlush() {
        final PendingAsyncOperations operations = new PendingAsyncOperations(miniStateBackend, miniStateBackend);
        final Address target = TestUtils.FUNCTION_1_ADDR;
        miniStateBackend.setCurrentAddress(target);

        operations.add(target, 1L, dummyMessage);
        operations.remove(target, 1L);
        operations.flush();

        Assert.assertThat(miniStateBackend, CoreMatchers.not(matchesAddressState(target, Matchers.hasKey(1L))));
    }

    /**
     * Operations registered under the same key for two different addresses must, after a single
     * {@code flush()}, each be present in the state of their own address.
     */
    @Test
    public void differentAddressesShouldBeFlushedToTheirStates() {
        final PendingAsyncOperations operations = new PendingAsyncOperations(miniStateBackend, miniStateBackend);

        miniStateBackend.setCurrentAddress(TestUtils.FUNCTION_1_ADDR);
        operations.add(TestUtils.FUNCTION_1_ADDR, 1L, dummyMessage);

        miniStateBackend.setCurrentAddress(TestUtils.FUNCTION_2_ADDR);
        operations.add(TestUtils.FUNCTION_2_ADDR, 1L, dummyMessage);

        operations.flush();

        final Matcher<MemoryMapState<Long, Message>> firstHasKey =
                matchesAddressState(TestUtils.FUNCTION_1_ADDR, Matchers.hasKey(1L));
        final Matcher<MemoryMapState<Long, Message>> secondHasKey =
                matchesAddressState(TestUtils.FUNCTION_2_ADDR, Matchers.hasKey(1L));
        Assert.assertThat(miniStateBackend, CoreMatchers.allOf(firstHasKey, secondHasKey));
    }

    /**
     * Builds a matcher that applies {@code matcher} to the map that a {@link MemoryMapState}
     * holds for {@code address}.
     *
     * <p>The map is looked up directly in {@code states}, so an address that was never accessed
     * yields {@code null}, which is handed to {@code matcher} as is.
     *
     * @param address address whose per-address map is inspected
     * @param matcher matcher applied to that map
     * @return a matcher over the whole {@link MemoryMapState}
     */
    private static <K, V, M> Matcher<MemoryMapState<K, V>> matchesAddressState(final Address address, final Matcher<M> matcher) {
        return new TypeSafeMatcher<MemoryMapState<K, V>>() {
            public boolean matchesSafely(MemoryMapState<K, V> memoryMapState) {
                return matcher.matches(memoryMapState.states.get(address));
            }

            // Name the address so that combined matchers over several addresses stay
            // distinguishable in a failure message.
            public void describeTo(Description description) {
                description.appendText("state of address ").appendValue(address).appendText(" is ");
                matcher.describeTo(description);
            }

            // Report the actual per-address map instead of the default toString() of the state.
            protected void describeMismatchSafely(MemoryMapState<K, V> memoryMapState, Description mismatchDescription) {
                mismatchDescription
                        .appendText("state of address ")
                        .appendValue(address)
                        .appendText(" was ")
                        .appendValue(memoryMapState.states.get(address));
            }
        };
    }
}
