package gobblin.util.limiter;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.callback.Callback;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.RetriableRequestException;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.RestLiResponseException;
import com.linkedin.restli.common.HttpStatus;
import gobblin.broker.ResourceInstance;
import gobblin.broker.iface.ConfigView;
import gobblin.broker.iface.NotConfiguredException;
import gobblin.broker.iface.ScopeType;
import gobblin.broker.iface.ScopedConfigView;
import gobblin.broker.iface.SharedResourceFactory;
import gobblin.broker.iface.SharedResourceFactoryResponse;
import gobblin.broker.iface.SharedResourcesBroker;
import gobblin.restli.SharedRestClientFactory;
import gobblin.restli.SharedRestClientKey;
import gobblin.restli.UriRestClientKey;
import gobblin.restli.throttling.PermitAllocation;
import gobblin.restli.throttling.PermitRequest;
import gobblin.util.ExponentialBackoff;
import gobblin.util.limiter.RequestSender;
import java.beans.ConstructorProperties;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/util/limiter/RedirectAwareRestClientRequestSender.class */
/**
 * A {@link RestClientRequestSender} that can change the server it talks to while running.
 *
 * <p>The sender is given a list of connection prefixes. It starts on one of them (chosen from a
 * random starting offset) and replaces its {@link RestClient} when either:
 * <ul>
 *   <li>the server answers with a 301 redirect, in which case the {@code Location} error detail is
 *       resolved into the new prefix; or</li>
 *   <li>the request fails with a {@link RemoteInvocationException} whose cause is accepted by
 *       {@link #shouldCatchExceptionAndSwitchUrl(RemoteInvocationException)}, in which case the
 *       next configured prefix is tried.</li>
 * </ul>
 * After switching, the original request is re-sent following an exponential backoff wait.
 */
public class RedirectAwareRestClientRequestSender extends RestClientRequestSender {
    private static final Logger log = LoggerFactory.getLogger(RedirectAwareRestClientRequestSender.class);

    /** Number of 301 redirects at which a single request is failed instead of being followed again. */
    private static final int MAX_REDIRECTS = 5;

    private final SharedResourcesBroker<?> broker;
    private final List<String> connectionPrefixes;
    /** Index into {@link #connectionPrefixes} of the last prefix handed out; negative until the first pick. */
    private volatile int lastPrefixAttempted = -1;
    private volatile RestClient restClient;
    private volatile String currentServerPrefix;

    /**
     * Callback wrapper that intercepts failures of a permit request and, where possible, retries the
     * same request against a different server before reporting anything to the wrapped callback.
     */
    private class CallbackDecorator implements Callback<Response<PermitAllocation>> {
        private final PermitRequest originalRequest;
        private final Callback<Response<PermitAllocation>> underlying;
        private final ExponentialBackoff exponentialBackoff = ExponentialBackoff.builder().maxDelay(Long.valueOf(BatchedPermitsRequester.DEFAULT_TARGET_MILLIS_BETWEEN_REQUESTS)).build();
        /** Count of 301 responses seen for this request. */
        private int redirects = 0;
        /** Count of prefix switches caused by connection-level failures for this request. */
        private int retries = 0;

