package org.apache.dubbo.registry.xds.util.protocol;

import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.xds.util.AdsObserver;
import org.apache.dubbo.registry.xds.util.XdsListener;
import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;

/* loaded from: input_file:org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.class */
public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T>, XdsListener {
    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractProtocol.class);
    protected AdsObserver adsObserver;
    protected final Node node;
    private final int checkInterval;
    protected Set<String> observeResourcesName;
    public static final String emptyResourceName = "emptyResourcesName";
    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    protected final ReentrantReadWriteLock.ReadLock readLock = this.lock.readLock();
    protected final ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
    private final ReentrantLock resourceLock = new ReentrantLock();
    protected Map<Set<String>, List<Consumer<Map<String, T>>>> consumerObserveMap = new ConcurrentHashMap();
    protected Map<String, T> resourcesMap = new ConcurrentHashMap();

    public Map<Set<String>, List<Consumer<Map<String, T>>>> getConsumerObserveMap() {
        return this.consumerObserveMap;
    }

    public AbstractProtocol(AdsObserver adsObserver, Node node, int i) {
        this.adsObserver = adsObserver;
        this.node = node;
        this.checkInterval = i;
        adsObserver.addListener(this);
    }

    public abstract String getTypeUrl();

    public boolean isCacheExistResource(Set<String> set) {
        for (String str : set) {
            if (!"".equals(str) && !this.resourcesMap.containsKey(str)) {
                return false;
            }
        }
        return true;
    }

    public T getCacheResource(String str) {
        if (str == null || str.length() == 0) {
            return null;
        }
        return this.resourcesMap.get(str);
    }

    @Override // org.apache.dubbo.registry.xds.util.protocol.XdsProtocol
    public Map<String, T> getResource(Set<String> set) {
        Set<String> emptySet = set == null ? Collections.emptySet() : set;
        return (emptySet.isEmpty() || !isCacheExistResource(emptySet)) ? getResourceFromRemote(emptySet) : getResourceFromCache(emptySet);
    }

    private Map<String, T> getResourceFromCache(Set<String> set) {
        return (Map) set.stream().filter(str -> {
            return !StringUtils.isEmpty(str);
        }).collect(Collectors.toMap(str2 -> {
            return str2;
        }, this::getCacheResource));
    }

    public Map<String, T> getResourceFromRemote(Set<String> set) {
        try {
            this.resourceLock.lock();
            CompletableFuture completableFuture = new CompletableFuture();
            this.observeResourcesName = set;
            Set<String> hashSet = new HashSet();
            if (set.isEmpty()) {
                hashSet.add(emptyResourceName);
            } else {
                hashSet = set;
            }
            Objects.requireNonNull(completableFuture);
            Consumer consumer = (v1) -> {
                r0.complete(v1);
            };
            try {
                this.writeLock.lock();
                ((List) ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap) this.consumerObserveMap, hashSet, set2 -> {
                    return new ArrayList();
                })).add(consumer);
                this.writeLock.unlock();
                HashSet hashSet2 = new HashSet(set);
                hashSet2.addAll(this.resourcesMap.keySet());
                this.adsObserver.request(buildDiscoveryRequest(hashSet2));
                logger.info("Send xDS Observe request to remote. Resource count: " + hashSet2.size() + ". Resource Type: " + getTypeUrl());
                try {
                    Map<String, T> map = (Map) completableFuture.get();
                    try {
                        this.writeLock.lock();
                        this.consumerObserveMap.get(hashSet).removeIf(consumer2 -> {
                            return consumer2.equals(consumer);
                        });
                        this.writeLock.unlock();
                        return map;
                    } finally {
                    }
                } catch (InterruptedException e) {
                    logger.error("99-1", "", "", "InterruptedException occur when request control panel. error=", e);
                    Thread.currentThread().interrupt();
                    this.resourceLock.unlock();
                    return Collections.emptyMap();
                } catch (Exception e2) {
                    logger.error("4-11", "", "", "Error occur when request control panel. error=", e2);
                    this.resourceLock.unlock();
                    return Collections.emptyMap();
                }
            } finally {
            }
        } finally {
            this.resourceLock.unlock();
        }
    }

    @Override // org.apache.dubbo.registry.xds.util.protocol.XdsProtocol
    public void observeResource(Set<String> set, Consumer<Map<String, T>> consumer, boolean z) {
        if (!z) {
            consumer.accept(getResource(set));
            try {
                this.writeLock.lock();
                this.consumerObserveMap.compute(set, (set2, list) -> {
                    if (list == null) {
                        list = new ArrayList();
                    }
                    list.add(consumer);
                    return list;
                });
                this.writeLock.unlock();
            } finally {
            }
        }
        try {
            this.writeLock.lock();
            this.observeResourcesName = (Set) this.consumerObserveMap.keySet().stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toSet());
            this.writeLock.unlock();
        } finally {
        }
    }

    public void unobserveResource(Set<String> set, Consumer<Map<String, T>> consumer) {
    }

    protected DiscoveryRequest buildDiscoveryRequest(Set<String> set) {
        return DiscoveryRequest.newBuilder().setNode(this.node).setTypeUrl(getTypeUrl()).addAllResourceNames(set).build();
    }

    protected abstract Map<String, T> decodeDiscoveryResponse(DiscoveryResponse discoveryResponse);

    @Override // org.apache.dubbo.registry.xds.util.XdsListener
    public final void process(DiscoveryResponse discoveryResponse) {
        Map<String, T> decodeDiscoveryResponse = decodeDiscoveryResponse(discoveryResponse);
        discoveryResponseListener(this.resourcesMap, decodeDiscoveryResponse);
        this.resourcesMap = decodeDiscoveryResponse;
    }

    private void discoveryResponseListener(Map<String, T> map, Map<String, T> map2) {
        HashSet hashSet = new HashSet();
        map.forEach((str, obj) -> {
            if (Objects.equals(obj, map2.get(str))) {
                return;
            }
            hashSet.add(str);
        });
        map2.forEach((str2, obj2) -> {
            if (Objects.equals(obj2, map.get(str2))) {
                return;
            }
            hashSet.add(str2);
        });
        if (hashSet.isEmpty()) {
            return;
        }
        logger.info("Receive resource update notification from xds server. Change resource count: " + hashSet.stream() + ". Type: " + getTypeUrl());
        try {
            this.readLock.lock();
            for (Map.Entry<Set<String>, List<Consumer<Map<String, T>>>> entry : this.consumerObserveMap.entrySet()) {
                Stream<String> stream = entry.getKey().stream();
                Objects.requireNonNull(hashSet);
                if (!stream.noneMatch((v1) -> {
                    return r1.contains(v1);
                })) {
                    Map map3 = (Map) entry.getKey().stream().collect(Collectors.toMap(str3 -> {
                        return str3;
                    }, str4 -> {
                        return map2.get(str4);
                    }));
                    entry.getValue().forEach(consumer -> {
                        consumer.accept(map3);
                    });
                }
            }
        } finally {
            this.readLock.unlock();
        }
    }
}
