package org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.cloudwatch.model.StandardUnit;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.Lease;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.LeaseSelector;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.Log;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/flink/kinesis/shaded/com/amazonaws/services/kinesis/leases/impl/LeaseTaker.class */
public class LeaseTaker<T extends Lease> implements ILeaseTaker<T> {
    private static final int TAKE_RETRIES = 3;
    private static final int SCAN_RETRIES = 1;
    private final ILeaseManager<T> leaseManager;
    private final LeaseSelector<T> leaseSelector;
    private final String workerIdentifier;
    private final Map<String, T> allLeases;
    private final long leaseDurationNanos;
    private int maxLeasesForWorker;
    private int maxLeasesToStealAtOneTime;
    private long lastScanTimeNanos;
    private static final Log LOG = LogFactory.getLog(LeaseTaker.class);
    private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = new Callable<Long>() { // from class: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.LeaseTaker.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Long call() {
            return Long.valueOf(System.nanoTime());
        }
    };

    private static <T extends Lease> LeaseSelector<T> getDefaultLeaseSelector() {
        return new GenericLeaseSelector();
    }

    public LeaseTaker(ILeaseManager<T> iLeaseManager, String str, long j) {
        this(iLeaseManager, getDefaultLeaseSelector(), str, j);
    }

    public LeaseTaker(ILeaseManager<T> iLeaseManager, LeaseSelector<T> leaseSelector, String str, long j) {
        this.allLeases = new HashMap();
        this.maxLeasesForWorker = Integer.MAX_VALUE;
        this.maxLeasesToStealAtOneTime = 1;
        this.lastScanTimeNanos = 0L;
        this.leaseManager = iLeaseManager;
        this.leaseSelector = leaseSelector;
        this.workerIdentifier = str;
        this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(j);
    }

    public LeaseTaker<T> withMaxLeasesForWorker(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("maxLeasesForWorker should be >= 1");
        }
        this.maxLeasesForWorker = i;
        return this;
    }

    public LeaseTaker<T> withMaxLeasesToStealAtOneTime(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("maxLeasesToStealAtOneTime should be >= 1");
        }
        this.maxLeasesToStealAtOneTime = i;
        return this;
    }

    @Override // org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker
    public Map<String, T> takeLeases() throws DependencyException, InvalidStateException {
        return takeLeases(SYSTEM_CLOCK_CALLABLE);
    }

    synchronized Map<String, T> takeLeases(Callable<Long> callable) throws DependencyException, InvalidStateException {
        HashMap hashMap = new HashMap();
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = false;
        ProvisionedThroughputException provisionedThroughputException = null;
        for (int i = 1; i <= 1; i++) {
            try {
                try {
                    updateAllLeases(callable);
                    z = true;
                } catch (ProvisionedThroughputException e) {
                    LOG.info(String.format("Worker %s could not find expired leases on try %d out of %d", this.workerIdentifier, Integer.valueOf(i), 3));
                    provisionedThroughputException = e;
                }
            } finally {
                MetricsHelper.addSuccessAndLatency("ListLeases", currentTimeMillis, z, MetricsLevel.DETAILED);
            }
        }
        if (provisionedThroughputException != null) {
            LOG.error("Worker " + this.workerIdentifier + " could not scan leases table, aborting takeLeases. Exception caught by last retry:", provisionedThroughputException);
            return hashMap;
        }
        Set<T> computeLeasesToTake = computeLeasesToTake(getExpiredLeases());
        HashSet hashSet = new HashSet();
        for (T t : computeLeasesToTake) {
            String leaseKey = t.getLeaseKey();
            long currentTimeMillis2 = System.currentTimeMillis();
            boolean z2 = false;
            int i2 = 1;
            while (true) {
                if (i2 > 3) {
                    break;
                }
                try {
                    try {
                        if (this.leaseManager.takeLease(t, this.workerIdentifier)) {
                            t.setLastCounterIncrementNanos(Long.valueOf(System.nanoTime()));
                            hashMap.put(leaseKey, t);
                        } else {
                            hashSet.add(leaseKey);
                        }
                        z2 = true;
                    } catch (ProvisionedThroughputException e2) {
                        LOG.info(String.format("Could not take lease with key %s for worker %s on try %d out of %d due to capacity", leaseKey, this.workerIdentifier, Integer.valueOf(i2), 3));
                        i2++;
                    }
                } catch (Throwable th) {
                    MetricsHelper.addSuccessAndLatency("TakeLease", currentTimeMillis2, false, MetricsLevel.DETAILED);
                    throw th;
                }
            }
            MetricsHelper.addSuccessAndLatency("TakeLease", currentTimeMillis2, z2, MetricsLevel.DETAILED);
        }
        if (hashMap.size() > 0) {
            LOG.info(String.format("Worker %s successfully took %d leases: %s", this.workerIdentifier, Integer.valueOf(hashMap.size()), stringJoin(hashMap.keySet(), ", ")));
        }
        if (hashSet.size() > 0) {
            LOG.info(String.format("Worker %s failed to take %d leases: %s", this.workerIdentifier, Integer.valueOf(hashSet.size()), stringJoin(hashSet, ", ")));
        }
        MetricsHelper.getMetricsScope().addData("TakenLeases", hashMap.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
        return hashMap;
    }

    static String stringJoin(Collection<String> collection, String str) {
        StringBuilder sb = new StringBuilder();
        boolean z = false;
        for (String str2 : collection) {
            if (z) {
                sb.append(str);
            }
            sb.append(str2);
            z = true;
        }
        return sb.toString();
    }

    private void updateAllLeases(Callable<Long> callable) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        List<T> listLeases = this.leaseManager.listLeases();
        try {
            this.lastScanTimeNanos = callable.call().longValue();
            HashSet hashSet = new HashSet(this.allLeases.keySet());
            for (T t : listLeases) {
                String leaseKey = t.getLeaseKey();
                T t2 = this.allLeases.get(leaseKey);
                this.allLeases.put(leaseKey, t);
                hashSet.remove(leaseKey);
                if (t2 != null) {
                    if (t2.getLeaseCounter().equals(t.getLeaseCounter())) {
                        t.setLastCounterIncrementNanos(t2.getLastCounterIncrementNanos());
                    } else {
                        t.setLastCounterIncrementNanos(Long.valueOf(this.lastScanTimeNanos));
                    }
                } else if (t.getLeaseOwner() == null) {
                    t.setLastCounterIncrementNanos(0L);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Treating new lease with key " + leaseKey + " as never renewed because it is new and unowned.");
                    }
                } else {
                    t.setLastCounterIncrementNanos(Long.valueOf(this.lastScanTimeNanos));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Treating new lease with key " + leaseKey + " as recently renewed because it is new and owned.");
                    }
                }
            }
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                this.allLeases.remove((String) it.next());
            }
        } catch (Exception e) {
            throw new DependencyException("Exception caught from timeProvider", e);
        }
    }

    private List<T> getExpiredLeases() {
        ArrayList arrayList = new ArrayList();
        for (T t : this.allLeases.values()) {
            if (t.isExpired(this.leaseDurationNanos, this.lastScanTimeNanos)) {
                arrayList.add(t);
            }
        }
        return arrayList;
    }

    private Set<T> computeLeasesToTake(List<T> list) {
        int i;
        Map<String, Integer> computeLeaseCounts = computeLeaseCounts(list);
        HashSet hashSet = new HashSet();
        IMetricsScope metricsScope = MetricsHelper.getMetricsScope();
        int leaseCountThatCanBeTaken = this.leaseSelector.getLeaseCountThatCanBeTaken(this.allLeases.values());
        int size = computeLeaseCounts.size();
        if (leaseCountThatCanBeTaken == 0) {
            return hashSet;
        }
        if (size >= leaseCountThatCanBeTaken) {
            i = 1;
        } else {
            i = (leaseCountThatCanBeTaken / size) + (leaseCountThatCanBeTaken % size == 0 ? 0 : 1);
            int max = Math.max(0, i - this.maxLeasesForWorker);
            if (i > this.maxLeasesForWorker) {
                LOG.warn(String.format("Worker %s target is %d leases and maxLeasesForWorker is %d. Resetting target to %d, lease spillover is %d.  Note that some shards may not be processed if no other workers are able to pick them up.", this.workerIdentifier, Integer.valueOf(i), Integer.valueOf(this.maxLeasesForWorker), Integer.valueOf(this.maxLeasesForWorker), Integer.valueOf(max)));
                i = this.maxLeasesForWorker;
            }
            metricsScope.addData("LeaseSpillover", max, StandardUnit.Count, MetricsLevel.SUMMARY);
        }
        int intValue = computeLeaseCounts.get(this.workerIdentifier).intValue();
        int i2 = i - intValue;
        if (i2 <= 0) {
            return hashSet;
        }
        Collections.shuffle(list);
        int size2 = list.size();
        if (list.size() > 0) {
            hashSet = this.leaseSelector.getLeasesToTakeFromExpiredLeases(list, i2);
        } else {
            for (T t : chooseLeasesToSteal(computeLeaseCounts, i2, i)) {
                LOG.info(String.format("Worker %s needed %d leases but none were expired, so it will steal lease %s from %s", this.workerIdentifier, Integer.valueOf(i2), t.getLeaseKey(), t.getLeaseOwner()));
                hashSet.add(t);
            }
        }
        if (!hashSet.isEmpty()) {
            LOG.info(String.format("Worker %s saw %d total leases, %d available leases, %d workers. Target is %d leases, I have %d leases, I will take %d leases", this.workerIdentifier, Integer.valueOf(leaseCountThatCanBeTaken), Integer.valueOf(size2), Integer.valueOf(size), Integer.valueOf(i), Integer.valueOf(intValue), Integer.valueOf(hashSet.size())));
        }
        metricsScope.addData("TotalLeases", leaseCountThatCanBeTaken, StandardUnit.Count, MetricsLevel.DETAILED);
        metricsScope.addData("ExpiredLeases", size2, StandardUnit.Count, MetricsLevel.SUMMARY);
        metricsScope.addData("NumWorkers", size, StandardUnit.Count, MetricsLevel.SUMMARY);
        metricsScope.addData("NeededLeases", i2, StandardUnit.Count, MetricsLevel.DETAILED);
        metricsScope.addData("LeasesToTake", hashSet.size(), StandardUnit.Count, MetricsLevel.DETAILED);
        return hashSet;
    }

    private List<T> chooseLeasesToSteal(Map<String, Integer> map, int i, int i2) {
        ArrayList arrayList = new ArrayList();
        Map.Entry<String, Integer> entry = null;
        for (Map.Entry<String, Integer> entry2 : map.entrySet()) {
            if (entry == null || entry.getValue().intValue() < entry2.getValue().intValue()) {
                entry = entry2;
            }
        }
        int i3 = 0;
        if (entry.getValue().intValue() >= i2 && i > 0) {
            int min = Math.min(i, entry.getValue().intValue() - i2);
            if (i > 1 && min == 0) {
                min = 1;
            }
            i3 = Math.min(min, this.maxLeasesToStealAtOneTime);
        }
        if (i3 <= 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Worker %s not stealing from most loaded worker %s.  He has %d, target is %d, and I need %d", this.workerIdentifier, entry.getKey(), entry.getValue(), Integer.valueOf(i2), Integer.valueOf(i)));
            }
            return arrayList;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Worker %s will attempt to steal %d leases from most loaded worker %s.  He has %d leases, target is %d, I need %d, maxLeasesToSteatAtOneTime is %d.", this.workerIdentifier, Integer.valueOf(i3), entry.getKey(), entry.getValue(), Integer.valueOf(i2), Integer.valueOf(i), Integer.valueOf(this.maxLeasesToStealAtOneTime)));
        }
        String key = entry.getKey();
        ArrayList arrayList2 = new ArrayList();
        for (T t : this.allLeases.values()) {
            if (key.equals(t.getLeaseOwner())) {
                arrayList2.add(t);
            }
        }
        Collections.shuffle(arrayList2);
        arrayList.addAll(arrayList2.subList(0, Math.min(arrayList2.size(), i3)));
        return arrayList;
    }

    private Map<String, Integer> computeLeaseCounts(List<T> list) {
        HashMap hashMap = new HashMap();
        for (T t : this.allLeases.values()) {
            if (!list.contains(t)) {
                String leaseOwner = t.getLeaseOwner();
                Integer num = (Integer) hashMap.get(leaseOwner);
                if (num == null) {
                    hashMap.put(leaseOwner, 1);
                } else {
                    hashMap.put(leaseOwner, Integer.valueOf(num.intValue() + 1));
                }
            }
        }
        if (((Integer) hashMap.get(this.workerIdentifier)) == null) {
            hashMap.put(this.workerIdentifier, 0);
        }
        return hashMap;
    }

    @Override // org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker
    public String getWorkerIdentifier() {
        return this.workerIdentifier;
    }
}
