package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.StateFetchingIterators;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.dataflow.qual.Pure;

/* loaded from: input_file:org/apache/beam/fn/harness/state/MultimapUserState.class */
public class MultimapUserState<K, V> {
    private final Cache<?, ?> cache;
    private final BeamFnStateClient beamFnStateClient;
    private final Coder<K> mapKeyCoder;
    private final Coder<V> valueCoder;
    private final BeamFnApi.StateRequest keysStateRequest;
    private final BeamFnApi.StateRequest userStateRequest;
    private final StateFetchingIterators.CachingStateIterable<K> persistedKeys;
    private boolean isClosed;
    private boolean isCleared;
    private HashMap<Object, K> pendingRemoves = Maps.newHashMap();
    private HashMap<Object, KV<K, List<V>>> pendingAdds = Maps.newHashMap();
    private HashMap<Object, KV<K, StateFetchingIterators.CachingStateIterable<V>>> persistedValues = Maps.newHashMap();

    public MultimapUserState(Cache<?, ?> cache, BeamFnStateClient beamFnStateClient, String str, BeamFnApi.StateKey stateKey, Coder<K> coder, Coder<V> coder2) {
        Preconditions.checkArgument(stateKey.hasMultimapKeysUserState(), "Expected MultimapKeysUserState StateKey but received %s.", stateKey);
        this.cache = cache;
        this.beamFnStateClient = beamFnStateClient;
        this.mapKeyCoder = coder;
        this.valueCoder = coder2;
        this.keysStateRequest = BeamFnApi.StateRequest.newBuilder().setInstructionId(str).setStateKey(stateKey).build();
        this.persistedKeys = StateFetchingIterators.readAllAndDecodeStartingFrom(cache, beamFnStateClient, this.keysStateRequest, coder);
        BeamFnApi.StateRequest.Builder newBuilder = BeamFnApi.StateRequest.newBuilder();
        newBuilder.setInstructionId(str).getStateKeyBuilder().getMultimapUserStateBuilder().setTransformId(stateKey.getMultimapKeysUserState().getTransformId()).setUserStateId(stateKey.getMultimapKeysUserState().getUserStateId()).setWindow(stateKey.getMultimapKeysUserState().getWindow()).setKey(stateKey.getMultimapKeysUserState().getKey());
        this.userStateRequest = newBuilder.build();
    }

    public void clear() {
        Preconditions.checkState(!this.isClosed, "Multimap user state is no longer usable because it is closed for %s", this.keysStateRequest.getStateKey());
        this.isCleared = true;
        this.persistedValues = Maps.newHashMap();
        this.pendingRemoves = Maps.newHashMap();
        this.pendingAdds = Maps.newHashMap();
    }

