package co.cask.cdap.messaging.cache;

import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.messaging.store.MessageFilter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;

/* loaded from: input_file:co/cask/cdap/messaging/cache/MessageCache.class */
public class MessageCache<T> {
    // Metric names reported through metricsContext.
    // Gauge set by addAll to the cache weight observed after adding entries.
    private static final String METRICS_WEIGHT = "cache.weight";
    // Incremented by addAll with the number of entries it added.
    private static final String METRICS_ENTRIES_ADDED = "cache.entries.added";
    // Incremented by reduceWeight with the number of entries it removed.
    private static final String METRICS_ENTRIES_REMOVED = "cache.entries.removed";
    // Incremented by one for an addAll call.
    private static final String METRICS_ADD_REQUESTS = "cache.add.requests";
    // Incremented by one each time addAll invokes reduceWeight.
    private static final String METRICS_ADD_REDUCE_WEIGHT = "cache.add.reduce.weight";
    // Incremented by one for a scan call.
    private static final String METRICS_SCAN_REQUESTS = "cache.scan.requests";
    // Incremented by one each time closing a scanner invokes reduceWeight.
    private static final String METRICS_SCAN_REDUCE_WEIGHT = "cache.scan.reduce.weight";
    // Cached entries; the constructor backs this with a ConcurrentSkipListSet ordered by CacheEntryComparator.
    private final NavigableSet<CacheEntry<T>> cache;
    // Ordering of the raw entries, as supplied to the constructor.
    private final Comparator<T> comparator;
    // Current weight limits; replaced by resize.
    private final AtomicReference<Limits> limits;
    private final MetricsContext metricsContext;
    // Computes the weight recorded for each entry when it is added.
    private final Weigher<T> weigher;
    // Running total of entry weights; increased in addAll, decreased in reduceWeight.
    private final AtomicLong currentWeight = new AtomicLong();
    // Set by addAll when the weight exceeds the reduce trigger; consumed when a scanner is closed.
    private final AtomicBoolean needReduceWeight = new AtomicBoolean();
    // Guard used by addAll to reject concurrent invocations.
    private final AtomicBoolean adding = new AtomicBoolean();
    // Read lock is taken by scan; write lock by updateEntries, resize and reduceWeight.
    private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();

    /* loaded from: input_file:co/cask/cdap/messaging/cache/MessageCache$AbstractScanner.class */
    /**
     * Skeleton {@link Scanner} that drains a backing iterator and runs a subclass hook
     * exactly once when the scanner is closed or exhausted.
     *
     * @param <T> type of the scanned entries
     */
    private abstract static class AbstractScanner<T> extends AbstractIterator<T> implements Scanner<T> {
        private final Iterator<T> delegate;
        private final T firstInCache;
        private boolean closed;

        /**
         * @param delegate     iterator supplying the entries returned by this scanner
         * @param firstInCache value to report from {@link #getFirstInCache()}, may be {@code null}
         */
        private AbstractScanner(Iterator<T> delegate, @Nullable T firstInCache) {
            this.delegate = delegate;
            this.firstInCache = firstInCache;
        }

        /**
         * Returns the next entry of the backing iterator, or closes the scanner and signals
         * end of data when the scanner is already closed or the iterator is exhausted.
         */
        @Override
        protected final T computeNext() {
            if (closed || !delegate.hasNext()) {
                close();
                return endOfData();
            }
            return delegate.next();
        }

        @Override // co.cask.cdap.messaging.cache.MessageCache.Scanner
        @Nullable
        public final T getFirstInCache() {
            return firstInCache;
        }

        /**
         * Marks the scanner closed and invokes {@link #doClose()}; calls after the first are no-ops.
         */
        public final void close() {
            if (!closed) {
                closed = true;
                doClose();
            }
        }

        /**
         * Hook invoked once, the first time {@link #close()} runs.
         */
        abstract void doClose();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/messaging/cache/MessageCache$CacheEntry.class */
    /**
     * Immutable pairing of a cached entry with the weight recorded for it.
     *
     * @param <T> type of the wrapped entry
     */
    public static class CacheEntry<T> {
        private final T entry;
        private final int weight;

        /**
         * @param entry  the wrapped entry
         * @param weight the weight associated with the entry
         */
        private CacheEntry(T entry, int weight) {
            this.entry = entry;
            this.weight = weight;
        }

        /** Returns the wrapped entry. */
        T getEntry() {
            return entry;
        }

        /** Returns the weight recorded for the entry. */
        int getWeight() {
            return weight;
        }

        @Override
        public String toString() {
            return new StringBuilder("CacheEntry{entry=")
                .append(entry)
                .append(", weight=")
                .append(weight)
                .append('}')
                .toString();
        }
    }

