package org.apache.flink.statefun.flink.core.functions;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.sdk.Address;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/PendingAsyncOperations.class */
/**
 * Tracks registered asynchronous operations in two tiers: an in-memory buffer grouped by
 * {@link Address}, and a {@link MapState} backing store that the buffer is written to on
 * {@link #flush()}.
 *
 * <p>An entry added through {@link #add(Address, long, Message)} lives only in memory until the
 * next {@link #flush()}. {@link #remove(Address, long)} therefore looks in memory first and falls
 * back to the backing store when the entry is not found there.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to the in-memory buffer (a plain
 * {@link HashMap}); presumably it is confined to a single thread - confirm against callers.
 */
public final class PendingAsyncOperations {
    /** Operations added since the last flush, grouped by address and keyed by their numeric id. */
    private final Map<Address, Map<Long, Message>> memoryStore = new HashMap<>();

    /** State handle that receives the buffered operations on {@link #flush()}. */
    private final MapState<Long, Message> backingStore;

    /** Invoked with an address right before that address's entries are written to the backing store. */
    private final Consumer<Address> keySetter;

    /**
     * Creates an instance whose key setter delegates to {@code state.setCurrentKey(Address)}.
     *
     * @param state the state whose current key is set before each per-address flush
     * @param mapState the backing store for flushed operations
     * @throws NullPointerException if {@code state} or {@code mapState} is {@code null}
     */
    @Inject
    PendingAsyncOperations(@Label("state") State state, @Label("async-operations") MapState<Long, Message> mapState) {
        // Evaluating a bound method reference already fails with a NullPointerException when the
        // receiver is null, so no separate null check on `state` is needed. The cast only pins the
        // overload choice to the (Consumer, MapState) constructor.
        this((Consumer<Address>) state::setCurrentKey, mapState);
    }

    /**
     * Creates an instance with an explicit key setter.
     *
     * @param consumer called with the address whose entries are about to be flushed
     * @param mapState the backing store for flushed operations
     * @throws NullPointerException if either argument is {@code null}
     */
    @VisibleForTesting
    PendingAsyncOperations(Consumer<Address> consumer, MapState<Long, Message> mapState) {
        this.backingStore = Objects.requireNonNull(mapState, "mapState");
        this.keySetter = Objects.requireNonNull(consumer, "consumer");
    }

    /**
     * Buffers an operation in memory. An existing in-memory entry with the same address and id is
     * overwritten.
     *
     * @param address the address the operation is grouped under
     * @param j the numeric id of the operation
     * @param message the message stored for that operation
     */
    public void add(Address address, long j, Message message) {
        Map<Long, Message> operations = memoryStore.computeIfAbsent(address, unused -> new HashMap<>());
        operations.put(j, message);
    }

    /**
     * Removes an operation, from memory if it is buffered there and otherwise from the backing
     * store.
     *
     * <p>NOTE(review): the backing-store path does not invoke the key setter, so the removal is
     * issued against whatever key is current at call time. Presumably the caller is expected to
     * have set the current key to {@code address} beforehand - confirm against callers.
     *
     * @param address the address the operation was added under
     * @param j the numeric id of the operation
     * @throws IllegalStateException if removing from the backing store fails
     */
    public void remove(Address address, long j) {
        Map<Long, Message> operations = memoryStore.get(address);
        if (operations == null) {
            // Nothing is buffered for this address; the entry can only be in the backing store.
            removeFromTheBackingStore(address, j);
            return;
        }
        if (operations.remove(j) == null) {
            // No (non-null) message was buffered under this id; fall back to the backing store.
            removeFromTheBackingStore(address, j);
        }
        if (operations.isEmpty()) {
            // Drop the empty per-address map so the buffer does not keep empty entries around.
            memoryStore.remove(address);
        }
    }

    /**
     * Writes every buffered operation to the backing store, address by address, and then empties
     * the in-memory buffer. If a write fails, the buffer is left uncleared and the exception
     * propagates.
     *
     * @throws IllegalStateException if writing to the backing store fails
     */
    public void flush() {
        memoryStore.forEach(this::flushState);
        memoryStore.clear();
    }

    /** Sets the key to {@code address} and copies all of its buffered operations to the backing store. */
    private void flushState(Address address, Map<Long, Message> map) {
        keySetter.accept(address);
        try {
            backingStore.putAll(map);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to persisted a previously registered asynchronous operation for " + address, e);
        }
    }

    /**
     * Removes id {@code j} from the backing store; {@code address} is used for the error message
     * only.
     */
    private void removeFromTheBackingStore(Address address, long j) {
        try {
            backingStore.remove(j);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to remove a registered asynchronous operation for " + address, e);
        }
    }
}