    public PrefetchableIterable<V> get(K k) {
        Preconditions.checkState(!this.isClosed, "Multimap user state is no longer usable because it is closed for %s", this.keysStateRequest.getStateKey());
        Object structuralValue = this.mapKeyCoder.structuralValue(k);
        KV<K, List<V>> kv = this.pendingAdds.get(structuralValue);
        PrefetchableIterable<V> fromArray = kv == null ? PrefetchableIterables.fromArray(new Object[0]) : PrefetchableIterables.limit((Iterable) kv.getValue(), ((List) kv.getValue()).size());
        return (this.isCleared || this.pendingRemoves.containsKey(structuralValue)) ? fromArray : PrefetchableIterables.concat(getPersistedValues(structuralValue, k), fromArray);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public PrefetchableIterable<K> keys() {
        Preconditions.checkState(!this.isClosed, "Multimap user state is no longer usable because it is closed for %s", this.keysStateRequest.getStateKey());
        if (this.isCleared) {
            ArrayList arrayList = new ArrayList(this.pendingAdds.size());
            Iterator<Map.Entry<Object, KV<K, List<V>>>> it = this.pendingAdds.entrySet().iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().getValue().getKey());
            }
            return PrefetchableIterables.concat(arrayList);
        }
        final HashSet hashSet = new HashSet(this.pendingRemoves.keySet());
        final HashMap hashMap = new HashMap();
        for (Map.Entry<Object, KV<K, List<V>>> entry : this.pendingAdds.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue().getKey());
        }
        return new PrefetchableIterables.Default<K>() { // from class: org.apache.beam.fn.harness.state.MultimapUserState.1
            @Override // org.apache.beam.sdk.fn.stream.PrefetchableIterables.Default
            public PrefetchableIterator<K> createIterator() {
                return new PrefetchableIterator<K>() { // from class: org.apache.beam.fn.harness.state.MultimapUserState.1.1
                    PrefetchableIterator<K> persistedKeysIterator;
                    Iterator<K> pendingAddsNowIterator;
                    boolean hasNext;
                    K nextKey;

                    {
                        this.persistedKeysIterator = (PrefetchableIterator<K>) MultimapUserState.this.persistedKeys.iterator();
                    }

                    @Override // org.apache.beam.sdk.fn.stream.PrefetchableIterator
                    public boolean isReady() {
                        return this.persistedKeysIterator.isReady();
                    }

                    @Override // org.apache.beam.sdk.fn.stream.PrefetchableIterator
                    public void prefetch() {
                        if (isReady()) {
                            return;
                        }
                        this.persistedKeysIterator.prefetch();
                    }

                    @Override // java.util.Iterator
                    @Pure
                    public boolean hasNext() {
                        if (this.hasNext) {
                            return true;
                        }
                        while (this.persistedKeysIterator.hasNext()) {
                            this.nextKey = this.persistedKeysIterator.next();
                            Object structuralValue = MultimapUserState.this.mapKeyCoder.structuralValue(this.nextKey);
                            if (!hashSet.contains(structuralValue)) {
                                if (hashMap.containsKey(structuralValue)) {
                                    hashMap.remove(structuralValue);
                                }
                                this.hasNext = true;
                                return true;
                            }
                        }
                        if (this.pendingAddsNowIterator == null) {
                            this.pendingAddsNowIterator = hashMap.values().iterator();
                        }
                        if (!this.pendingAddsNowIterator.hasNext()) {
                            return false;
                        }
                        this.nextKey = this.pendingAddsNowIterator.next();
                        this.hasNext = true;
                        return true;
                    }

                    @Override // java.util.Iterator
                    public K next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException();
                        }
                        this.hasNext = false;
                        return this.nextKey;
                    }
                };
            }
        };
    }

    public void put(K k, V v) {
        Preconditions.checkState(!this.isClosed, "Multimap user state is no longer usable because it is closed for %s", this.keysStateRequest.getStateKey());
        Object structuralValue = this.mapKeyCoder.structuralValue(k);
        this.pendingAdds.putIfAbsent(structuralValue, KV.of(k, new ArrayList()));
        ((List) this.pendingAdds.get(structuralValue).getValue()).add(v);
    }

    public void remove(K k) {
        Preconditions.checkState(!this.isClosed, "Multimap user state is no longer usable because it is closed for %s", this.keysStateRequest.getStateKey());
        Object structuralValue = this.mapKeyCoder.structuralValue(k);
        this.pendingAdds.remove(structuralValue);
        if (this.isCleared) {
            return;
        }
        this.pendingRemoves.put(structuralValue, k);
    }

    public void asyncClose() throws Exception {
        Preconditions.checkState(!this.isClosed, "Multimap user state is no longer usable because it is closed for %s", this.keysStateRequest.getStateKey());
        this.isClosed = true;
        if (!this.isCleared && this.pendingRemoves.isEmpty() && this.pendingAdds.isEmpty()) {
            return;
        }
        startStateApiWrites();
        updateCache();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void startStateApiWrites() {
        if (this.isCleared) {
            this.beamFnStateClient.handle(this.keysStateRequest.toBuilder().setClear(BeamFnApi.StateClearRequest.getDefaultInstance()));
        } else if (!this.pendingRemoves.isEmpty()) {
            Iterator<K> it = this.pendingRemoves.values().iterator();
            while (it.hasNext()) {
                this.beamFnStateClient.handle(createUserStateRequest(it.next()).toBuilder().setClear(BeamFnApi.StateClearRequest.getDefaultInstance()));
            }
        }
        if (this.pendingAdds.isEmpty()) {
            return;
        }
        for (KV<K, List<V>> kv : this.pendingAdds.values()) {
            this.beamFnStateClient.handle(createUserStateRequest(kv.getKey()).toBuilder().setAppend(BeamFnApi.StateAppendRequest.newBuilder().setData(encodeValues((Iterable) kv.getValue()))));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void updateCache() {
        ArrayList arrayList = new ArrayList(this.pendingAdds.size());
        Iterator<KV<K, List<V>>> it = this.pendingAdds.values().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getKey());
        }
        if (this.isCleared) {
            this.persistedKeys.clearAndAppend(arrayList);
            for (Map.Entry<Object, KV<K, List<V>>> entry : this.pendingAdds.entrySet()) {
                getPersistedValues(entry.getKey(), entry.getValue().getKey()).clearAndAppend((List) entry.getValue().getValue());
            }
            return;
        }
        this.persistedKeys.remove(this.pendingRemoves.keySet());
        this.persistedKeys.append(arrayList);
        for (Map.Entry<Object, K> entry2 : this.pendingRemoves.entrySet()) {
            getPersistedValues(entry2.getKey(), entry2.getValue()).clearAndAppend(Collections.emptyList());
        }
        for (Map.Entry<Object, KV<K, List<V>>> entry3 : this.pendingAdds.entrySet()) {
            KV<K, StateFetchingIterators.CachingStateIterable<V>> kv = this.persistedValues.get(entry3.getKey());
            if (kv != null) {
                ((StateFetchingIterators.CachingStateIterable) kv.getValue()).append((List) entry3.getValue().getValue());
            }
        }
    }

    private ByteString encodeValues(Iterable<V> iterable) {
        try {
            ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
            Iterator<V> it = iterable.iterator();
            while (it.hasNext()) {
                this.valueCoder.encode(it.next(), byteStringOutputStream);
            }
            return byteStringOutputStream.toByteString();
        } catch (IOException e) {
            throw new IllegalStateException(String.format("Failed to encode values for multimap user state id %s.", this.keysStateRequest.getStateKey().getMultimapKeysUserState().getUserStateId()), e);
        }
    }

    private BeamFnApi.StateRequest createUserStateRequest(K k) {
        try {
            ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
            this.mapKeyCoder.encode(k, byteStringOutputStream);
            BeamFnApi.StateRequest.Builder builder = this.userStateRequest.toBuilder();
            builder.getStateKeyBuilder().getMultimapUserStateBuilder().setMapKey(byteStringOutputStream.toByteString());
            return builder.build();
        } catch (IOException e) {
            throw new IllegalStateException(String.format("Failed to encode key for multimap user state id %s.", this.keysStateRequest.getStateKey().getMultimapKeysUserState().getUserStateId()), e);
        }
    }

    private StateFetchingIterators.CachingStateIterable<V> getPersistedValues(Object obj, K k) {
        return (StateFetchingIterators.CachingStateIterable) this.persistedValues.computeIfAbsent(obj, obj2 -> {
            BeamFnApi.StateRequest createUserStateRequest = createUserStateRequest(k);
            return KV.of(k, StateFetchingIterators.readAllAndDecodeStartingFrom(Caches.subCache(this.cache, "ValuesForKey", createUserStateRequest.getStateKey().getMultimapUserState().getMapKey()), this.beamFnStateClient, createUserStateRequest, this.valueCoder));
        }).getValue();
    }
}
