package io.apicurio.registry.utils.streams.distore;

import com.google.protobuf.ByteString;
import io.apicurio.registry.utils.ProtoUtil;
import io.apicurio.registry.utils.streams.distore.KeyValueSerde;
import io.apicurio.registry.utils.streams.distore.proto.FilterReq;
import io.apicurio.registry.utils.streams.distore.proto.Key;
import io.apicurio.registry.utils.streams.distore.proto.KeyFromKeyToReq;
import io.apicurio.registry.utils.streams.distore.proto.KeyReq;
import io.apicurio.registry.utils.streams.distore.proto.KeyValue;
import io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc;
import io.apicurio.registry.utils.streams.distore.proto.Size;
import io.apicurio.registry.utils.streams.distore.proto.Value;
import io.apicurio.registry.utils.streams.distore.proto.VoidReq;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

/* loaded from: input_file:io/apicurio/registry/utils/streams/distore/KeyValueStoreGrpcImplLocalDispatcher.class */
public class KeyValueStoreGrpcImplLocalDispatcher extends KeyValueStoreGrpc.KeyValueStoreImplBase {
    private final KafkaStreams streams;
    private final KeyValueSerde.Registry keyValueSerdes;
    private final ConcurrentMap<String, ReadOnlyKeyValueStore<?, ?>> keyValueStores = new ConcurrentHashMap();
    private final FilterPredicate<?, ?> filterPredicate;

    public KeyValueStoreGrpcImplLocalDispatcher(KafkaStreams kafkaStreams, KeyValueSerde.Registry registry, FilterPredicate<?, ?> filterPredicate) {
        this.streams = kafkaStreams;
        this.keyValueSerdes = registry;
        this.filterPredicate = filterPredicate;
    }

    private <K, V> ExtReadOnlyKeyValueStore<K, V> keyValueStore(String str) {
        return (ExtReadOnlyKeyValueStore) this.keyValueStores.computeIfAbsent(str, str2 -> {
            return new ExtReadOnlyKeyValueStoreImpl((ReadOnlyKeyValueStore) this.streams.store(StoreQueryParameters.fromNameAndType(str, QueryableStoreTypes.keyValueStore())), this.filterPredicate);
        });
    }