    /* loaded from: input_file:co/cask/cdap/messaging/cache/MessageCache$CacheEntryComparator.class */
    /**
     * Orders {@link CacheEntry} instances by applying an entry comparator to the wrapped
     * entries; the entry weight does not take part in the comparison.
     *
     * @param <T> type of the wrapped entries
     */
    private static final class CacheEntryComparator<T> implements Comparator<CacheEntry<T>> {
        private final Comparator<T> entryComparator;

        /**
         * @param entryComparator comparator applied to the wrapped entries
         */
        private CacheEntryComparator(Comparator<T> entryComparator) {
            this.entryComparator = entryComparator;
        }

        @Override // java.util.Comparator
        public int compare(CacheEntry<T> left, CacheEntry<T> right) {
            T leftEntry = left.getEntry();
            T rightEntry = right.getEntry();
            return entryComparator.compare(leftEntry, rightEntry);
        }
    }

    /* loaded from: input_file:co/cask/cdap/messaging/cache/MessageCache$EntryUpdater.class */
    /**
     * Callback handed to {@link MessageCache#updateEntries}; it is invoked once for every
     * cached entry in the requested range.
     *
     * @param <T> type of the cached entries
     */
    public abstract static class EntryUpdater<T> {

        /**
         * Updates the given cached entry.
         *
         * @param t the entry currently held by the cache
         */
        public abstract void updateEntry(T t);
    }

    /* loaded from: input_file:co/cask/cdap/messaging/cache/MessageCache$Limits.class */
    /**
     * Immutable set of weight thresholds used by the cache. The constructor enforces
     * {@code minRetain <= reduceTrigger <= hardLimit}.
     */
    public static final class Limits {
        private final long minRetain;
        private final long reduceTrigger;
        private final long hardLimit;

        /**
         * @param j  the minimum retain weight
         * @param j2 the reduce trigger weight
         * @param j3 the hard limit weight
         * @throws IllegalArgumentException if {@code j2 > j3} or {@code j > j2}
         */
        public Limits(long j, long j2, long j3) {
            // The hard-limit check runs first, so it decides the message when both constraints fail.
            Preconditions.checkArgument(j2 <= j3, "The reduce trigger weight should not be larger than hard limit");
            Preconditions.checkArgument(j <= j2, "The minimum retain weight should not be larger than the reduce trigger weight");
            minRetain = j;
            reduceTrigger = j2;
            hardLimit = j3;
        }

        /** Returns the minimum retain weight. */
        public long getMinRetain() {
            return minRetain;
        }

        /** Returns the reduce trigger weight. */
        public long getReduceTrigger() {
            return reduceTrigger;
        }

        /** Returns the hard limit weight. */
        public long getHardLimit() {
            return hardLimit;
        }
    }

    /* loaded from: input_file:co/cask/cdap/messaging/cache/MessageCache$Scanner.class */
    /**
     * Closeable iterator over scan results that also exposes the first cache entry.
     *
     * @param <T> type of the scanned entries
     */
    public interface Scanner<T> extends CloseableIterator<T> {

        /**
         * Returns the first-in-cache entry associated with this scanner.
         *
         * @return the entry, or {@code null} if there is none
         */
        @Nullable
        T getFirstInCache();
    }

    /* loaded from: input_file:co/cask/cdap/messaging/cache/MessageCache$Weigher.class */
    /**
     * Computes the weight of an entry.
     *
     * @param <T> type of the weighed entries
     */
    public interface Weigher<T> {

        /**
         * Returns the weight of the given entry.
         *
         * @param t the entry to weigh
         * @return the weight of the entry
         */
        int weight(T t);
    }

    /**
     * Creates an empty cache.
     *
     * @param comparator     defines the ordering of entries in the cache
     * @param weigher        computes the weight of each entry added to the cache
     * @param limits         initial weight limits
     * @param metricsContext receives the cache metrics
     */
    public MessageCache(Comparator<T> comparator, Weigher<T> weigher, Limits limits, MetricsContext metricsContext) {
        this.comparator = comparator;
        this.weigher = weigher;
        this.metricsContext = metricsContext;
        this.limits = new AtomicReference<>(limits);
        // Cache entries are ordered purely by the supplied entry comparator.
        this.cache = new ConcurrentSkipListSet<>(new CacheEntryComparator<>(comparator));
    }

    /**
     * Returns the comparator that was supplied to the constructor.
     */
    public Comparator<T> getComparator() {
        return comparator;
    }

