package gobblin.util.limiter;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.TreeMultimap;
import com.linkedin.common.callback.Callback;
import com.linkedin.data.template.GetMode;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.RestLiResponseException;
import com.linkedin.restli.common.HttpStatus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import gobblin.metrics.MetricContext;
import gobblin.restli.throttling.PermitAllocation;
import gobblin.restli.throttling.PermitRequest;
import gobblin.util.ExecutorsUtils;
import gobblin.util.NoopCloseable;
import gobblin.util.limiter.RequestSender;
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:gobblin/util/limiter/BatchedPermitsRequester.class */
public class BatchedPermitsRequester {
    /** Name under which the request timer is looked up in the {@link MetricContext} (see the constructor). */
    public static final String REST_REQUEST_TIMER = "limiter.restli.restRequestTimer";
    /** Name of the histogram that is updated with the number of permits asked for in each request. */
    public static final String REST_REQUEST_PERMITS_HISTOGRAM = "limiter.restli.restRequestPermitsHistogram";
    /**
     * Fallback for {@code targetMillisBetweenRequests} when the configured value is not positive. It is also the
     * look-ahead window that {@code getPermits} passes to {@code RetryStatus.canRetryWithinMillis}.
     */
    public static final long DEFAULT_TARGET_MILLIS_BETWEEN_REQUESTS = 10000;
    /** Once {@code retries} reaches this value the failure is handled like a non-retriable one. */
    protected static final int MAX_RETRIES = 5;
    /** Delay, in milliseconds, for which retries are blocked after a non-retriable failure. */
    private static final long RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION = 60000;
    /** Depletion rate reported by a batch that has not been used yet or has no measurable elapsed time. */
    private static final double MAX_DEPLETION_RATE = 1.0E20d;
    // NOTE(review): presumably the growth cap that shows up as the literal 2 in computeNextPermitRequest - confirm.
    public static final int MAX_GROWTH_REQUEST = 2;