        /**
         * Handles a failed request: follows redirects, rotates to the next prefix on connection-level
         * failures, and forwards every other error to the wrapped callback unchanged.
         *
         * <p>Once a retry limit is exceeded the wrapped callback receives a
         * {@link RequestSender.NonRetriableException} and no further attempt is made. Any throwable
         * raised while handling the error is forwarded to the wrapped callback as well.
         *
         * @param th the failure reported for the request
         */
        public void onError(Throwable th) {
            try {
                if ((th instanceof RestLiResponseException) && ((RestLiResponseException) th).getStatus() == HttpStatus.S_301_MOVED_PERMANENTLY.getCode()) {
                    this.redirects++;
                    if (this.redirects >= MAX_REDIRECTS) {
                        this.underlying.onError(new RequestSender.NonRetriableException("Too many redirects."));
                        // Terminal failure already reported: stop here so the request is not re-sent and
                        // the wrapped callback is not completed a second time.
                        return;
                    }
                    String location = (String) ((RestLiResponseException) th).getErrorDetails().get("Location");
                    RedirectAwareRestClientRequestSender.this.updateRestClient(SharedRestClientFactory.resolveUriPrefix(new URI(location)), "301 redirect");
                    this.exponentialBackoff.awaitNextRetry();
                    RedirectAwareRestClientRequestSender.this.sendRequest(this.originalRequest, this);
                } else if ((th instanceof RemoteInvocationException) && RedirectAwareRestClientRequestSender.this.shouldCatchExceptionAndSwitchUrl((RemoteInvocationException) th)) {
                    this.retries++;
                    if (this.retries > RedirectAwareRestClientRequestSender.this.connectionPrefixes.size()) {
                        this.underlying.onError(new RequestSender.NonRetriableException("Failed to connect to all available connection prefixes."));
                        // Terminal failure already reported: do not rotate prefixes or retry again.
                        return;
                    }
                    RedirectAwareRestClientRequestSender.log.info("Retries " + this.retries + " this " + hashCode());
                    RedirectAwareRestClientRequestSender.this.updateRestClient(RedirectAwareRestClientRequestSender.this.getNextConnectionPrefix(), "Failed to communicate with " + RedirectAwareRestClientRequestSender.this.getCurrentServerPrefix());
                    this.exponentialBackoff.awaitNextRetry();
                    RedirectAwareRestClientRequestSender.this.sendRequest(this.originalRequest, this);
                } else {
                    this.underlying.onError(th);
                }
            } catch (Throwable th2) {
                // Anything thrown while recovering (bad Location URI, missing error details, backoff
                // failure, ...) is surfaced to the caller rather than lost on the callback thread.
                this.underlying.onError(th2);
            }
        }

        /**
         * Passes a successful response straight through to the wrapped callback.
         *
         * @param response the response received from the server
         */
        public void onSuccess(Response<PermitAllocation> response) {
            this.underlying.onSuccess(response);
        }

        /**
         * @param permitRequest the request to re-send when a retry is attempted
         * @param callback the callback that ultimately receives the success or failure
         */
        @ConstructorProperties({"originalRequest", "underlying"})
        public CallbackDecorator(PermitRequest permitRequest, Callback<Response<PermitAllocation>> callback) {
            this.originalRequest = permitRequest;
            this.underlying = callback;
        }
    }

    /**
     * {@link SharedResourceFactory} that builds {@link RedirectAwareRestClientRequestSender} instances
     * from the connection prefixes found in the broker configuration.
     */
    public static class Factory<S extends ScopeType<S>> implements SharedResourceFactory<RedirectAwareRestClientRequestSender, SharedRestClientKey, S> {
        /** @return the factory name, {@code "restli"} */
        public String getName() {
            return "restli";
        }

        /**
         * Creates a sender using the connection prefixes parsed from the scoped configuration and key.
         *
         * @throws NotConfiguredException if the sender cannot be constructed from the configuration
         * @throws RuntimeException wrapping a {@link URISyntaxException} raised while parsing prefixes
         */
        public SharedResourceFactoryResponse<RedirectAwareRestClientRequestSender> createResource(SharedResourcesBroker<S> sharedResourcesBroker, ScopedConfigView<S, SharedRestClientKey> scopedConfigView) throws NotConfiguredException {
            try {
                return new ResourceInstance(new RedirectAwareRestClientRequestSender(sharedResourcesBroker, SharedRestClientFactory.parseConnectionPrefixes(scopedConfigView.getConfig(), scopedConfigView.getKey())));
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }

        /** @return the root scope of the broker's own scope type */
        public S getAutoScope(SharedResourcesBroker<S> sharedResourcesBroker, ConfigView<S, SharedRestClientKey> configView) {
            return (S) sharedResourcesBroker.selfScope().getType().rootScope();
        }
    }

