package org.apache.druid.server.lookup.namespace.cache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.druid.concurrent.ConcurrentAwaitableCounter;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.Cleaners;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.namespace.CacheGenerator;
import org.apache.druid.query.lookup.namespace.ExtractionNamespace;

@LazySingleton
/* loaded from: input_file:org/apache/druid/server/lookup/namespace/cache/CacheScheduler.class */
public final class CacheScheduler {
    /** Logger shared by the scheduler and all of its nested classes. */
    private static final Logger log = new Logger(CacheScheduler.class);
    /** Generator lookup keyed by namespace class; the constructor fills it with an identity-keyed copy of the injected map. */
    private final Map<Class<? extends ExtractionNamespace>, CacheGenerator<?>> namespaceGeneratorMap;
    /** Source of the scheduled executor on which the per-entry update tasks and the metrics task are run. */
    private final NamespaceExtractionCacheManager cacheManager;
    /**
     * Counter read by the periodic metrics task and by {@link #updatesStarted()}.
     * NOTE(review): no increment is visible in the decompiled code; presumably it happens in
     * {@code EntryImpl.tryUpdateCache}, whose body was not decompiled -- confirm.
     */
    private final AtomicLong updatesStarted = new AtomicLong(0);
    /** Incremented at the end of the {@code EntryImpl} constructor and decremented in {@code EntryImpl.doClose}. */
    private final AtomicInteger activeEntries = new AtomicInteger();

    /* loaded from: input_file:org/apache/druid/server/lookup/namespace/cache/CacheScheduler$CacheState.class */
    /**
     * Marker type for the value stored in an entry's state holder. It declares no methods; in this
     * file it is implemented by {@link NoCache} and {@link VersionedCache}.
     */
    public interface CacheState {
    }

    /* loaded from: input_file:org/apache/druid/server/lookup/namespace/cache/CacheScheduler$Entry.class */
    /**
     * Handle returned by {@code schedule}/{@code scheduleAndWait}. All state lives in the wrapped
     * {@link EntryImpl}; this object is what gets registered with {@code Cleaners} so that the
     * implementation can be closed if the handle is dropped without {@link #close()}.
     *
     * @param <T> namespace type served by this entry
     */
    public final class Entry<T extends ExtractionNamespace> implements AutoCloseable {
        private final EntryImpl<T> impl;

        private Entry(T namespace, CacheGenerator<T> generator) {
            impl = new EntryImpl<>(namespace, this, generator);
        }

        /** Returns the state currently stored in the implementation's state holder. */
        public CacheState getCacheState() {
            return impl.cacheStateHolder.get();
        }

        /**
         * Returns the map of the currently loaded cache.
         *
         * @throws ISE if the current state is not a {@link VersionedCache}
         */
        public Map<String, String> getCache() {
            final CacheState state = getCacheState();
            if (!(state instanceof VersionedCache)) {
                throw new ISE("Cannot get cache: %s", state);
            }
            return ((VersionedCache) state).getCache();
        }

        /** Exposes the future of the scheduled update task. */
        @VisibleForTesting
        Future<?> getUpdaterFuture() {
            return impl.updaterFuture;
        }

        /**
         * Delegates to the update counter's {@code awaitCount(i)}.
         *
         * @param i count handed to the counter (presumably the total number of updates to wait for)
         */
        @VisibleForTesting
        public void awaitTotalUpdates(int i) throws InterruptedException {
            impl.updateCounter.awaitCount(i);
        }

        /**
         * Same as {@link #awaitTotalUpdates(int)} but with a timeout.
         *
         * @param i count handed to the counter
         * @param j timeout in milliseconds
         */
        @VisibleForTesting
        public void awaitTotalUpdatesWithTimeout(int i, long j) throws InterruptedException, TimeoutException {
            impl.updateCounter.awaitCount(i, j, TimeUnit.MILLISECONDS);
        }

        /**
         * Delegates to the update counter's {@code awaitNextIncrements(i)}.
         *
         * @param i number of increments handed to the counter
         */
        @VisibleForTesting
        void awaitNextUpdates(int i) throws InterruptedException {
            impl.updateCounter.awaitNextIncrements(i);
        }