    /** Permits received from the throttling service that have not been handed out yet. */
    @VisibleForTesting
    private final PermitBatchContainer permitBatchContainer;
    /** Held by {@code getPermits} and by the allocation callbacks while they run. */
    private final Lock lock;
    /** Condition on {@link #lock}; signalled when permits are added and when a non-retriable failure is recorded. */
    private final Condition newPermitsAvailable;
    /** Created with a single permit; acquired before a request is sent and released by the request's callback. */
    private final Semaphore requestSemaphore;
    /** Template (resource and requestor identifier) that is copied for every outgoing request. */
    private final PermitRequest basePermitRequest;
    /** Transport used to send permit requests. */
    private final RequestSender requestSender;
    /** Timer for requests; {@code null} when no metric context was supplied. */
    private final Timer restRequestTimer;
    /** Histogram of requested permits; {@code null} when no metric context was supplied. */
    private final Histogram restRequestHistogram;
    /** Failed requests since the last successful response; reset to 0 in {@code onSuccess}. */
    private volatile int retries;
    /** Tracks the earliest time at which a new request may be sent. */
    private final RetryStatus retryStatus;
    /** Count and total weight of the permit demands registered by {@code getPermits}. */
    private final SynchronizedAverager permitsOutstanding;
    /** Multiplied by a batch's depletion rate to size the next request (see computeNextPermitRequest). */
    private final long targetMillisBetweenRequests;
    private static final Logger log = LoggerFactory.getLogger(BatchedPermitsRequester.class);
    /** HTTP status codes (403 and 422) that {@code AllocationCallback.onError} treats as non-retriable. */
    public static final ImmutableSet<Integer> NON_RETRIABLE_ERRORS = ImmutableSet.of(Integer.valueOf(HttpStatus.S_403_FORBIDDEN.getCode()), Integer.valueOf(HttpStatus.S_422_UNPROCESSABLE_ENTITY.getCode()));
    /** Shared single-thread scheduler used by {@code RetryStatus.blockRetries} to re-trigger a request after the delay. */
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1, ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of(BatchedPermitsRequester.class.getName() + "-schedule-%d")));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/util/limiter/BatchedPermitsRequester$AllocationCallback.class */
    public class AllocationCallback implements Callback<Response<PermitAllocation>> {
        private final Closeable timerContext;

        public void onError(Throwable th) {
            BatchedPermitsRequester.this.lock.lock();
            try {
                try {
                    if (th instanceof RequestSender.NonRetriableException) {
                        nonRetriableFail(th, "Encountered non retriable error. ");
                    }
                    if (th instanceof RestLiResponseException) {
                        int status = ((RestLiResponseException) th).getStatus();
                        if (BatchedPermitsRequester.NON_RETRIABLE_ERRORS.contains(Integer.valueOf(status))) {
                            nonRetriableFail(th, "Encountered non retriable error. HTTP response code: " + status);
                        }
                    }
                    BatchedPermitsRequester.access$708(BatchedPermitsRequester.this);
                    if (BatchedPermitsRequester.this.retries >= BatchedPermitsRequester.MAX_RETRIES) {
                        nonRetriableFail(th, "Too many failures trying to communicate with throttling service.");
                    } else {
                        BatchedPermitsRequester.this.requestSemaphore.release();
                        BatchedPermitsRequester.this.maybeSendNewPermitRequest();
                    }
                } finally {
                    BatchedPermitsRequester.this.lock.unlock();
                    try {
                        this.timerContext.close();
                    } catch (IOException e) {
                    }
                }
            } catch (Throwable th2) {
                BatchedPermitsRequester.log.error("Error on batched permits container.", th2);
                BatchedPermitsRequester.this.lock.unlock();
                try {
                    this.timerContext.close();
                } catch (IOException e2) {
                }
            }
        }

        public void onSuccess(Response<PermitAllocation> response) {
            BatchedPermitsRequester.this.retries = 0;
            BatchedPermitsRequester.this.lock.lock();
            try {
                PermitAllocation permitAllocation = (PermitAllocation) response.getEntity();
                BatchedPermitsRequester.log.debug("Received permit allocation " + permitAllocation);
                Long minRetryDelayMillis = permitAllocation.getMinRetryDelayMillis(GetMode.NULL);
                if (minRetryDelayMillis != null) {
                    BatchedPermitsRequester.this.retryStatus.blockRetries(minRetryDelayMillis.longValue(), null);
                }
                if (permitAllocation.getPermits().longValue() > 0) {
                    BatchedPermitsRequester.this.permitBatchContainer.addPermitAllocation(permitAllocation);
                }
                BatchedPermitsRequester.this.requestSemaphore.release();
                if (permitAllocation.getPermits().longValue() > 0) {
                    BatchedPermitsRequester.this.newPermitsAvailable.signalAll();
                }
            } finally {
                try {
                    this.timerContext.close();
                } catch (IOException e) {
                }
                BatchedPermitsRequester.this.lock.unlock();
            }
        }

        private void nonRetriableFail(Throwable th, String str) {
            BatchedPermitsRequester.this.retryStatus.blockRetries(BatchedPermitsRequester.RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION, th);
            BatchedPermitsRequester.this.requestSemaphore.release();
            BatchedPermitsRequester.log.error(str, th);
            BatchedPermitsRequester.this.newPermitsAvailable.signalAll();
        }

        @ConstructorProperties({"timerContext"})
        public AllocationCallback(Closeable closeable) {
            this.timerContext = closeable;
        }
    }

    /* loaded from: input_file:gobblin/util/limiter/BatchedPermitsRequester$BatchedPermitsRequesterBuilder.class */
    /**
     * Fluent builder for {@link BatchedPermitsRequester}. Each setter stores its argument and returns the builder;
     * argument validation happens in the requester's constructor when {@link #build()} is called.
     */
    public static class BatchedPermitsRequesterBuilder {
        private String resourceId;
        private String requestorIdentifier;
        private long targetMillisBetweenRequests;
        private RequestSender requestSender;
        private MetricContext metricContext;

        BatchedPermitsRequesterBuilder() {
        }

        /** Sets the resource id the requester asks permits for. */
        public BatchedPermitsRequesterBuilder resourceId(String resourceId) {
            this.resourceId = resourceId;
            return this;
        }

        /** Sets the identifier of the requestor. */
        public BatchedPermitsRequesterBuilder requestorIdentifier(String requestorIdentifier) {
            this.requestorIdentifier = requestorIdentifier;
            return this;
        }

        /** Sets the target number of milliseconds between requests. */
        public BatchedPermitsRequesterBuilder targetMillisBetweenRequests(long targetMillisBetweenRequests) {
            this.targetMillisBetweenRequests = targetMillisBetweenRequests;
            return this;
        }

        /** Sets the sender used to issue permit requests. */
        public BatchedPermitsRequesterBuilder requestSender(RequestSender requestSender) {
            this.requestSender = requestSender;
            return this;
        }

        /** Sets the metric context used for the request timer and histogram; may be left unset. */
        public BatchedPermitsRequesterBuilder metricContext(MetricContext metricContext) {
            this.metricContext = metricContext;
            return this;
        }

        /** Creates the requester from the values collected so far. */
        public BatchedPermitsRequester build() {
            return new BatchedPermitsRequester(
                this.resourceId,
                this.requestorIdentifier,
                this.targetMillisBetweenRequests,
                this.requestSender,
                this.metricContext);
        }

        public String toString() {
            StringBuilder text = new StringBuilder("BatchedPermitsRequester.BatchedPermitsRequesterBuilder(");
            text.append("resourceId=").append(this.resourceId);
            text.append(", requestorIdentifier=").append(this.requestorIdentifier);
            text.append(", targetMillisBetweenRequests=").append(this.targetMillisBetweenRequests);
            text.append(", requestSender=").append(this.requestSender);
            text.append(", metricContext=").append(this.metricContext);
            return text.append(")").toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * A single batch of permits with its expiration time. Besides the remaining permits it records when the batch
     * was first used and when it was exhausted, so that an average depletion rate can be computed.
     */
    @NotThreadSafe
    public static class PermitBatch {
        /** Source of unique, increasing keys; used to order batches that share an expiration. */
        private static final AtomicLong NEXT_KEY = new AtomicLong(0);
        private volatile long permits;
        private final long expiration;
        private final long autoIncrementKey = NEXT_KEY.getAndIncrement();
        private final long initialPermits;
        /** {@code System.currentTimeMillis()} at the first decrement, or 0 if the batch has not been used. */
        private long firstUseTime;
        /** {@code System.currentTimeMillis()} at the decrement that exhausted the batch, or 0 if not exhausted. */
        private long lastPermitUsedTime;
        /** Number of calls to {@link #decrementPermits(long)}. */
        private int permitRequests;

        /**
         * @param j number of permits in the batch.
         * @param j2 expiration of the batch.
         */
        PermitBatch(long j, long j2) {
            this.permits = j;
            this.expiration = j2;
            this.initialPermits = j;
        }

        /**
         * Takes {@code j} permits from this batch and updates the usage timestamps.
         *
         * @param j number of permits to remove.
         */
        public void decrementPermits(long j) {
            if (this.firstUseTime == 0) {
                this.firstUseTime = System.currentTimeMillis();
            }
            this.permitRequests++;
            this.permits -= j;
            if (this.permits <= 0) {
                this.lastPermitUsedTime = System.currentTimeMillis();
            }
        }

        /**
         * Computes how fast this batch has been consumed.
         *
         * @return permits used per millisecond between the first use and either the exhaustion time or now;
         *         {@code MAX_DEPLETION_RATE} if the batch is unused or no time has elapsed since first use.
         */
        public double getAverageDepletionRate() {
            if (this.firstUseTime == 0) {
                return BatchedPermitsRequester.MAX_DEPLETION_RATE;
            }
            long endTime = this.lastPermitUsedTime > 0 ? this.lastPermitUsedTime : System.currentTimeMillis();
            if (endTime <= this.firstUseTime) {
                return BatchedPermitsRequester.MAX_DEPLETION_RATE;
            }
            // Divide as doubles: long division would truncate rates below one permit per millisecond to 0.
            return (double) (this.initialPermits - this.permits) / (endTime - this.firstUseTime);
        }

        public long getPermits() {
            return this.permits;
        }

        public long getExpiration() {
            return this.expiration;
        }

        public long getAutoIncrementKey() {
            return this.autoIncrementKey;
        }

        public long getInitialPermits() {
            return this.initialPermits;
        }

        public long getFirstUseTime() {
            return this.firstUseTime;
        }

        public long getLastPermitUsedTime() {
            return this.lastPermitUsedTime;
        }

        public int getPermitRequests() {
            return this.permitRequests;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:gobblin/util/limiter/BatchedPermitsRequester$PermitBatchContainer.class */
    /**
     * Holds the permit batches received so far, keyed by expiration, together with a running total of the permits
     * they contain. All mutating methods are synchronized on the container.
     */
    public static class PermitBatchContainer {
        /** Batches by expiration; batches sharing an expiration are ordered by their auto-increment key. */
        private final TreeMultimap<Long, PermitBatch> batches = TreeMultimap.create(Ordering.natural(), new Comparator<PermitBatch>() {
            @Override
            public int compare(PermitBatch left, PermitBatch right) {
                return Long.compare(left.autoIncrementKey, right.autoIncrementKey);
            }
        });
        /** Sum of the permits remaining in all stored batches. */
        private volatile long totalAvailablePermits = 0;

        PermitBatchContainer() {
        }

        /**
         * Attempts to take {@code j} permits, consuming batches in iteration order after dropping expired ones.
         *
         * @param j number of permits wanted.
         * @return {@code true} if the permits were taken, {@code false} if fewer than {@code j} are available.
         * @throws RuntimeException if the running total disagrees with the contents of the batches.
         */
        public synchronized boolean tryTake(long j) {
            purgeExpiredBatches();
            if (j > this.totalAvailablePermits) {
                return false;
            }
            this.totalAvailablePermits -= j;
            long stillNeeded = j;
            for (Iterator<PermitBatch> cursor = this.batches.values().iterator(); cursor.hasNext(); ) {
                PermitBatch batch = cursor.next();
                if (batch.getPermits() < stillNeeded) {
                    // Batch is too small: use it up entirely and move on to the next one.
                    stillNeeded -= batch.getPermits();
                    cursor.remove();
                } else {
                    batch.decrementPermits(stillNeeded);
                    return true;
                }
            }
            throw new RuntimeException("Total permits was unsynced! This is an error in code.");
        }

        /** Logs the total and the per-batch permit counts at info level, prefixed with {@code str}. */
        private synchronized void printState(String str) {
            StringBuilder message = new StringBuilder(str);
            message.append("->");
            message.append("BatchedPermitsRequester state (").append(hashCode()).append("): ");
            message.append("TotalPermits: ").append(this.totalAvailablePermits).append(" ");
            message.append("Batches(").append(this.batches.size()).append("): ");
            for (PermitBatch batch : this.batches.values()) {
                message.append(batch.getPermits()).append(",");
            }
            BatchedPermitsRequester.log.info(message.toString());
        }

        /** Drops every batch whose expiration key is strictly before the current time. */
        private synchronized void purgeExpiredBatches() {
            Long now = Long.valueOf(System.currentTimeMillis());
            Iterator<Collection<PermitBatch>> expired =
                this.batches.asMap().subMap(Long.MIN_VALUE, now).values().iterator();
            purgeBatches(expired);
        }

        /** Drops every stored batch. */
        public synchronized void purgeAll() {
            Iterator<Collection<PermitBatch>> everything = this.batches.asMap().values().iterator();
            purgeBatches(everything);
        }

        /** Removes each group of batches produced by {@code it}, subtracting their permits from the total. */
        private void purgeBatches(Iterator<Collection<PermitBatch>> it) {
            while (it.hasNext()) {
                for (PermitBatch batch : it.next()) {
                    this.totalAvailablePermits -= batch.getPermits();
                }
                it.remove();
            }
        }

        /**
         * Stores the allocation as a new batch and adds its permits to the total.
         *
         * @param permitAllocation allocation received from the throttling service.
         */
        public synchronized void addPermitAllocation(PermitAllocation permitAllocation) {
            Long expiration = permitAllocation.getExpiration();
            long granted = permitAllocation.getPermits().longValue();
            this.batches.put(expiration, new PermitBatch(granted, expiration.longValue()));
            this.totalAvailablePermits += granted;
        }

        public long getTotalAvailablePermits() {
            return this.totalAvailablePermits;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/util/limiter/BatchedPermitsRequester$RetryStatus.class */
    /**
     * Remembers the earliest time at which a new permit request may be sent, and schedules a new request attempt
     * for when that time arrives.
     */
    public class RetryStatus {
        /** Earliest {@code System.currentTimeMillis()} value at which requests are allowed again. */
        private long retryAt;

        /** Cause passed to the most recent {@link #blockRetries(long, Throwable)} call; not read in this class. */
        @Nullable
        private Throwable exception;

        private RetryStatus() {
        }

        /** @return whether a request may be sent right now. */
        public boolean canRetryNow() {
            return canRetryWithinMillis(0L);
        }

        /**
         * @param j look-ahead in milliseconds.
         * @return whether requests will be allowed at some point within the next {@code j} milliseconds.
         */
        public boolean canRetryWithinMillis(long j) {
            long horizon = System.currentTimeMillis() + j;
            return this.retryAt <= horizon;
        }

        /**
         * Disallows requests for the next {@code j} milliseconds and schedules a request attempt for afterwards.
         *
         * @param j how long to block, in milliseconds.
         * @param th cause of the block, may be {@code null}.
         */
        public void blockRetries(long j, Throwable th) {
            this.exception = th;
            this.retryAt = System.currentTimeMillis() + j;
            Runnable resend = new Runnable() {
                @Override
                public void run() {
                    BatchedPermitsRequester.this.maybeSendNewPermitRequest();
                }
            };
            BatchedPermitsRequester.SCHEDULE_EXECUTOR_SERVICE.schedule(resend, j, TimeUnit.MILLISECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/util/limiter/BatchedPermitsRequester$SynchronizedAverager.class */
    /**
     * Keeps a count of entries and the sum of their weights so that an average weight can be reported. Updates and
     * the average are synchronized; the plain getters read the volatile fields directly.
     */
    public static class SynchronizedAverager {
        /** Sum of the weights of all current entries. */
        private volatile long weight;
        /** Number of current entries. */
        private volatile long entries;

        private SynchronizedAverager() {
        }

        /**
         * Adds one entry with the given weight.
         *
         * @param j weight of the new entry.
         */
        @SuppressFBWarnings(value = {"VO_VOLATILE_INCREMENT"}, justification = "All methods updating volatile variables are synchronized")
        public synchronized void addEntryWithWeight(long j) {
            this.entries++;
            this.weight += j;
        }

        /**
         * Removes one entry with the given weight.
         *
         * @param j weight of the entry being removed.
         * @throws IllegalStateException if there are no entries to remove.
         */
        @SuppressFBWarnings(value = {"VO_VOLATILE_INCREMENT"}, justification = "All methods updating volatile variables are synchronized")
        public synchronized void removeEntryWithWeight(long j) {
            if (this.entries == 0) {
                throw new IllegalStateException("Cannot have a negative number of entries.");
            }
            this.entries--;
            this.weight -= j;
        }

        /**
         * @return the total weight divided by the number of entries, or 0 when there are no entries.
         */
        public synchronized double getAverageWeightOrZero() {
            if (this.entries == 0) {
                return 0.0d;
            }
            // Convert before dividing; long / long would discard the fractional part of the average.
            return (double) this.weight / this.entries;
        }

        public long getTotalWeight() {
            return this.weight;
        }

        public long getNumEntries() {
            return this.entries;
        }
    }

    /**
     * Creates a requester; use {@link #builder()} to obtain instances.
     *
     * @param resourceId resource to request permits for; must not be null or empty.
     * @param requestorIdentifier identifier of this requestor; must not be null or empty.
     * @param targetMillis target milliseconds between requests; non-positive values select
     *        {@link #DEFAULT_TARGET_MILLIS_BETWEEN_REQUESTS}.
     * @param requestSender sender used to issue permit requests.
     * @param metricContext source of the request timer and histogram; when {@code null} no metrics are recorded.
     * @throws IllegalArgumentException if {@code resourceId} or {@code requestorIdentifier} is null or empty.
     */
    private BatchedPermitsRequester(String resourceId, String requestorIdentifier, long targetMillis, RequestSender requestSender, MetricContext metricContext) {
        this.retries = 0;
        Preconditions.checkArgument(!Strings.isNullOrEmpty(resourceId), "Must provide a resource id.");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(requestorIdentifier), "Must provide a requestor identifier.");

        // Permit storage and the synchronization primitives around it.
        this.permitBatchContainer = new PermitBatchContainer();
        this.lock = new ReentrantLock();
        this.newPermitsAvailable = this.lock.newCondition();
        this.requestSemaphore = new Semaphore(1);
        this.permitsOutstanding = new SynchronizedAverager();

        if (targetMillis > 0) {
            this.targetMillisBetweenRequests = targetMillis;
        } else {
            this.targetMillisBetweenRequests = DEFAULT_TARGET_MILLIS_BETWEEN_REQUESTS;
        }
        this.requestSender = requestSender;
        this.retryStatus = new RetryStatus();

        // Template that every outgoing request is copied from.
        this.basePermitRequest = new PermitRequest();
        this.basePermitRequest.setResource(resourceId);
        this.basePermitRequest.setRequestorIdentifier(requestorIdentifier);

        if (metricContext == null) {
            this.restRequestTimer = null;
            this.restRequestHistogram = null;
        } else {
            this.restRequestTimer = metricContext.timer(REST_REQUEST_TIMER);
            this.restRequestHistogram = metricContext.histogram(REST_REQUEST_PERMITS_HISTOGRAM);
        }
    }

    /**
     * Blocks until {@code j} permits have been taken from the stored batches, requesting more permits from the
     * throttling service as needed.
     *
     * @param j number of permits wanted; non-positive values succeed immediately.
     * @return {@code true} once the permits have been taken; {@code false} if permits are not available and
     *         retries are blocked for longer than {@link #DEFAULT_TARGET_MILLIS_BETWEEN_REQUESTS} milliseconds.
     * @throws InterruptedException if the thread is interrupted while waiting for permits.
     */
    public boolean getPermits(long j) throws InterruptedException {
        if (j <= 0) {
            return true;
        }
        this.permitsOutstanding.addEntryWithWeight(j);
        this.lock.lock();
        try {
            // The lock stays held across iterations: Condition.await() requires it and re-acquires it before
            // returning, so it must only be released once, when this method exits.
            while (!this.permitBatchContainer.tryTake(j)) {
                if (!this.retryStatus.canRetryWithinMillis(DEFAULT_TARGET_MILLIS_BETWEEN_REQUESTS)) {
                    return false;
                }
                maybeSendNewPermitRequest();
                this.newPermitsAvailable.await();
            }
            return true;
        } finally {
            try {
                // Always undo the registration above, including on failure and interruption; otherwise the
                // outstanding total keeps growing and inflates every later permit request.
                this.permitsOutstanding.removeEntryWithWeight(j);
            } finally {
                this.lock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a permit request unless one is already in flight, retries are currently blocked, or no permits need to
     * be requested. When a request is sent, the request semaphore stays acquired until the request's
     * {@link AllocationCallback} releases it.
     *
     * @throws RuntimeException wrapping a {@link CloneNotSupportedException} if the base request cannot be copied.
     */
    public void maybeSendNewPermitRequest() {
        if (!this.requestSemaphore.tryAcquire()) {
            // Another request is already in flight.
            return;
        }
        if (!this.retryStatus.canRetryNow()) {
            this.requestSemaphore.release();
            return;
        }
        try {
            final long permitsToRequest = computeNextPermitRequest();
            if (permitsToRequest <= 0) {
                this.requestSemaphore.release();
                return;
            }

            PermitRequest request = this.basePermitRequest.copy();
            request.setPermits(permitsToRequest);
            request.setMinPermits((long) this.permitsOutstanding.getAverageWeightOrZero());
            if (this.restRequestHistogram != null) {
                this.restRequestHistogram.update(permitsToRequest);
            }
            log.debug("Sending permit request " + request);

            Closeable timerContext;
            if (this.restRequestTimer == null) {
                timerContext = NoopCloseable.INSTANCE;
            } else {
                timerContext = this.restRequestTimer.time();
            }
            this.requestSender.sendRequest(request, new AllocationCallback(timerContext));
        } catch (CloneNotSupportedException e) {
            this.requestSemaphore.release();
            throw new RuntimeException(e);
        }
    }

    /**
     * Computes how many permits the next request should ask for.
     *
     * <p>The baseline is the outstanding demand that the stored permits cannot cover. If at most one batch is
     * stored and it has no more than 20% of its initial permits left, the request is raised to the batch's
     * depletion rate times {@code targetMillisBetweenRequests}, capped at {@code MAX_GROWTH_REQUEST} times the
     * batch's initial size.
     *
     * @return the number of permits to request; 0 means no request is needed.
     */
    private long computeNextPermitRequest() {
        long candidatePermits = 0;
        long unsatisfiablePermits = this.permitsOutstanding.getTotalWeight() - this.permitBatchContainer.totalAvailablePermits;
        if (unsatisfiablePermits > 0) {
            candidatePermits = unsatisfiablePermits;
        }

        if (this.permitBatchContainer.batches.size() > 1) {
            // More than one batch is still queued; only ask for what cannot be satisfied.
            return candidatePermits;
        }
        PermitBatch firstBatch = Iterables.getFirst(this.permitBatchContainer.batches.values(), null);
        if (firstBatch == null) {
            return candidatePermits;
        }

        // Compare as a real fraction: long / long is 0 for every partially used batch, which would make the
        // 20% threshold trigger as soon as a single permit had been taken.
        double fractionRemaining = (double) firstBatch.getPermits() / firstBatch.getInitialPermits();
        if (fractionRemaining <= 0.2d) {
            long permitsByDepletion = Math.min(
                (long) (firstBatch.getAverageDepletionRate() * this.targetMillisBetweenRequests),
                MAX_GROWTH_REQUEST * firstBatch.getInitialPermits());
            return Math.max(candidatePermits, permitsByDepletion);
        }
        return candidatePermits;
    }

    /** Discards every permit batch currently stored in the container. */
    @VisibleForTesting
    public void clearAllStoredPermits() {
        PermitBatchContainer container = getPermitBatchContainer();
        container.purgeAll();
    }

    /**
     * @return a new, empty {@link BatchedPermitsRequesterBuilder}.
     */
    public static BatchedPermitsRequesterBuilder builder() {
        BatchedPermitsRequesterBuilder freshBuilder = new BatchedPermitsRequesterBuilder();
        return freshBuilder;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the container holding the permit batches of this requester.
     */
    public PermitBatchContainer getPermitBatchContainer() {
        return permitBatchContainer;
    }

    /**
     * Compiler-generated style accessor that post-increments the {@code retries} counter of the given requester.
     *
     * @param batchedPermitsRequester requester whose counter is incremented.
     * @return the value of {@code retries} before the increment.
     */
    static /* synthetic */ int access$708(BatchedPermitsRequester batchedPermitsRequester) {
        return batchedPermitsRequester.retries++;
    }
}