    /**
     * Creates the sender and immediately connects it to one of the given prefixes.
     *
     * @param sharedResourcesBroker broker used to obtain the {@link RestClient} for each prefix
     * @param list candidate connection prefixes; must contain at least one entry
     * @throws IllegalArgumentException if {@code list} is empty
     * @throws NotConfiguredException if the rest client for the initial prefix cannot be obtained
     */
    public RedirectAwareRestClientRequestSender(SharedResourcesBroker<?> sharedResourcesBroker, List<String> list) throws NotConfiguredException {
        if (list.isEmpty()) {
            // Same exception type Random.nextInt(0) would raise further down, with a usable message.
            throw new IllegalArgumentException("At least one connection prefix is required.");
        }
        this.broker = sharedResourcesBroker;
        this.connectionPrefixes = list;
        updateRestClient(getNextConnectionPrefix(), "service start");
    }

    /**
     * Advances the round-robin position and returns the prefix at the new position. On the first
     * call the position is seeded with a random index before being advanced.
     *
     * <p>Declared public in this decompiled form; the decompiler notes the original modifier was
     * private.
     *
     * @return the next connection prefix to try
     */
    public String getNextConnectionPrefix() {
        if (this.lastPrefixAttempted < 0) {
            this.lastPrefixAttempted = new Random().nextInt(this.connectionPrefixes.size());
        }
        this.lastPrefixAttempted = (this.lastPrefixAttempted + 1) % this.connectionPrefixes.size();
        log.info("Round robin: " + this.lastPrefixAttempted);
        return this.connectionPrefixes.get(this.lastPrefixAttempted);
    }

    /**
     * Logs the current target prefix and delegates the send to the superclass.
     *
     * @param permitRequest the permit request to send
     * @param callback the callback to notify with the outcome
     */
    @Override // gobblin.util.limiter.RestClientRequestSender, gobblin.util.limiter.RequestSender
    public void sendRequest(PermitRequest permitRequest, Callback<Response<PermitAllocation>> callback) {
        log.info("Sending request to " + getCurrentServerPrefix());
        super.sendRequest(permitRequest, callback);
    }

    /** @return the rest client for the prefix currently in use */
    @Override // gobblin.util.limiter.RestClientRequestSender
    protected RestClient getRestClient() {
        return this.restClient;
    }

    /**
     * Wraps the callback in a {@link CallbackDecorator} unless it already is one, so that a retried
     * request keeps its existing decorator (and its redirect/retry counters) instead of nesting.
     */
    @Override // gobblin.util.limiter.RestClientRequestSender
    protected Callback<Response<PermitAllocation>> decorateCallback(PermitRequest permitRequest, Callback<Response<PermitAllocation>> callback) {
        return callback instanceof CallbackDecorator ? callback : new CallbackDecorator(permitRequest, callback);
    }

    /**
     * Points this sender at a different server prefix by fetching the matching shared
     * {@link RestClient} from the broker.
     *
     * @param str the server prefix to switch to
     * @param str2 human-readable reason for the switch, used only for logging
     * @throws NotConfiguredException if the broker cannot supply a client for the prefix
     */
    @VisibleForTesting
    void updateRestClient(String str, String str2) throws NotConfiguredException {
        log.info(String.format("Switching to server prefix %s due to: %s", str, str2));
        this.currentServerPrefix = str;
        this.restClient = (RestClient) this.broker.getSharedResource(new SharedRestClientFactory(), new UriRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME, str));
    }

    /**
     * Decides whether a failed invocation should trigger a switch to another prefix.
     *
     * @param remoteInvocationException the failure to inspect
     * @return {@code true} if the direct cause is a {@link RetriableRequestException},
     *     {@link ConnectException} or {@link TimeoutException}
     */
    public boolean shouldCatchExceptionAndSwitchUrl(RemoteInvocationException remoteInvocationException) {
        return (remoteInvocationException.getCause() instanceof RetriableRequestException) || (remoteInvocationException.getCause() instanceof ConnectException) || (remoteInvocationException.getCause() instanceof TimeoutException);
    }

    /** @return the server prefix most recently set by {@link #updateRestClient(String, String)} */
    public String getCurrentServerPrefix() {
        return this.currentServerPrefix;
    }
}
