package org.apache.gobblin.util.limiter;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.callback.Callback;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.RetriableRequestException;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.RestLiResponseException;
import com.linkedin.restli.common.HttpStatus;
import java.beans.ConstructorProperties;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.gobblin.broker.ResourceInstance;
import org.apache.gobblin.broker.iface.ConfigView;
import org.apache.gobblin.broker.iface.NotConfiguredException;
import org.apache.gobblin.broker.iface.ScopeType;
import org.apache.gobblin.broker.iface.ScopedConfigView;
import org.apache.gobblin.broker.iface.SharedResourceFactory;
import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.restli.SharedRestClientFactory;
import org.apache.gobblin.restli.SharedRestClientKey;
import org.apache.gobblin.restli.UriRestClientKey;
import org.apache.gobblin.restli.throttling.PermitAllocation;
import org.apache.gobblin.restli.throttling.PermitRequest;
import org.apache.gobblin.util.ExponentialBackoff;
import org.apache.gobblin.util.limiter.RequestSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender.class */
public class RedirectAwareRestClientRequestSender extends RestClientRequestSender {
    private static final Logger log = LoggerFactory.getLogger(RedirectAwareRestClientRequestSender.class);
    private static final int MIN_RETRIES = 3;
    private final SharedResourcesBroker<?> broker;
    private final List<String> connectionPrefixes;
    private volatile RestClient restClient;
    private volatile String currentServerPrefix;
    private volatile int lastPrefixAttempted = -1;
    private String lastLogPrefix = "";
    private AtomicInteger requestsSinceLastLog = new AtomicInteger(0);
    private long lastLogTimeNanos = 0;

    /* loaded from: input_file:org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender$CallbackDecorator.class */
    private class CallbackDecorator implements Callback<Response<PermitAllocation>> {
        private final PermitRequest originalRequest;
        private final Callback<Response<PermitAllocation>> underlying;
        private final ExponentialBackoff exponentialBackoff = ExponentialBackoff.builder().maxDelay(Long.valueOf(BatchedPermitsRequester.DEFAULT_TARGET_MILLIS_BETWEEN_REQUESTS)).initialDelay(500L).build();
        private int redirects = 0;
        private int retries = 0;

        public void onError(Throwable th) {
            try {
                if ((th instanceof RestLiResponseException) && ((RestLiResponseException) th).getStatus() == HttpStatus.S_301_MOVED_PERMANENTLY.getCode()) {
                    this.redirects++;
                    if (this.redirects >= 5) {
                        this.underlying.onError(new RequestSender.NonRetriableException("Too many redirects."));
                    }
                    RedirectAwareRestClientRequestSender.this.updateRestClient(SharedRestClientFactory.resolveUriPrefix(new URI((String) ((RestLiResponseException) th).getErrorDetails().get("Location"))), "301 redirect", null);
                    this.exponentialBackoff.awaitNextRetry();
                    RedirectAwareRestClientRequestSender.this.sendRequest(this.originalRequest, this);
                } else if ((th instanceof RemoteInvocationException) && RedirectAwareRestClientRequestSender.this.shouldCatchExceptionAndSwitchUrl((RemoteInvocationException) th)) {
                    this.retries++;
                    if (this.retries > RedirectAwareRestClientRequestSender.this.connectionPrefixes.size() + RedirectAwareRestClientRequestSender.MIN_RETRIES) {
                        this.underlying.onError(new RequestSender.NonRetriableException("Failed to connect to all available connection prefixes.", th));
                    }
                    RedirectAwareRestClientRequestSender.this.updateRestClient(RedirectAwareRestClientRequestSender.this.getNextConnectionPrefix(), "Failed to communicate with " + RedirectAwareRestClientRequestSender.this.getCurrentServerPrefix(), th);
                    this.exponentialBackoff.awaitNextRetry();
                    RedirectAwareRestClientRequestSender.this.sendRequest(this.originalRequest, this);
                } else {
                    this.underlying.onError(th);
                }
            } catch (Throwable th2) {
                this.underlying.onError(th2);
            }
        }

        public void onSuccess(Response<PermitAllocation> response) {
            this.underlying.onSuccess(response);
        }

        @ConstructorProperties({"originalRequest", "underlying"})
        public CallbackDecorator(PermitRequest permitRequest, Callback<Response<PermitAllocation>> callback) {
            this.originalRequest = permitRequest;
            this.underlying = callback;
        }
    }

