package org.apache.flink.runtime.taskexecutor;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.shaded.guava31.com.google.common.base.Ticker;
import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.class */
public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache {
    private final Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry> shuffleDescriptorsCache;
    private final Map<JobID, Set<PermanentBlobKey>> cachedBlobKeysPerJob;

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache$Factory.class */
    public static class Factory {
        private static final int DEFAULT_CACHE_SIZE_LIMIT = 100;
        private final Duration cacheExpireTimeout;
        private final int cacheSizeLimit;
        private final Ticker ticker;
        private static final Duration DEFAULT_CACHE_EXPIRE_TIMEOUT = Duration.ofSeconds(300);
        private static final Ticker DEFAULT_TICKER = Ticker.systemTicker();

        public Factory() {
            this(DEFAULT_CACHE_EXPIRE_TIMEOUT, 100, DEFAULT_TICKER);
        }

        @VisibleForTesting
        public Factory(Duration duration, int i, Ticker ticker) {
            this.cacheExpireTimeout = duration;
            this.cacheSizeLimit = i;
            this.ticker = ticker;
        }

        public DefaultShuffleDescriptorsCache create() {
            return new DefaultShuffleDescriptorsCache(this.cacheExpireTimeout, this.cacheSizeLimit, this.ticker);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache$ShuffleDescriptorCacheEntry.class */
    private static class ShuffleDescriptorCacheEntry {
        private final TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup shuffleDescriptorGroup;
        private final JobID jobId;

        public ShuffleDescriptorCacheEntry(TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup shuffleDescriptorGroup, JobID jobID) {
            this.shuffleDescriptorGroup = (TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup) Preconditions.checkNotNull(shuffleDescriptorGroup);
            this.jobId = (JobID) Preconditions.checkNotNull(jobID);
        }

        public TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup getShuffleDescriptorGroup() {
            return this.shuffleDescriptorGroup;
        }

        public JobID getJobId() {
            return this.jobId;
        }
    }

    private DefaultShuffleDescriptorsCache(Duration duration, int i, Ticker ticker) {
        this.cachedBlobKeysPerJob = new HashMap();
        this.shuffleDescriptorsCache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumSize(i).expireAfterAccess(duration).ticker(ticker).removalListener(this::onCacheRemoval).build();
    }

    @Override // org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache
    public void clear() {
        this.cachedBlobKeysPerJob.clear();
        this.shuffleDescriptorsCache.cleanUp();
    }

    @Override // org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache
    public TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup get(PermanentBlobKey permanentBlobKey) {
        ShuffleDescriptorCacheEntry ifPresent = this.shuffleDescriptorsCache.getIfPresent(permanentBlobKey);
        if (ifPresent == null) {
            return null;
        }
        return ifPresent.getShuffleDescriptorGroup();
    }

    @Override // org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache
    public void put(JobID jobID, PermanentBlobKey permanentBlobKey, TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup shuffleDescriptorGroup) {
        this.shuffleDescriptorsCache.put(permanentBlobKey, new ShuffleDescriptorCacheEntry(shuffleDescriptorGroup, jobID));
        this.cachedBlobKeysPerJob.computeIfAbsent(jobID, jobID2 -> {
            return new HashSet();
        }).add(permanentBlobKey);
    }

    @Override // org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache
    public void clearCacheForJob(JobID jobID) {
        Set<PermanentBlobKey> remove = this.cachedBlobKeysPerJob.remove(jobID);
        if (remove != null) {
            this.shuffleDescriptorsCache.invalidateAll(remove);
        }
    }

    private void onCacheRemoval(RemovalNotification<PermanentBlobKey, ShuffleDescriptorCacheEntry> removalNotification) {
        PermanentBlobKey key = removalNotification.getKey();
        ShuffleDescriptorCacheEntry value = removalNotification.getValue();
        if (key == null || value == null) {
            return;
        }
        this.cachedBlobKeysPerJob.computeIfPresent(value.getJobId(), (jobID, set) -> {
            set.remove(key);
            if (set.isEmpty()) {
                return null;
            }
            return set;
        });
    }
}