        /** Closes the wrapped implementation. */
        @Override
        public void close() {
            impl.close();
        }

        @Override
        public String toString() {
            return impl.toString();
        }
    }

    /* loaded from: input_file:org/apache/druid/server/lookup/namespace/cache/CacheScheduler$EntryImpl.class */
    public class EntryImpl<T extends ExtractionNamespace> implements AutoCloseable {
        /** Namespace this entry was scheduled for. */
        private final T namespace;
        /** Pre-computed text returned by {@link #toString()}; built once in the constructor. */
        private final String asString;
        /** Current state; starts as {@code CACHE_NOT_INITIALIZED} and is set to {@code ENTRY_CLOSED} by {@code doClose}. */
        private final AtomicReference<CacheState> cacheStateHolder;
        /** Future of the scheduled {@code updateCache} task; cancelled in {@code doClose}. */
        private final Future<?> updaterFuture;
        /** Handle returned by {@code Cleaners.register} for the owning {@code Entry}. */
        private final Cleaners.Cleanable entryCleanable;
        /** Generator supplied at construction. Not read in the visible code (presumably used by the non-decompiled {@code tryUpdateCache} -- confirm). */
        private final CacheGenerator<T> cacheGenerator;
        /** Incremented each time {@code swapCacheState} installs a new cache. */
        private final ConcurrentAwaitableCounter updateCounter;
        /** Released when the constructor finishes (normally or not); {@code updateCache} awaits it before touching any state. */
        private final CountDownLatch startLatch;
        /** Completed by the first {@code updateCache} run with that run's success flag; awaited by {@code scheduleAndWait}. */
        private final CompletableFuture<Boolean> firstLoadFinishedSuccessfully;

        /**
         * Initializes the entry, schedules its update task and registers the cleaner for {@code owner}.
         * The start latch is released on every exit path so that an already-running update task is
         * never left blocked on it.
         *
         * @param namespace namespace to serve
         * @param owner     public handle to register with the cleaner
         * @param generator generator stored for this entry
         */
        private EntryImpl(T namespace, Entry<T> owner, CacheGenerator<T> generator) {
            this.cacheStateHolder = new AtomicReference<>(NoCache.CACHE_NOT_INITIALIZED);
            this.updateCounter = new ConcurrentAwaitableCounter();
            this.startLatch = new CountDownLatch(1);
            this.firstLoadFinishedSuccessfully = new CompletableFuture<>();
            try {
                this.namespace = namespace;
                // super.toString() on purpose: this class's toString() returns asString, which is being built here.
                this.asString = StringUtils.format("namespace [%s] : %s", namespace, super.toString());
                this.updaterFuture = schedule(namespace);
                this.entryCleanable = createCleaner(owner);
                this.cacheGenerator = generator;
                CacheScheduler.this.activeEntries.incrementAndGet();
            } finally {
                this.startLatch.countDown();
            }
        }

        /**
         * Registers {@code owner} with {@code Cleaners}, using {@link #closeFromCleaner()} as the action.
         * The action is bound to this implementation object, not to {@code owner}.
         */
        private Cleaners.Cleanable createCleaner(Entry<T> owner) {
            final Runnable cleanupAction = this::closeFromCleaner;
            return Cleaners.register(owner, cleanupAction);
        }

        /**
         * Submits {@link #updateCache()} to the cache manager's scheduled executor.
         * A positive poll period yields a fixed-rate task; otherwise the task is scheduled once.
         * In both cases the initial delay is the namespace's jitter.
         *
         * @return the future of the scheduled task
         */
        private Future<?> schedule(T ns) {
            final long periodMs = ns.getPollMs();
            final Runnable updateTask = this::updateCache;
            if (periodMs > 0) {
                return cacheManager.scheduledExecutorService()
                    .scheduleAtFixedRate(updateTask, ns.getJitterMills(), periodMs, TimeUnit.MILLISECONDS);
            }
            return cacheManager.scheduledExecutorService()
                .schedule(updateTask, ns.getJitterMills(), TimeUnit.MILLISECONDS);
        }

