package com.github.nagyesta.cacheonly.core.conurrent;

import com.github.nagyesta.cacheonly.core.AbstractCacheServiceTemplate;
import com.github.nagyesta.cacheonly.core.CacheRefreshStrategy;
import com.github.nagyesta.cacheonly.core.exception.CacheMissException;
import com.github.nagyesta.cacheonly.raw.concurrent.AsyncBatchServiceCaller;
import com.github.nagyesta.cacheonly.raw.exception.BatchServiceException;
import com.github.nagyesta.cacheonly.transform.BatchRequestTransformer;
import com.github.nagyesta.cacheonly.transform.BatchResponseTransformer;
import com.github.nagyesta.cacheonly.transform.concurrent.AsyncPartialCacheSupport;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:com/github/nagyesta/cacheonly/core/conurrent/ConcurrentCacheServiceTemplate.class */
/**
 * Cache service template which performs the cache look-ups and the origin calls in parallel,
 * by submitting parallel streams to {@link ForkJoinPool} instances and waiting for them with a timeout.
 *
 * <p>NOTE(review): the meaning of the type parameters is not visible in this file; judging by how they
 * are passed to the collaborators they presumably are: batch request, batch response, partial request,
 * partial response, cache key and entity id types - confirm against the parent class.</p>
 *
 * @param <BR> The first type argument of the {@link AsyncBatchServiceCaller} and the {@link BatchRequestTransformer}.
 * @param <BS> The second type argument of the {@link AsyncBatchServiceCaller}, first of the {@link BatchResponseTransformer}.
 * @param <PR> The type of the values in the request maps handled by this class.
 * @param <PS> The type of the values in the response maps returned by this class.
 * @param <C>  The third type argument of the {@link AsyncPartialCacheSupport}.
 * @param <I>  The type of the keys used in the request and response maps.
 */
public class ConcurrentCacheServiceTemplate<BR, BS, PR, PS, C, I> extends AbstractCacheServiceTemplate<AsyncBatchServiceCaller<BR, BS>, AsyncPartialCacheSupport<PR, PS, C, I>, BR, BS, PR, PS, C, I> {
    private final ForkJoinPool cachePool;
    private final ForkJoinPool originPool;

    /**
     * Creates a new instance and captures the pools used for the parallel calls.
     *
     * @param asyncPartialCacheSupport The cache support; also the source of both fork-join pools.
     * @param batchRequestTransformer  The request transformer passed to the parent class.
     * @param batchResponseTransformer The response transformer passed to the parent class.
     * @param asyncBatchServiceCaller  The origin service caller passed to the parent class.
     */
    public ConcurrentCacheServiceTemplate(@NotNull AsyncPartialCacheSupport<PR, PS, C, I> asyncPartialCacheSupport, @NotNull BatchRequestTransformer<BR, PR, I> batchRequestTransformer, @NotNull BatchResponseTransformer<BS, PS, I> batchResponseTransformer, @NotNull AsyncBatchServiceCaller<BR, BS> asyncBatchServiceCaller) {
        super(asyncPartialCacheSupport, batchRequestTransformer, batchResponseTransformer, asyncBatchServiceCaller);
        this.cachePool = asyncPartialCacheSupport.forkJoinPool();
        // NOTE(review): the origin pool is obtained from the cache support as well, not from the
        // batch service caller - confirm that this is intentional.
        this.originPool = asyncPartialCacheSupport.forkJoinPool();
    }