    @Override // io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc.KeyValueStoreImplBase
    public void allKeys(VoidReq voidReq, StreamObserver<Key> streamObserver) {
        boolean z = false;
        try {
            Stream allKeys = keyValueStore(voidReq.getStoreName()).allKeys();
            Throwable th = null;
            try {
                try {
                    drainToKey(voidReq.getStoreName(), allKeys, streamObserver);
                    z = true;
                    if (allKeys != null) {
                        if (0 != 0) {
                            try {
                                allKeys.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            allKeys.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th4) {
            streamObserver.onError(th4);
        }
        if (z) {
            streamObserver.onCompleted();
        }
    }

    @Override // io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc.KeyValueStoreImplBase
    public void filter(FilterReq filterReq, StreamObserver<KeyValue> streamObserver) {
        Stream filter;
        Throwable th;
        boolean z = false;
        try {
            filter = keyValueStore(filterReq.getStoreName()).filter(ProtoUtil.emptyAsNull(filterReq.getFilter()), filterReq.getOver());
            th = null;
        } catch (Throwable th2) {
            streamObserver.onError(th2);
        }
        try {
            try {
                drainToKeyValue(filterReq.getStoreName(), filter, streamObserver);
                z = true;
                if (filter != null) {
                    if (0 != 0) {
                        try {
                            filter.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        filter.close();
                    }
                }
                if (z) {
                    streamObserver.onCompleted();
                }
            } finally {
            }
        } catch (Throwable th4) {
            th = th4;
            throw th4;
        }
    }

    @Override // io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc.KeyValueStoreImplBase
    public void get(KeyReq keyReq, StreamObserver<Value> streamObserver) {
        boolean z = false;
        try {
            byte[] serializeVal = this.keyValueSerdes.serializeVal(keyReq.getStoreName(), keyValueStore(keyReq.getStoreName()).get(this.keyValueSerdes.deserializeKey(keyReq.getStoreName(), keyReq.getKey().toByteArray())));
            if (serializeVal != null) {
                streamObserver.onNext(Value.newBuilder().setValue(ByteString.copyFrom(serializeVal)).m1080build());
            }
            z = true;
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
        if (z) {
            streamObserver.onCompleted();
        }
    }

    @Override // io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc.KeyValueStoreImplBase
    public void range(KeyFromKeyToReq keyFromKeyToReq, StreamObserver<KeyValue> streamObserver) {
        boolean z = false;
        try {
            KeyValueIterator range = keyValueStore(keyFromKeyToReq.getStoreName()).range(this.keyValueSerdes.deserializeKey(keyFromKeyToReq.getStoreName(), keyFromKeyToReq.getKeyFrom().toByteArray()), this.keyValueSerdes.deserializeVal(keyFromKeyToReq.getStoreName(), keyFromKeyToReq.getKeyTo().toByteArray()));
            Throwable th = null;
            try {
                try {
                    drainToKeyValue(keyFromKeyToReq.getStoreName(), range, streamObserver);
                    z = true;
                    if (range != null) {
                        if (0 != 0) {
                            try {
                                range.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            range.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th4) {
            streamObserver.onError(th4);
        }
        if (z) {
            streamObserver.onCompleted();
        }
    }

    @Override // io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc.KeyValueStoreImplBase
    public void all(VoidReq voidReq, StreamObserver<KeyValue> streamObserver) {
        boolean z = false;
        try {
            KeyValueIterator all = keyValueStore(voidReq.getStoreName()).all();
            Throwable th = null;
            try {
                try {
                    drainToKeyValue(voidReq.getStoreName(), all, streamObserver);
                    z = true;
                    if (all != null) {
                        if (0 != 0) {
                            try {
                                all.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            all.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th4) {
            streamObserver.onError(th4);
        }
        if (z) {
            streamObserver.onCompleted();
        }
    }

    @Override // io.apicurio.registry.utils.streams.distore.proto.KeyValueStoreGrpc.KeyValueStoreImplBase
    public void approximateNumEntries(VoidReq voidReq, StreamObserver<Size> streamObserver) {
        boolean z = false;
        try {
            streamObserver.onNext(Size.newBuilder().setSize(keyValueStore(voidReq.getStoreName()).approximateNumEntries()).m1033build());
            z = true;
        } catch (Throwable th) {
            streamObserver.onError(th);
        }
        if (z) {
            streamObserver.onCompleted();
        }
    }

    private <K> void drainToKey(String str, Stream<K> stream, StreamObserver<Key> streamObserver) {
        stream.forEach(obj -> {
            byte[] serializeKey = this.keyValueSerdes.serializeKey(str, obj);
            if (serializeKey != null) {
                streamObserver.onNext(Key.newBuilder().setKey(ByteString.copyFrom(serializeKey)).m839build());
            }
        });
    }

    private <K, V> void drainToKeyValue(String str, Stream<org.apache.kafka.streams.KeyValue<K, V>> stream, StreamObserver<KeyValue> streamObserver) {
        stream.forEach(keyValue -> {
            drainToKeyValue(str, keyValue, (StreamObserver<KeyValue>) streamObserver);
        });
    }

    private <K, V> void drainToKeyValue(String str, KeyValueIterator<K, V> keyValueIterator, StreamObserver<KeyValue> streamObserver) {
        while (keyValueIterator.hasNext()) {
            drainToKeyValue(str, (org.apache.kafka.streams.KeyValue) keyValueIterator.next(), streamObserver);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <K, V> void drainToKeyValue(String str, org.apache.kafka.streams.KeyValue<K, V> keyValue, StreamObserver<KeyValue> streamObserver) {
        byte[] serializeKey = this.keyValueSerdes.serializeKey(str, keyValue.key);
        byte[] serializeVal = this.keyValueSerdes.serializeVal(str, keyValue.value);
        if (serializeKey == null || serializeVal == null) {
            return;
        }
        streamObserver.onNext(KeyValue.newBuilder().setKey(ByteString.copyFrom(serializeKey)).setValue(ByteString.copyFrom(serializeVal)).m980build());
    }
}