    /**
     * Adds all entries supplied by the iterator to the cache.
     *
     * <p>The first entry must be strictly greater than every entry already cached, and each
     * subsequent entry must be strictly greater than the one added before it. If the total
     * weight exceeds the hard limit, the cache weight is reduced right away; if it only exceeds
     * the reduce trigger, a reduction is flagged to happen when a scanner is next closed.
     *
     * <p>This method must not be invoked concurrently. The guard flag is held for the whole
     * call and is always released on exit, including for an empty iterator or when the iterator,
     * the weigher or the ordering check throws.
     *
     * @param it the entries to add, in strictly increasing order
     * @throws ConcurrentModificationException if another {@code addAll} call is in progress
     * @throws IllegalArgumentException if an entry violates the ordering requirement; the cache
     *     is cleared before the exception is thrown
     */
    public void addAll(Iterator<T> it) {
        if (!this.adding.compareAndSet(false, true)) {
            throw new ConcurrentModificationException("The MessageCache.addAll method shouldn't be called concurrently by multiple threads.");
        }
        try {
            long weight = 0;
            CacheEntry<T> lastAdded = null;
            int entriesAdded = 0;
            while (it.hasNext()) {
                T next = it.next();
                CacheEntry<T> candidate = new CacheEntry<>(next, this.weigher.weight(next));
                // Account for the weight up front so the hard limit is enforced before inserting.
                weight = this.currentWeight.addAndGet(candidate.getWeight());
                if (weight > this.limits.get().getHardLimit()) {
                    reduceWeight();
                    this.metricsContext.increment(METRICS_ADD_REDUCE_WEIGHT, 1L);
                    weight = this.currentWeight.get();
                }
                // The first entry is checked against the cache content; later entries only need
                // to be compared with the entry added just before them.
                CacheEntry<T> bound = lastAdded == null ? this.cache.ceiling(candidate) : lastAdded;
                if (bound != null && this.comparator.compare(bound.getEntry(), candidate.getEntry()) >= 0) {
                    // The candidate never made it into the cache, so take its weight back out.
                    this.currentWeight.addAndGet((-1) * candidate.getWeight());
                    clear();
                    throw new IllegalArgumentException("Cache entry must be in strictly increasing order. Entry " + next + " is smaller than or equal to " + bound.getEntry());
                }
                this.cache.add(candidate);
                entriesAdded++;
                lastAdded = candidate;
            }
            this.metricsContext.increment(METRICS_ADD_REQUESTS, 1L);
            this.metricsContext.increment(METRICS_ENTRIES_ADDED, entriesAdded);
            this.metricsContext.gauge(METRICS_WEIGHT, weight);
            if (weight > this.limits.get().getHardLimit()) {
                reduceWeight();
                this.metricsContext.increment(METRICS_ADD_REDUCE_WEIGHT, 1L);
            } else if (weight > this.limits.get().getReduceTrigger()) {
                this.needReduceWeight.compareAndSet(false, true);
            }
        } finally {
            // Release the guard exactly once, however the call ends.
            this.adding.set(false);
        }
    }

    /**
     * Scans the cache from a start entry and returns the accepted entries.
     *
     * <p>The matching entries are copied into a list while the read lock is held, so the
     * returned scanner iterates over that copy rather than over the live cache. Closing the
     * scanner reduces the cache weight if a reduction has been flagged.
     *
     * @param t             the entry to start scanning from
     * @param z             whether an entry equal to {@code t} is included in the scan
     * @param i             maximum number of entries to collect
     * @param messageFilter decides per entry: {@code ACCEPT} collects it, {@code HOLD} stops the
     *                      scan, any other result skips the entry
     * @return a scanner over the collected entries; its {@code getFirstInCache()} reports the
     *     first entry of the cache at scan time, or {@code null} if the cache was empty
     */
    public Scanner<T> scan(T t, boolean z, int i, MessageFilter<T> messageFilter) {
        LinkedList<T> accepted = new LinkedList<>();
        this.cacheLock.readLock().lock();
        try {
            T firstInCache = this.cache.isEmpty() ? null : this.cache.first().getEntry();
            for (CacheEntry<T> cacheEntry : this.cache.tailSet(new CacheEntry<>(t, 0), z)) {
                if (accepted.size() >= i) {
                    break;
                }
                // The entry is passed to the filter as-is; it is a T, not a MessageFilter.
                // NOTE(review): mo21apply looks like a decompiler-assigned name for apply() - confirm.
                MessageFilter.Result result = messageFilter.mo21apply(cacheEntry.getEntry());
                if (result == MessageFilter.Result.ACCEPT) {
                    accepted.add(cacheEntry.getEntry());
                } else if (result == MessageFilter.Result.HOLD) {
                    break;
                }
            }
            this.metricsContext.increment(METRICS_SCAN_REQUESTS, 1L);
            return new AbstractScanner<T>(accepted.iterator(), firstInCache) {
                @Override // co.cask.cdap.messaging.cache.MessageCache.AbstractScanner
                void doClose() {
                    // Only the scanner that flips the flag performs the reduction.
                    if (MessageCache.this.needReduceWeight.compareAndSet(true, false)) {
                        MessageCache.this.reduceWeight();
                        MessageCache.this.metricsContext.increment(MessageCache.METRICS_SCAN_REDUCE_WEIGHT, 1L);
                    }
                }
            };
        } finally {
            this.cacheLock.readLock().unlock();
        }
    }

