package org.apache.flink.statefun.flink.core.functions;

import java.util.HashMap;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.sdk.Address;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/PendingAsyncOperations.class */
/**
 * Tracks registered asynchronous operations in two tiers: an in-memory map that receives every
 * newly added operation, and a {@link MapState} backing store into which the in-memory entries are
 * moved by {@link #flush()}.
 *
 * <p>The backing store is indexed by future id only; before each write this class invokes the
 * supplied key setter with the owning address (presumably so that the state is scoped to that
 * address -- confirm against the {@code State} implementation).
 */
final class PendingAsyncOperations {
    /** Initial capacity of the in-memory store (32768 entries). */
    private static final int INITIAL_MEMORY_STORE_CAPACITY = 32 * 1024;

    /** Operations added since the last {@link #flush()}, keyed by (owning address, future id). */
    private final HashMap<Key, Message> memoryStore = new HashMap<>(INITIAL_MEMORY_STORE_CAPACITY);

    /** State handle that receives the flushed operations, keyed by future id. */
    private final MapState<Long, Message> backingStore;

    /** Invoked with the owning address right before an entry is written to the backing store. */
    private final Consumer<Address> keySetter;

    /** Immutable composite key (owning address, future id) of the in-memory store. */
    public static final class Key {
        private final Address owningAddress;
        private final long futureId;
        /** Hash code, computed once at construction time since the key is immutable. */
        private final int hash;

        /**
         * @param address the address that owns the asynchronous operation; must not be null
         * @param futureId the id of the asynchronous operation
         * @throws NullPointerException if {@code address} is null
         */
        public Key(Address address, long futureId) {
            this.owningAddress = Objects.requireNonNull(address);
            this.futureId = futureId;
            this.hash = (37 * address.hashCode()) + Long.hashCode(futureId);
        }

        @Override
        public int hashCode() {
            return this.hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Key other = (Key) obj;
            // compare the primitive id first, it is cheaper than the address comparison
            return this.futureId == other.futureId && this.owningAddress.equals(other.owningAddress);
        }
    }

    /**
     * Creates an instance whose key setter is {@code state::setCurrentKey}.
     *
     * @param state the state whose current key is set before each write to the backing store
     * @param backingStore the state handle that receives flushed operations
     * @throws NullPointerException if either argument is null
     */
    @Inject
    PendingAsyncOperations(@Label("state") State state, @Label("async-operations") MapState<Long, Message> backingStore) {
        // Evaluating the bound method reference already throws a NullPointerException when
        // state is null, so no additional null check is required here.
        this((Consumer<Address>) state::setCurrentKey, backingStore);
    }

    /**
     * @param keySetter invoked with the owning address before each write to the backing store
     * @param backingStore the state handle that receives flushed operations
     * @throws NullPointerException if either argument is null
     */
    @VisibleForTesting
    PendingAsyncOperations(Consumer<Address> keySetter, MapState<Long, Message> backingStore) {
        this.backingStore = Objects.requireNonNull(backingStore);
        this.keySetter = Objects.requireNonNull(keySetter);
    }

    /**
     * Registers an asynchronous operation in the in-memory store, replacing any message previously
     * registered under the same address and future id.
     *
     * @param address the address that owns the operation; must not be null
     * @param futureId the id of the operation
     * @param message the message associated with the operation
     */
    public void add(Address address, long futureId, Message message) {
        this.memoryStore.put(new Key(address, futureId), message);
    }

    /**
     * Removes a previously registered operation. The in-memory store is consulted first; only if
     * it holds no (non-null) message for the operation is the future id removed from the backing
     * store.
     *
     * <p>NOTE(review): unlike {@link #flush()}, this method does not invoke the key setter before
     * touching the backing store, so it presumably expects the caller to have already set the
     * current key to {@code address} -- confirm against the call sites.
     *
     * @param address the address that owns the operation; must not be null
     * @param futureId the id of the operation
     * @throws IllegalStateException if the backing store fails to remove the entry
     */
    public void remove(Address address, long futureId) {
        if (removeFromMemoryStore(address, futureId)) {
            return;
        }
        removeFromBackingStore(address, futureId);
    }

    /**
     * Writes every in-memory operation to the backing store and then empties the in-memory store.
     * If a write fails, the exception propagates and the in-memory store is left uncleared.
     *
     * @throws IllegalStateException if the backing store fails to store an entry
     */
    public void flush() {
        this.memoryStore.forEach(this::flushState);
        this.memoryStore.clear();
    }

    /** Returns true if the in-memory store held a non-null message for the given operation. */
    private boolean removeFromMemoryStore(Address address, long futureId) {
        return this.memoryStore.remove(new Key(address, futureId)) != null;
    }

    /** Removes the future id from the backing store; the address is only used for the error message. */
    private void removeFromBackingStore(Address address, long futureId) {
        try {
            this.backingStore.remove(Long.valueOf(futureId));
        } catch (Exception e) {
            throw new IllegalStateException("Unable to remove a registered asynchronous operation for " + address, e);
        }
    }

    /** Sets the current key to the owning address and writes one entry to the backing store. */
    private void flushState(Key key, Message message) {
        this.keySetter.accept(key.owningAddress);
        try {
            this.backingStore.put(Long.valueOf(key.futureId), message);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to persisted a previously registered asynchronous operation for " + key.owningAddress, e);
        }
    }
}