        /**
         * Body of the scheduled task. Waits for construction to finish, attempts an update unless the
         * thread is interrupted or the entry is closed, and on any failure closes the entry. Whatever
         * the outcome, the first run completes {@code firstLoadFinishedSuccessfully} with its success flag.
         */
        private void updateCache() {
            boolean loaded = false;
            try {
                // Blocks until the constructor has released the latch.
                startLatch.await();
                final CacheState current = cacheStateHolder.get();
                if (!Thread.currentThread().isInterrupted() && current != NoCache.ENTRY_CLOSED) {
                    loaded = tryUpdateCache(currentVersionOrNull(current));
                }
            } catch (Throwable failure) {
                try {
                    close();
                } catch (Exception closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
                // Interruptions and Errors are rethrown (wrapped); any other failure is absorbed after closing.
                if (Thread.currentThread().isInterrupted()
                    || failure instanceof InterruptedException
                    || failure instanceof Error) {
                    throw new RuntimeException(failure);
                }
            } finally {
                if (!firstLoadFinishedSuccessfully.isDone()) {
                    firstLoadFinishedSuccessfully.complete(loaded);
                }
            }
        }

        /**
         * Decompiler placeholder: the real update logic could not be reconstructed, so in this form the
         * method unconditionally throws {@link UnsupportedOperationException}.
         *
         * <p>Consequently every call made from {@code updateCache()} ends up in that method's catch
         * block. NOTE(review): restore the body from the original source or bytecode before relying on
         * this class; nothing about the intended algorithm can be stated from what is visible here.
         *
         * @param r9 value that {@code updateCache()} obtains from {@code currentVersionOrNull}
         *           (the current cache version, or null when no versioned cache is loaded)
         * @return never returns normally in this decompiled form
         * @throws UnsupportedOperationException always, in this decompiled form
         */
        /* JADX WARN: Removed duplicated region for block: B:29:0x00e7 A[DONT_GENERATE, FINALLY_INSNS] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        private boolean tryUpdateCache(java.lang.String r9) throws java.lang.Exception {
            /*
                Method dump skipped, instructions count: 252
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.druid.server.lookup.namespace.cache.CacheScheduler.EntryImpl.tryUpdateCache(java.lang.String):boolean");
        }

        /**
         * Extracts the version string from a state.
         *
         * @return the version when {@code state} is a {@link VersionedCache}, otherwise {@code null}
         */
        private String currentVersionOrNull(CacheState state) {
            return state instanceof VersionedCache ? ((VersionedCache) state).version : null;
        }

        /**
         * Atomically installs {@code replacement} unless the entry has been closed.
         * The update counter is incremented only when the replacement was actually installed.
         *
         * @return the state observed before the swap; {@code NoCache.ENTRY_CLOSED} means nothing was installed
         */
        private CacheState swapCacheState(VersionedCache replacement) {
            while (true) {
                final CacheState previous = cacheStateHolder.get();
                if (previous == NoCache.ENTRY_CLOSED) {
                    return previous;
                }
                if (cacheStateHolder.compareAndSet(previous, replacement)) {
                    updateCounter.increment();
                    return previous;
                }
                // Another thread changed the state in between; re-read and retry.
            }
        }

        /**
         * Closes the entry explicitly and then invokes the registered cleanable. A repeated close is
         * reported as an error in the log but is otherwise harmless.
         */
        @Override
        public void close() {
            final boolean closedNow = doClose(true);
            if (!closedNow) {
                log.error("Cache for %s has already been closed", this);
            }
            // By now the state is ENTRY_CLOSED, so closeFromCleaner() would find nothing left to do
            // (presumably clean() is what triggers it -- confirm against Cleaners).
            entryCleanable.clean();
        }

        /**
         * Cleaner action. Closes the entry without closing the cache handler and logs an error when
         * that close actually did something, i.e. the entry had not been closed before. Only
         * {@link Error}s escape this method.
         */
        private void closeFromCleaner() {
            try {
                final boolean closedNow = doClose(false);
                if (closedNow) {
                    log.error("Entry.close() was not called, closed resources by the JVM");
                }
            } catch (Throwable failure) {
                try {
                    log.error(failure, "Error while closing %s", this);
                } catch (Exception loggingFailure) {
                    failure.addSuppressed(loggingFailure);
                }
                Throwables.propagateIfInstanceOf(failure, Error.class);
            }
        }

        /**
         * Marks the entry closed and releases its resources exactly once.
         *
         * @param explicitClose {@code true} when invoked from {@link #close()}; only then is the
         *                      handler of a loaded {@link VersionedCache} closed here
         * @return {@code true} if this call performed the close, {@code false} if the entry was already closed
         */
        private boolean doClose(boolean explicitClose) {
            final CacheState previous = cacheStateHolder.getAndSet(NoCache.ENTRY_CLOSED);
            if (previous == NoCache.ENTRY_CLOSED) {
                return false;
            }
            try {
                log.info("Closing %s", this);
                logExecutionError();
            } finally {
                // Runs even if the logging above fails.
                CacheScheduler.this.activeEntries.decrementAndGet();
                updaterFuture.cancel(true);
                if (explicitClose && previous instanceof VersionedCache) {
                    ((VersionedCache) previous).cacheHandler.close();
                }
            }
            return true;
        }

        /**
         * If the update task has already finished, logs how it ended: a cancellation or the cause of
         * its failure. Does nothing while the task is still pending.
         */
        private void logExecutionError() {
            if (!updaterFuture.isDone()) {
                return;
            }
            try {
                updaterFuture.get();
            } catch (ExecutionException taskFailure) {
                log.error(taskFailure.getCause(), "Error in %s", this);
            } catch (CancellationException cancelled) {
                log.error(cancelled, "Future for %s has already been cancelled", this);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            }
        }

        /** Returns the description computed once in the constructor. */
        @Override
        public String toString() {
            return asString;
        }
    }