    /**
     * Applies the updater to every cached entry in the range {@code [t, t2]}, both ends
     * inclusive, while holding the write lock.
     *
     * <p>After each update the entry is compared with its neighbours to verify that the update
     * did not change its position in the ordering. If the updater throws a
     * {@link RuntimeException}, or the ordering is broken, the cache is cleared once and the
     * exception is propagated.
     *
     * @param t            lower bound of the range, inclusive
     * @param t2           upper bound of the range, inclusive
     * @param entryUpdater callback invoked for each entry in the range
     * @throws IllegalStateException if an update alters the relative order of an entry
     */
    public void updateEntries(T t, T t2, EntryUpdater<T> entryUpdater) {
        CacheEntry<T> rangeStart = new CacheEntry<>(t, 0);
        this.cacheLock.writeLock().lock();
        try {
            CacheEntry<T> lower = this.cache.lower(rangeStart);
            Iterator<CacheEntry<T>> it = this.cache.subSet(rangeStart, true, new CacheEntry<>(t2, 0), true).iterator();
            CacheEntry<T> current = it.hasNext() ? it.next() : null;
            while (current != null) {
                // Resolve the right-hand neighbour before updating, while the ordering is still intact.
                CacheEntry<T> following = it.hasNext() ? it.next() : null;
                CacheEntry<T> higher = following == null ? this.cache.higher(current) : following;
                try {
                    entryUpdater.updateEntry(current.getEntry());
                    boolean notAboveLower = lower != null && this.comparator.compare(lower.getEntry(), current.getEntry()) >= 0;
                    boolean notBelowHigher = higher != null && this.comparator.compare(higher.getEntry(), current.getEntry()) <= 0;
                    if (notAboveLower || notBelowHigher) {
                        throw new IllegalStateException("Entry order should not be altered after update.");
                    }
                } catch (RuntimeException e) {
                    // Single cleanup point for updater failures and order violations alike.
                    clear();
                    throw e;
                }
                lower = current;
                current = following;
            }
        } finally {
            this.cacheLock.writeLock().unlock();
        }
    }

    /**
     * Shrinks the cache by resizing it to all-zero limits, then restores the limits that
     * were in effect when this method was called.
     */
    public void clear() {
        final Limits previous = this.limits.get();
        final Limits nothingRetained = new Limits(0L, 0L, 0L);
        resize(nothingRetained);
        resize(previous);
    }

    /**
     * Installs new limits and immediately reduces the cache weight according to them,
     * all under the write lock.
     *
     * @param limits the limits to apply from now on
     */
    public void resize(Limits limits) {
        this.cacheLock.writeLock().lock();
        try {
            this.limits.set(limits);
            reduceWeight();
        } finally {
            this.cacheLock.writeLock().unlock();
        }
    }

    /**
     * Returns the limits currently in effect.
     */
    public Limits getLimits() {
        return limits.get();
    }

    /**
     * Returns the current value of the cache weight counter.
     */
    @VisibleForTesting
    long getCurrentWeight() {
        return currentWeight.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes entries from the head of the cache, in cache order, for as long as removing the
     * next entry would not bring the weight below the minimum retain weight. Runs under the
     * write lock and reports the number of removed entries as a metric.
     */
    public void reduceWeight() {
        int removed = 0;
        cacheLock.writeLock().lock();
        try {
            long weight = currentWeight.get();
            for (Iterator<CacheEntry<T>> it = cache.iterator(); it.hasNext(); ) {
                CacheEntry<T> head = it.next();
                int headWeight = head.getWeight();
                // Stop as soon as dropping this entry would go below the retained minimum.
                if (weight - headWeight < limits.get().getMinRetain()) {
                    break;
                }
                it.remove();
                removed++;
                weight = currentWeight.addAndGet(-headWeight);
            }
            metricsContext.increment(METRICS_ENTRIES_REMOVED, removed);
        } finally {
            cacheLock.writeLock().unlock();
        }
    }
}
