package io.apicurio.registry.utils.streams.diservice;

import io.grpc.Channel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/apicurio/registry/utils/streams/diservice/DistributedService.class */
/**
 * Base class that routes calls for a Kafka Streams state store to the service instance
 * (local or remote) running on the application host that owns the data.
 *
 * <p>Service instances are created lazily, one per {@link HostInfo}, and cached in
 * {@link #hostInfo2service}. For the local application server the instance comes from
 * {@link #localService(String, KafkaStreams)}; for every other host it comes from
 * {@link #remoteServiceGrpcClient(String, Channel, Serde)} using a channel obtained from the
 * configured channel provider.
 *
 * @param <K> type of the keys used to look up the owning host
 * @param <S> type of the service; remote instances are cast to {@link AutoCloseable} on
 *            {@link #close()}, so they are expected to implement it
 */
public abstract class DistributedService<K, S> implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(DistributedService.class);
    private final KafkaStreams streams;
    private final HostInfo localApplicationServer;
    private final String storeName;
    private final Serde<K> keySerde;
    private final Function<? super HostInfo, ? extends Channel> grpcChannelProvider;
    private final boolean parallel;
    /** Cache of already created services, keyed by the host they talk to. */
    private final ConcurrentMap<HostInfo, S> hostInfo2service = new ConcurrentHashMap<>();

    /**
     * Creates the distributed service.
     *
     * @param streams                the Kafka Streams instance used for metadata lookups
     * @param localApplicationServer host info identifying this (local) application server
     * @param storeName              name of the state store the service is bound to
     * @param keySerde               serde used to serialize keys for metadata lookups
     * @param grpcChannelProvider    supplies a gRPC channel for a given remote host
     * @param parallel               whether the {@code *Stream()} methods may return parallel streams
     * @throws NullPointerException if any reference argument is {@code null}
     */
    public DistributedService(KafkaStreams streams, HostInfo localApplicationServer, String storeName, Serde<K> keySerde, Function<? super HostInfo, ? extends Channel> grpcChannelProvider, boolean parallel) {
        this.streams = Objects.requireNonNull(streams, "streams");
        this.localApplicationServer = Objects.requireNonNull(localApplicationServer, "localApplicationServer");
        this.storeName = Objects.requireNonNull(storeName, "storeName");
        this.keySerde = Objects.requireNonNull(keySerde, "keySerde");
        this.grpcChannelProvider = Objects.requireNonNull(grpcChannelProvider, "grpcChannelProvider");
        this.parallel = parallel;
    }

    /**
     * @return the serde used for keys
     */
    protected Serde<K> getKeySerde() {
        return this.keySerde;
    }

    /**
     * Closes every cached remote service and empties the cache. The local service is only
     * evicted, never closed here.
     *
     * <p>All entries are processed even when closing one of them fails, so a single failing
     * service does not leave the remaining ones open and cached. The first failure is reported
     * once the loop is done; later failures are attached to it as suppressed exceptions.
     *
     * @throws RuntimeException if closing at least one service failed; a checked exception is
     *                          wrapped, an unchecked one is rethrown as is
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        Exception failure = null;
        synchronized (this.hostInfo2service) {
            Iterator<Map.Entry<HostInfo, S>> it = this.hostInfo2service.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<HostInfo, S> entry = it.next();
                try {
                    if (!this.localApplicationServer.equals(entry.getKey())) {
                        ((AutoCloseable) entry.getValue()).close();
                    }
                } catch (Exception e) {
                    // Remember the failure but keep going; Errors are not caught and propagate.
                    if (failure == null) {
                        failure = e;
                    } else if (failure != e) {
                        // addSuppressed rejects self-suppression, hence the identity guard.
                        failure.addSuppressed(e);
                    }
                } finally {
                    // Evict the entry whether or not closing succeeded.
                    it.remove();
                }
            }
        }
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure != null) {
            throw new RuntimeException("Exception occurred closing the service", failure);
        }
    }

    /**
     * Returns the service of the host that is active for the given key.
     *
     * <p>Declared {@code public} in this decompiled source; the decompiler notes that the
     * access modifier was changed from {@code protected}.
     *
     * @param k the key to look up
     * @return the (cached or newly created) service for the key's active host
     * @throws InvalidStateStoreException if no metadata is returned or it is currently not available
     */
    public final S serviceForKey(K k) {
        KeyQueryMetadata metadata = this.streams.queryMetadataForKey(this.storeName, k, this.keySerde.serializer());
        if (metadata == null) {
            throw new InvalidStateStoreException("StreamsMetadata is null?! Store-name: " + this.storeName + " Key: " + k);
        }
        // equals() also matches the NOT_AVAILABLE sentinel instance itself.
        if (KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
            throw new InvalidStateStoreException("StreamsMetadata is currently unavailable. This can occur during rebalance operations. Store-name: " + this.storeName + " Key: " + k);
        }
        return serviceForHostInfo(metadata.getActiveHost());
    }

    /**
     * Returns the services of all hosts that report metadata for this store and have at least
     * one topic partition assigned.
     *
     * @return a new collection of services, possibly empty
     * @throws InvalidStateStoreException if no metadata for the store is available
     */
    protected final Collection<S> allServicesForStore() {
        Collection<StreamsMetadata> storeMetadata = this.streams.allMetadataForStore(this.storeName);
        if (storeMetadata.isEmpty()) {
            throw new InvalidStateStoreException("StreamsMetadata is currently unavailable. This can occur during rebalance operations. Store-name: " + this.storeName);
        }
        return servicesFor(storeMetadata);
    }

    /**
     * Stream view of {@link #allServicesForStore()}.
     *
     * <p>Declared {@code public} in this decompiled source; the decompiler notes that the
     * access modifier was changed from {@code protected}.
     *
     * @return a parallel stream when parallelism is enabled and there is more than one
     *         service, a sequential stream otherwise
     * @throws InvalidStateStoreException if no metadata for the store is available
     */
    public final Stream<S> allServicesForStoreStream() {
        return toStream(allServicesForStore());
    }

    /**
     * Returns the services of all hosts known to the streams instance that have at least one
     * topic partition assigned.
     *
     * @return a new collection of services, possibly empty
     * @throws StreamsException if no metadata is available
     */
    protected final Collection<S> allServices() {
        Collection<StreamsMetadata> allMetadata = this.streams.allMetadata();
        if (allMetadata.isEmpty()) {
            throw new StreamsException("StreamsMetadata is currently unavailable. This can occur during rebalance operations. ");
        }
        return servicesFor(allMetadata);
    }

    /**
     * Stream view of {@link #allServices()}.
     *
     * <p>Declared {@code public} in this decompiled source; the decompiler notes that the
     * access modifier was changed from {@code protected}.
     *
     * @return a parallel stream when parallelism is enabled and there is more than one
     *         service, a sequential stream otherwise
     * @throws StreamsException if no metadata is available
     */
    public final Stream<S> allServicesStream() {
        return toStream(allServices());
    }

    /** Resolves a service for every metadata entry that has at least one topic partition. */
    private Collection<S> servicesFor(Collection<StreamsMetadata> metadata) {
        Collection<S> services = new ArrayList<>(metadata.size());
        for (StreamsMetadata streamsMetadata : metadata) {
            if (!streamsMetadata.topicPartitions().isEmpty()) {
                services.add(serviceForHostInfo(streamsMetadata.hostInfo()));
            }
        }
        return services;
    }

    /** Streams the services in parallel only if enabled and more than one service exists. */
    private Stream<S> toStream(Collection<S> services) {
        return (this.parallel && services.size() > 1) ? services.parallelStream() : services.stream();
    }

    /** Returns the cached service for the host, creating a local or remote one on first use. */
    private S serviceForHostInfo(HostInfo hostInfo) {
        return this.hostInfo2service.computeIfAbsent(hostInfo, host -> {
            if (this.localApplicationServer.equals(host)) {
                log.info("Obtaining local service '{}' for host info '{}'", this.storeName, host);
                return localService(this.storeName, this.streams);
            }
            log.info("Obtaining remote service '{}' for host info '{}'", this.storeName, host);
            return remoteServiceGrpcClient(this.storeName, this.grpcChannelProvider.apply(host), this.keySerde);
        });
    }

    /**
     * Creates the service used when the requested host is the local application server.
     *
     * @param storeName name of the state store
     * @param streams   the Kafka Streams instance
     * @return the local service
     */
    protected abstract S localService(String storeName, KafkaStreams streams);

    /**
     * Creates the service used to reach a remote host over the given gRPC channel.
     *
     * @param storeName name of the state store
     * @param channel   gRPC channel obtained from the channel provider for the remote host
     * @param keySerde  serde used for keys
     * @return the remote service client
     */
    protected abstract S remoteServiceGrpcClient(String storeName, Channel channel, Serde<K> keySerde);
}