    /* loaded from: input_file:org/apache/druid/server/lookup/namespace/cache/CacheScheduler$NoCache.class */
    /** States of an entry that has no cache loaded. */
    public enum NoCache implements CacheState {
        /** Initial value of an entry's state holder, set in the {@code EntryImpl} constructor. */
        CACHE_NOT_INITIALIZED,
        /** Terminal value stored by {@code EntryImpl.doClose}; once seen, {@code swapCacheState} refuses to install a cache. */
        ENTRY_CLOSED
    }

    /* loaded from: input_file:org/apache/druid/server/lookup/namespace/cache/CacheScheduler$VersionedCache.class */
    /**
     * A loaded cache together with its version string. Every data access is delegated to the
     * wrapped {@code CacheHandler}.
     */
    public static final class VersionedCache implements CacheState, AutoCloseable {
        final String entryId;
        final CacheHandler cacheHandler;
        final String version;

        private VersionedCache(String entryId, String version, CacheHandler handler) {
            this.entryId = entryId;
            this.cacheHandler = handler;
            this.version = version;
        }

        /** Returns the handler's cache map. */
        public Map<String, String> getCache() {
            return cacheHandler.getCache();
        }

        /** Forwards both arguments unchanged to the handler's {@code asLookupExtractor}. */
        public LookupExtractor asLookupExtractor(boolean z, Supplier<byte[]> supplier) {
            return cacheHandler.asLookupExtractor(z, supplier);
        }

        /** Returns the version string given at construction. */
        public String getVersion() {
            return version;
        }

        /** Closes the handler, then records the closed version at debug level. */
        @Override
        public void close() {
            cacheHandler.close();
            log.debug("Closed version [%s] of %s", version, entryId);
        }
    }

