package org.apache.druid.security.basic;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.DruidNode;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/security/basic/CommonCacheNotifier.class */
/**
 * Pushes cache update payloads to every node discovered for the roles in {@link #NODE_TYPES}.
 *
 * <p>Updates are queued via {@link #addUpdate(String, byte[])} and drained by a background task
 * submitted in {@link #start()}. For each update, the payload is POSTed as
 * {@code application/json} to a listener URL built from {@code baseUrl} on each discovered node.
 * {@link #stop()} shuts the executor down, which interrupts the background task.
 */
public class CommonCacheNotifier {
    private static final EmittingLogger LOG = new EmittingLogger(CommonCacheNotifier.class);

    /** Roles whose discovered nodes receive cache update notifications. */
    private static final List<NodeRole> NODE_TYPES = Arrays.asList(NodeRole.BROKER, NodeRole.OVERLORD, NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.ROUTER, NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER);

    private final DruidNodeDiscoveryProvider discoveryProvider;
    private final HttpClient httpClient;

    /** Pending updates: lhs is the item name (key into {@link #itemConfigMap}), rhs is the payload. */
    private final BlockingQueue<Pair<String, byte[]>> updateQueue = new LinkedBlockingQueue<>();
    private final Map<String, BasicAuthDBConfig> itemConfigMap;

    /** Format string for the listener path; its single argument is the URL-encoded item name. */
    private final String baseUrl;
    private final String callerName;
    private final ExecutorService exec;

    /**
     * Response handler that keeps only the HTTP status of the response and ignores the body.
     */
    public static class ResponseHandler implements HttpResponseHandler<StatusResponseHolder, StatusResponseHolder> {
        protected static final Logger log = new Logger(ResponseHandler.class);

        private ResponseHandler() {
        }

        /** Captures the response status; the holder's content builder is left {@code null}. */
        @Override
        public ClientResponse<StatusResponseHolder> handleResponse(HttpResponse httpResponse, HttpResponseHandler.TrafficCop trafficCop) {
            return ClientResponse.unfinished(new StatusResponseHolder(httpResponse.getStatus(), (StringBuilder) null));
        }

        /** Body chunks are ignored; the response is passed through unchanged. */
        @Override
        public ClientResponse<StatusResponseHolder> handleChunk(ClientResponse<StatusResponseHolder> clientResponse, HttpChunk httpChunk, long j) {
            return clientResponse;
        }

        /** Marks the response as finished, carrying over the status holder. */
        @Override
        public ClientResponse<StatusResponseHolder> done(ClientResponse<StatusResponseHolder> clientResponse) {
            return ClientResponse.finished(clientResponse.getObj());
        }

        /** Logs the failure; no other recovery is attempted here. */
        @Override
        public void exceptionCaught(ClientResponse<StatusResponseHolder> clientResponse, Throwable th) {
            log.error(th, "exceptionCaught in CommonCacheNotifier ResponseHandler.");
        }
    }

    /**
     * @param map                        per-item configuration, keyed by the item name passed to
     *                                   {@link #addUpdate(String, byte[])}
     * @param druidNodeDiscoveryProvider source of the nodes to notify
     * @param httpClient                 client used to send the notifications
     * @param str                        format string for the listener path (stored as {@code baseUrl})
     * @param str2                       caller name, used as a prefix in log messages and in the
     *                                   notifier thread name
     */
    public CommonCacheNotifier(Map<String, BasicAuthDBConfig> map, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, HttpClient httpClient, String str, String str2) {
        // The trailing "%d" is appended after formatting so it reaches Execs.singleThreaded verbatim.
        this.exec = Execs.singleThreaded(StringUtils.format("%s-notifierThread-", StringUtils.encodeForFormat(str2)) + "%d");
        this.callerName = str2;
        this.itemConfigMap = map;
        this.discoveryProvider = druidNodeDiscoveryProvider;
        this.httpClient = httpClient;
        this.baseUrl = str;
    }