    /* loaded from: input_file:org/apache/gobblin/util/limiter/RedirectAwareRestClientRequestSender$Factory.class */
    public static class Factory<S extends ScopeType<S>> implements SharedResourceFactory<RequestSender, SharedRestClientKey, S> {
        public String getName() {
            return "restli";
        }

        public SharedResourceFactoryResponse<RequestSender> createResource(SharedResourcesBroker<S> sharedResourcesBroker, ScopedConfigView<S, SharedRestClientKey> scopedConfigView) throws NotConfiguredException {
            try {
                return new ResourceInstance(new RedirectAwareRestClientRequestSender(sharedResourcesBroker, SharedRestClientFactory.parseConnectionPrefixes(scopedConfigView.getConfig(), scopedConfigView.getKey())));
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }

        public S getAutoScope(SharedResourcesBroker<S> sharedResourcesBroker, ConfigView<S, SharedRestClientKey> configView) {
            return (S) sharedResourcesBroker.selfScope().getType().rootScope();
        }
    }

    public RedirectAwareRestClientRequestSender(SharedResourcesBroker<?> sharedResourcesBroker, List<String> list) throws NotConfiguredException {
        this.broker = sharedResourcesBroker;
        this.connectionPrefixes = list;
        updateRestClient(getNextConnectionPrefix(), "service start", null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getNextConnectionPrefix() {
        if (this.lastPrefixAttempted < 0) {
            this.lastPrefixAttempted = new Random().nextInt(this.connectionPrefixes.size());
        }
        this.lastPrefixAttempted = (this.lastPrefixAttempted + 1) % this.connectionPrefixes.size();
        log.info("Round robin: " + this.lastPrefixAttempted);
        return this.connectionPrefixes.get(this.lastPrefixAttempted);
    }

    @Override // org.apache.gobblin.util.limiter.RestClientRequestSender, org.apache.gobblin.util.limiter.RequestSender
    public void sendRequest(PermitRequest permitRequest, Callback<Response<PermitAllocation>> callback) {
        logRequest();
        super.sendRequest(permitRequest, callback);
    }

    private void logRequest() {
        String currentServerPrefix = getCurrentServerPrefix();
        if (!currentServerPrefix.equals(this.lastLogPrefix)) {
            logAggregatedRequests(this.lastLogPrefix);
            log.info("Sending request to " + currentServerPrefix);
            this.lastLogPrefix = currentServerPrefix;
        } else {
            this.requestsSinceLastLog.incrementAndGet();
            log.debug("Sending request to {}", currentServerPrefix);
            if (TimeUnit.SECONDS.convert(System.nanoTime() - this.lastLogTimeNanos, TimeUnit.NANOSECONDS) > 60) {
                logAggregatedRequests(currentServerPrefix);
            }
        }
    }

    private void logAggregatedRequests(String str) {
        int andSet = this.requestsSinceLastLog.getAndSet(0);
        long nanoTime = System.nanoTime();
        long convert = TimeUnit.MILLISECONDS.convert(nanoTime - this.lastLogTimeNanos, TimeUnit.NANOSECONDS);
        this.lastLogTimeNanos = nanoTime;
        if (andSet > 0) {
            log.info(String.format("Made %d requests to %s over the last %d millis.", Integer.valueOf(andSet), str, Long.valueOf(convert)));
        }
    }

    @Override // org.apache.gobblin.util.limiter.RestClientRequestSender
    protected RestClient getRestClient() {
        return this.restClient;
    }

    @Override // org.apache.gobblin.util.limiter.RestClientRequestSender
    protected Callback<Response<PermitAllocation>> decorateCallback(PermitRequest permitRequest, Callback<Response<PermitAllocation>> callback) {
        return callback instanceof CallbackDecorator ? callback : new CallbackDecorator(permitRequest, callback);
    }

    @VisibleForTesting
    void updateRestClient(String str, String str2, Throwable th) throws NotConfiguredException {
        if (th == null) {
            log.info(String.format("Switching to server prefix %s due to: %s", str, str2));
        } else {
            log.error(String.format("Switching to server prefix %s due to: %s", str, str2), th);
        }
        this.currentServerPrefix = str;
        this.restClient = (RestClient) this.broker.getSharedResource(new SharedRestClientFactory(), new UriRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME, str));
    }

    public boolean shouldCatchExceptionAndSwitchUrl(RemoteInvocationException remoteInvocationException) {
        return (remoteInvocationException.getCause() instanceof RetriableRequestException) || (remoteInvocationException.getCause() instanceof ConnectException) || (remoteInvocationException.getCause() instanceof TimeoutException);
    }

    public String getCurrentServerPrefix() {
        return this.currentServerPrefix;
    }
}