    /**
     * Creates the scheduler and starts a periodic task (first run after 1 minute, then every
     * 10 minutes) that emits the number of updates started since the previous emission under the
     * metric {@code namespace/deltaTasksStarted}.
     *
     * @param serviceEmitter                  emitter that receives the metric
     * @param map                             generators by namespace class; copied into an identity-keyed map
     * @param namespaceExtractionCacheManager provider of the scheduled executor
     */
    @Inject
    public CacheScheduler(final ServiceEmitter serviceEmitter, Map<Class<? extends ExtractionNamespace>, CacheGenerator<?>> map, NamespaceExtractionCacheManager namespaceExtractionCacheManager) {
        this.namespaceGeneratorMap = new IdentityHashMap<>(map);
        this.cacheManager = namespaceExtractionCacheManager;
        namespaceExtractionCacheManager.scheduledExecutorService().scheduleAtFixedRate(new Runnable() {
            /** Counter value seen by the previous successful emission. */
            long priorUpdatesStarted = 0;

            @Override
            public void run() {
                try {
                    final long startedNow = CacheScheduler.this.updatesStarted.get();
                    final long delta = startedNow - priorUpdatesStarted;
                    serviceEmitter.emit(ServiceMetricEvent.builder().setMetric("namespace/deltaTasksStarted", Long.valueOf(delta)));
                    priorUpdatesStarted = startedNow;
                } catch (Exception emitFailure) {
                    log.error(emitFailure, "Error emitting namespace stats");
                    // Rethrow only when the thread was interrupted; otherwise the periodic task keeps running.
                    if (Thread.currentThread().isInterrupted()) {
                        throw new RuntimeException(emitFailure);
                    }
                }
            }
        }, 1L, 10L, TimeUnit.MINUTES);
    }

    /** Returns the current value of the updates-started counter. */
    @VisibleForTesting
    long updatesStarted() {
        final long started = updatesStarted.get();
        return started;
    }

    /** Returns the number of entries that have been constructed and not yet closed, widened to {@code long}. */
    @VisibleForTesting
    public long getActiveEntries() {
        final int active = activeEntries.get();
        return active;
    }

    /**
     * Schedules {@code extractionNamespace} and waits up to {@code j} milliseconds for the first
     * update run to report its outcome.
     *
     * <p>If the first run does not report success (it failed, timed out, or the wait was
     * interrupted), the entry is closed and an error is logged before this method returns or throws.
     *
     * @param extractionNamespace namespace to schedule
     * @param j                   maximum time to wait for the first run, in milliseconds
     * @return the scheduled entry when the first run succeeded, otherwise {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Nullable
    public Entry scheduleAndWait(ExtractionNamespace extractionNamespace, long j) throws InterruptedException {
        final Entry<?> entry = schedule(extractionNamespace);
        log.debug("Scheduled new %s", entry);
        boolean success = false;
        Throwable failure = null;
        try {
            success = entry.impl.firstLoadFinishedSuccessfully.get(j, TimeUnit.MILLISECONDS);
            return success ? entry : null;
        } catch (ExecutionException | TimeoutException e) {
            failure = e;
            return null;
        } finally {
            // Single cleanup path for every unsuccessful outcome, including exceptions that propagate.
            if (!success) {
                entry.close();
                if (failure != null) {
                    log.error(failure, "CacheScheduler[%s] - problem during start or waiting for the first run", entry);
                } else {
                    log.error("CacheScheduler[%s] - problem during start or waiting for the first run", entry);
                }
            }
        }
    }

    /**
     * Creates and schedules a new entry for the given namespace, using the generator registered
     * for the namespace's exact runtime class.
     *
     * @param t namespace to schedule
     * @return the newly created entry
     * @throws ISE if no generator is registered for the namespace's class
     */
    public <T extends ExtractionNamespace> Entry schedule(T t) {
        // The map is keyed by namespace class, so the generator found for t's class is treated as CacheGenerator<T>.
        @SuppressWarnings("unchecked")
        final CacheGenerator<T> generator = (CacheGenerator<T>) namespaceGeneratorMap.get(t.getClass());
        if (generator == null) {
            throw new ISE("Cannot find generator for namespace [%s]", t);
        }
        return new Entry<>(t, generator);
    }
}
