package org.apache.druid.catalog.sync;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.DruidNode;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/catalog/sync/RestUpdateSender.class */
/**
 * Pushes a serialized update to a set of Druid nodes over REST.
 *
 * <p>Each call to {@link #accept(byte[])} asks the destination supplier for the
 * current set of nodes, POSTs the payload to {@code baseUrl} on every node via
 * the configured {@link RestSender}, and then waits (up to the configured
 * timeout) for the responses. Response problems are logged or raised as alerts
 * rather than thrown.
 */
public class RestUpdateSender implements Consumer<byte[]> {
    private static final EmittingLogger LOG = new EmittingLogger(RestUpdateSender.class);

    /** Content type attached to every update request sent by {@link HttpClientSender}. */
    private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile";

    private final String callerName;
    private final Supplier<Iterable<DruidNode>> destinationSupplier;
    private final String baseUrl;
    private final RestSender sender;
    private final long cacheNotificationsTimeoutMs;

    /**
     * Abstraction over the transport used to deliver one update to one URL.
     * Exists so the transport can be swapped out (e.g. in tests).
     */
    public interface RestSender {
        /**
         * Starts sending the payload to the given URL.
         *
         * @param url  destination of the request
         * @param bArr serialized payload to send
         * @return a future that yields the response status and content
         */
        ListenableFuture<StatusResponseHolder> send(URL url, byte[] bArr);
    }

    /** {@link RestSender} that POSTs the payload using a Druid {@link HttpClient}. */
    private static class HttpClientSender implements RestSender {
        private final HttpClient httpClient;
        private final Duration cacheNotificationsTimeout;

        private HttpClientSender(HttpClient httpClient, Duration timeout) {
            this.httpClient = httpClient;
            this.cacheNotificationsTimeout = timeout;
        }

        @Override
        public ListenableFuture<StatusResponseHolder> send(URL url, byte[] bArr) {
            final Request request = new Request(HttpMethod.POST, url).setContent(SMILE_CONTENT_TYPE, bArr);
            return this.httpClient.go(request, StatusResponseHandler.getInstance(), this.cacheNotificationsTimeout);
        }
    }

    /**
     * @param str        caller name, used as a prefix in log and alert messages
     * @param supplier   supplies the nodes to notify; queried on every {@link #accept(byte[])} call
     * @param restSender transport used to deliver the payload to each node
     * @param str2       file/path part of the URL built for every destination node
     * @param j          maximum time, in milliseconds, to wait for all responses
     */
    public RestUpdateSender(String str, Supplier<Iterable<DruidNode>> supplier, RestSender restSender, String str2, long j) {
        this.callerName = str;
        this.destinationSupplier = supplier;
        this.sender = restSender;
        this.baseUrl = str2;
        this.cacheNotificationsTimeoutMs = j;
    }

    /**
     * Creates a {@link RestSender} backed by the given HTTP client.
     *
     * @param httpClient client used to issue the POST requests
     * @param duration   timeout passed to the client for each request
     * @return a sender that POSTs payloads with the Smile content type
     */
    public static RestSender httpClientSender(HttpClient httpClient, Duration duration) {
        return new HttpClientSender(httpClient, duration);
    }

    /**
     * Sends the payload to every node returned by the destination supplier and
     * logs the outcome of each request.
     *
     * <p>Failures while waiting for the responses (timeout, execution failure,
     * interruption) are reported as an alert and not rethrown. If the waiting
     * thread is interrupted, its interrupt flag is restored before returning.
     * Note that building the URL and initiating the sends happen before the
     * guarded section, so an exception from those steps (for example the
     * {@link RE} raised for a malformed URL) propagates to the caller.
     *
     * @param bArr serialized payload to deliver
     */
    @Override
    public void accept(byte[] bArr) {
        LOG.debug("%s: Sending update notifications", this.callerName);

        final List<ListenableFuture<StatusResponseHolder>> pending = new ArrayList<>();
        for (DruidNode node : this.destinationSupplier.get()) {
            pending.add(this.sender.send(getListenerURL(node, this.baseUrl), bArr));
        }

        try {
            for (StatusResponseHolder response : getResponsesFromFutures(pending)) {
                logResponse(response);
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.makeAlert(e, "%s: Failed to get response for cache notification.", this.callerName).emit();
        } catch (Exception e) {
            LOG.makeAlert(e, "%s: Failed to get response for cache notification.", this.callerName).emit();
        }

        LOG.debug("%s: Received responses for cache update notifications.", this.callerName);
    }

    /** Logs one response: debug for OK/ACCEPTED, error for a missing response or any other status. */
    private static void logResponse(StatusResponseHolder response) {
        if (response == null) {
            // Futures.successfulAsList substitutes null for futures that failed or were cancelled.
            LOG.error("Got null future response from update request.");
            return;
        }
        final HttpResponseStatus status = response.getStatus();
        if (HttpResponseStatus.OK.equals(status) || HttpResponseStatus.ACCEPTED.equals(status)) {
            LOG.debug("Got status [%s]", status);
        } else {
            LOG.error("Got error status [%s], content [%s]", status, response.getContent());
        }
    }

    /**
     * Waits for all the given futures, for at most the configured timeout.
     *
     * @param list futures of the in-flight update requests
     * @return the responses in the order of {@code list}; per Guava's
     *         {@code Futures.successfulAsList}, entries for failed or cancelled
     *         futures are {@code null}
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException   if the combined future fails
     * @throws TimeoutException     if the responses do not arrive within the timeout
     */
    @VisibleForTesting
    List<StatusResponseHolder> getResponsesFromFutures(List<ListenableFuture<StatusResponseHolder>> list) throws InterruptedException, ExecutionException, TimeoutException {
        return Futures.successfulAsList(list).get(this.cacheNotificationsTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds the URL for a node from its scheme, host and port plus the given path.
     *
     * @throws RE if the pieces do not form a valid URL; the error is also logged
     */
    private URL getListenerURL(DruidNode node, String path) {
        try {
            return new URL(node.getServiceScheme(), node.getHost(), node.getPortToUse(), path);
        } catch (MalformedURLException e) {
            // callerName is passed as an argument rather than concatenated into the
            // format string, so a '%' in it cannot be mistaken for a format specifier.
            final String message = StringUtils.format("%s: Malformed url for DruidNode [%s] and baseUrl [%s]", this.callerName, node, path);
            // The message is already formatted; hand it over as an argument so it is
            // never interpreted as a format string a second time.
            LOG.error("%s", message);
            throw new RE(e, "%s", message);
        }
    }
}