    /**
     * Submits the background task that drains the update queue and notifies nodes.
     *
     * <p>The task runs until its thread is interrupted (see {@link #stop()}). Any other failure
     * while handling a single update is reported as an alert and the loop moves on to the next
     * update.
     */
    public void start() {
        this.exec.submit(() -> {
            while (!Thread.interrupted()) {
                try {
                    LOG.debug("%s:Waiting for cache update notification", this.callerName);
                    final Pair<String, byte[]> update = this.updateQueue.take();
                    notifyNodes(update.lhs, update.rhs);
                } catch (InterruptedException e) {
                    // take()/get() clear the interrupt flag when they throw. Restore it and leave
                    // the loop; otherwise shutdownNow() could never terminate this task.
                    Thread.currentThread().interrupt();
                    LOG.debug("%s:Interrupted, stopping cache update notifications", this.callerName);
                    break;
                } catch (Throwable th) {
                    LOG.makeAlert(th, "%s:Error occured while handling updates for cachedUserMaps.", this.callerName).emit();
                }
            }
        });
    }

    /** Shuts the executor down immediately, interrupting the background task. */
    public void stop() {
        this.exec.shutdownNow();
    }

    /**
     * Queues an update for delivery by the background task.
     *
     * @param str  item name; used to look up the item's config and to build the listener URL
     * @param bArr payload sent as the request body (not copied)
     */
    public void addUpdate(String str, byte[] bArr) {
        this.updateQueue.add(new Pair<>(str, bArr));
    }

    /**
     * Sends one update to all nodes and waits (up to the item's configured timeout) for the
     * responses. Does nothing if the item has cache notifications disabled.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for responses
     */
    private void notifyNodes(String itemName, byte[] serializedData) throws InterruptedException {
        final BasicAuthDBConfig config = this.itemConfigMap.get(itemName);
        if (config == null) {
            // Report the real cause instead of letting a NullPointerException surface below.
            LOG.makeAlert("%s:No config found for item[%s], skipping cache update notification.", this.callerName, itemName).emit();
            return;
        }
        if (!config.isEnableCacheNotifications()) {
            return;
        }

        LOG.debug("%s:Sending cache update notifications", this.callerName);
        try {
            final List<StatusResponseHolder> responses = Futures.allAsList(sendUpdate(itemName, serializedData))
                .get(config.getCacheNotificationTimeout(), TimeUnit.MILLISECONDS);
            for (StatusResponseHolder response : responses) {
                LOG.debug("%s:Got status: %s", this.callerName, response.getStatus());
            }
            LOG.debug("%s:Received responses for cache update notifications.", this.callerName);
        } catch (InterruptedException e) {
            // Must not be swallowed by the generic handler below; the caller uses it to stop.
            throw e;
        } catch (Exception e) {
            LOG.makeAlert(e, "%s:Failed to get response for cache notification.", this.callerName).emit();
        }
    }

    /**
     * Issues one POST per discovered node, across all roles in {@link #NODE_TYPES}.
     *
     * @return the in-flight requests, one future per node
     */
    private List<ListenableFuture<StatusResponseHolder>> sendUpdate(String itemName, byte[] serializedData) {
        // Both values are the same for every node, so compute them once.
        final String listenerPath = StringUtils.format(this.baseUrl, StringUtils.urlEncode(itemName));
        final long timeoutMillis = this.itemConfigMap.get(itemName).getCacheNotificationTimeout();

        final List<ListenableFuture<StatusResponseHolder>> futures = new ArrayList<>();
        for (NodeRole nodeRole : NODE_TYPES) {
            for (DiscoveryDruidNode node : this.discoveryProvider.getForNodeRole(nodeRole).getAllNodes()) {
                final Request request = new Request(HttpMethod.POST, getListenerURL(node.getDruidNode(), listenerPath));
                request.setContent("application/json", serializedData);
                futures.add(this.httpClient.go(request, new ResponseHandler(), Duration.millis(timeoutMillis)));
            }
        }
        return futures;
    }

    /**
     * Builds the listener URL for a node from its scheme, host and port plus the given path.
     *
     * @throws RuntimeException wrapping the {@link MalformedURLException} if the URL is invalid
     */
    private URL getListenerURL(DruidNode druidNode, String path) {
        try {
            return new URL(druidNode.getServiceScheme(), druidNode.getHost(), druidNode.getPortToUse(), path);
        } catch (MalformedURLException e) {
            LOG.error(e, "%s: Malformed url for DruidNode[%s] and baseUrl[%s]", this.callerName, druidNode, path);
            throw new RuntimeException(e);
        }
    }
}