    /**
     * Fetches every entry of the request map from the cache in parallel.
     *
     * <p>If the call is interrupted, times out or fails with anything other than a
     * {@link CacheMissException}, the failure is logged and an empty map is returned.</p>
     *
     * @param cacheRefreshStrategy The strategy passed on to each single cache look-up.
     * @param map                  The requests to look up, keyed by their ids.
     * @return the responses found in the cache keyed by id, or an empty map if the look-up failed.
     * @throws CacheMissException When the parallel look-up failed because of a {@link CacheMissException}.
     */
    @Override // com.github.nagyesta.cacheonly.core.AbstractCacheServiceTemplate
    @NotNull
    protected Map<I, PS> fetchAllFromCache(@NotNull CacheRefreshStrategy cacheRefreshStrategy, @NotNull Map<I, PR> map) throws CacheMissException {
        final long start = System.currentTimeMillis();
        final Map<I, PS> results = new ConcurrentHashMap<>();
        try {
            callCacheParallel(cacheRefreshStrategy, map, results::put);
            return results;
        } catch (InterruptedException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt status so that the callers can observe the interruption.
                Thread.currentThread().interrupt();
            }
            logger().warn("Cache call stopped after {} (timeout set to {}).", new Object[]{Long.valueOf(System.currentTimeMillis() - start), Long.valueOf(partialCacheSupport().timeoutMillis()), e});
            // The submitted task is not stopped by the timeout and may still write into the shared
            // map, therefore a fresh empty map is returned instead of clearing the shared one.
            return new ConcurrentHashMap<>();
        } catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            if (cause instanceof CacheMissException) {
                logger().info(cause.getMessage(), cause);
                throw new CacheMissException(cause.getMessage());
            }
            logger().error("Failed to fetch from cache.", cause);
            // Other parallel sub-tasks may still be running, do not hand out the shared map.
            return new ConcurrentHashMap<>();
        } finally {
            logger().debug("Fetch all from cache completed under {} ms.", Long.valueOf(System.currentTimeMillis() - start));
        }
    }

    /**
     * Calls the origin service with each partition in parallel and merges the partial results.
     *
     * @param list The partitions of the request, each of them keyed by the ids.
     * @return the merged responses of all partitions keyed by id.
     * @throws BatchServiceException When the call is interrupted, times out or any of the partitions fail.
     */
    @Override // com.github.nagyesta.cacheonly.core.AbstractCacheServiceTemplate
    @NotNull
    protected Map<I, PS> callOriginWithPartitions(@NotNull List<Map<I, PR>> list) throws BatchServiceException {
        final long start = System.currentTimeMillis();
        final Map<I, PS> results = new ConcurrentHashMap<>();
        try {
            callOriginParallel(list, results::putAll);
            return results;
        } catch (InterruptedException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt status so that the callers can observe the interruption.
                Thread.currentThread().interrupt();
            }
            logger().warn("Origin call stopped after {} ms (timeout set to {}).", new Object[]{Long.valueOf(System.currentTimeMillis() - start), Long.valueOf(batchServiceCaller().timeoutMillis()), e});
            throw new BatchServiceException("Origin call timed out.", e);
        } catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            logger().error(cause.getMessage(), cause);
            throw new BatchServiceException(cause.getMessage(), cause);
        } finally {
            logger().debug("Fetch all from origin completed under {} ms.", Long.valueOf(System.currentTimeMillis() - start));
        }
    }

    /**
     * Submits the parallel cache look-up to the cache pool and waits for it at most as long as
     * the timeout of the cache support allows.
     *
     * @param cacheRefreshStrategy The strategy passed on to each single cache look-up.
     * @param map                  The requests to look up, keyed by their ids.
     * @param biConsumer           Receives the id and the response of each entry found in the cache.
     * @throws InterruptedException When the waiting thread is interrupted.
     * @throws ExecutionException   When the submitted task fails.
     * @throws TimeoutException     When the task does not complete in time.
     */
    private void callCacheParallel(@NotNull CacheRefreshStrategy cacheRefreshStrategy, @NotNull Map<I, PR> map, @NotNull BiConsumer<I, PS> biConsumer) throws InterruptedException, ExecutionException, TimeoutException {
        this.cachePool.submit(() -> {
            map.entrySet().parallelStream().forEach(entry -> {
                fetchOneFromCache(cacheRefreshStrategy, entry.getValue()).ifPresent(response -> {
                    biConsumer.accept(entry.getKey(), response);
                });
            });
        }).get(partialCacheSupport().timeoutMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Submits the parallel origin calls to the origin pool and waits for them at most as long as
     * the timeout of the batch service caller allows.
     *
     * @param list     The partitions of the request, each of them keyed by the ids.
     * @param consumer Receives the response map of each partition.
     * @throws InterruptedException When the waiting thread is interrupted.
     * @throws ExecutionException   When the submitted task fails.
     * @throws TimeoutException     When the task does not complete in time.
     */
    private void callOriginParallel(@NotNull List<Map<I, PR>> list, @NotNull Consumer<Map<I, PS>> consumer) throws InterruptedException, ExecutionException, TimeoutException {
        this.originPool.submit(() -> {
            list.parallelStream().map(partition -> {
                return fetchSinglePartitionFromOrigin(partition, batchServiceCaller().refreshStrategy());
            }).forEach(consumer);
        }).get(batchServiceCaller().timeoutMillis(), TimeUnit.MILLISECONDS);
    }
}
