/*
 * Decompiled with CFR 0.152.
 */
package com.github.nagyesta.cacheonly.core.conurrent;

import com.github.nagyesta.cacheonly.core.AbstractCacheServiceTemplate;
import com.github.nagyesta.cacheonly.core.CacheRefreshStrategy;
import com.github.nagyesta.cacheonly.core.exception.CacheMissException;
import com.github.nagyesta.cacheonly.raw.concurrent.AsyncBatchServiceCaller;
import com.github.nagyesta.cacheonly.raw.exception.BatchServiceException;
import com.github.nagyesta.cacheonly.transform.BatchRequestTransformer;
import com.github.nagyesta.cacheonly.transform.BatchResponseTransformer;
import com.github.nagyesta.cacheonly.transform.concurrent.AsyncPartialCacheSupport;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;

/**
 * Cache service template which performs both the cache look-ups and the origin calls
 * in parallel, on {@link ForkJoinPool}s supplied by the collaborators, and bounds each
 * phase with the timeout configured on the respective collaborator.
 *
 * @param <BR> The batch request type (passed through to the super class).
 * @param <BS> The batch response type (passed through to the super class).
 * @param <PR> The type of the values in the request maps.
 * @param <PS> The type of the values in the response maps.
 * @param <C>  Passed through to {@link AsyncPartialCacheSupport}.
 * @param <I>  The type of the keys in the request and response maps.
 */
public class ConcurrentCacheServiceTemplate<BR, BS, PR, PS, C, I>
        extends AbstractCacheServiceTemplate<AsyncBatchServiceCaller<BR, BS>, AsyncPartialCacheSupport<PR, PS, C, I>, BR, BS, PR, PS, C, I> {

    /** Pool running the parallel cache look-ups. */
    private final ForkJoinPool cachePool;
    /** Pool running the parallel origin calls. */
    private final ForkJoinPool originPool;

    /**
     * Creates the template and captures the pools used for the parallel calls.
     *
     * @param partialCacheSupport      Cache support; supplies the pool and timeout of cache look-ups.
     * @param batchRequestTransformer  Passed to the super class.
     * @param batchResponseTransformer Passed to the super class.
     * @param batchServiceCaller       Origin caller; supplies the pool and timeout of origin calls.
     */
    public ConcurrentCacheServiceTemplate(@NotNull final AsyncPartialCacheSupport<PR, PS, C, I> partialCacheSupport,
                                          @NotNull final BatchRequestTransformer<BR, PR, I> batchRequestTransformer,
                                          @NotNull final BatchResponseTransformer<BS, PS, I> batchResponseTransformer,
                                          @NotNull final AsyncBatchServiceCaller<BR, BS> batchServiceCaller) {
        super(partialCacheSupport, batchRequestTransformer, batchResponseTransformer, batchServiceCaller);
        this.cachePool = partialCacheSupport.forkJoinPool();
        // The origin pool must come from the service caller; taking it from the cache support
        // made both phases share the cache pool and ignored the caller's own configuration.
        this.originPool = batchServiceCaller.forkJoinPool();
    }

    /**
     * Fetches every entry of the request map from the cache in parallel.
     * <p>
     * If the look-up fails for any reason other than a cache miss, times out or is interrupted,
     * the problem is logged and an empty map is returned. On interruption the interrupt flag
     * of the calling thread is restored.
     *
     * @param strategy   The refresh strategy handed to each single cache look-up.
     * @param requestMap The requests to look up, by key.
     * @return the entries found in the cache, or an empty map when the look-up failed.
     * @throws CacheMissException when a parallel look-up failed with a {@link CacheMissException}.
     */
    @Override
    @NotNull
    protected Map<I, PS> fetchAllFromCache(@NotNull final CacheRefreshStrategy strategy,
                                           @NotNull final Map<I, PR> requestMap) throws CacheMissException {
        final long start = System.currentTimeMillis();
        final Map<I, PS> result = new ConcurrentHashMap<>();
        try {
            this.callCacheParallel(strategy, requestMap, result::put);
            return result;
        } catch (final ExecutionException e) {
            final Throwable cause = e.getCause();
            if (cause instanceof CacheMissException) {
                this.logger().info(cause.getMessage(), cause);
                throw new CacheMissException(cause.getMessage());
            }
            this.logger().error("Failed to fetch from cache.", cause);
            // Workers that are still running keep writing into "result", so hand back a
            // fresh map instead of a cleared one to guarantee the caller sees no entries.
            return new ConcurrentHashMap<>();
        } catch (final InterruptedException | TimeoutException e) {
            final long end = System.currentTimeMillis();
            this.logger().warn("Cache call stopped after {} (timeout set to {}).",
                    end - start, this.cacheTimeoutMillis(), e);
            restoreInterruptFlag(e);
            // See above: the submitted task is not stopped by the timeout.
            return new ConcurrentHashMap<>();
        } finally {
            final long end = System.currentTimeMillis();
            this.logger().debug("Fetch all from cache completed under {} ms.", end - start);
        }
    }

    /**
     * Calls the origin service with every request partition in parallel and merges the
     * partial responses into a single map.
     *
     * @param requestPartitions The partitions to send to the origin service.
     * @return the merged responses of all partitions.
     * @throws BatchServiceException when any partition fails, or when the call times out or is
     *                               interrupted (the interrupt flag is restored in the latter case).
     */
    @Override
    @NotNull
    protected Map<I, PS> callOriginWithPartitions(@NotNull final List<Map<I, PR>> requestPartitions) throws BatchServiceException {
        final long start = System.currentTimeMillis();
        final Map<I, PS> response = new ConcurrentHashMap<>();
        try {
            this.callOriginParallel(requestPartitions, response::putAll);
            return response;
        } catch (final ExecutionException e) {
            final Throwable cause = e.getCause();
            this.logger().error(cause.getMessage(), cause);
            throw new BatchServiceException(cause.getMessage(), cause);
        } catch (final InterruptedException | TimeoutException e) {
            final long end = System.currentTimeMillis();
            this.logger().warn("Origin call stopped after {} ms (timeout set to {}).",
                    end - start, this.originTimeoutMillis(), e);
            restoreInterruptFlag(e);
            throw new BatchServiceException("Origin call timed out.", e);
        } finally {
            final long end = System.currentTimeMillis();
            this.logger().debug("Fetch all from origin completed under {} ms.", end - start);
        }
    }

    /**
     * Submits the parallel cache look-up to the cache pool and waits for it, at most for the
     * cache timeout. Every value found is reported to the consumer together with its key.
     */
    private void callCacheParallel(@NotNull final CacheRefreshStrategy strategy,
                                   @NotNull final Map<I, PR> requestMap,
                                   @NotNull final BiConsumer<I, PS> resultConsumer)
            throws InterruptedException, ExecutionException, TimeoutException {
        final ForkJoinTask<?> task = this.cachePool.submit(() -> requestMap.entrySet().parallelStream()
                .forEach(e -> this.fetchOneFromCache(strategy, e.getValue())
                        .ifPresent(v -> resultConsumer.accept(e.getKey(), v))));
        task.get(this.cacheTimeoutMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Submits the parallel origin call to the origin pool and waits for it, at most for the
     * origin timeout. The response of each partition is handed to the processor.
     */
    private void callOriginParallel(@NotNull final List<Map<I, PR>> requestPartitions,
                                    @NotNull final Consumer<Map<I, PS>> responseProcessor)
            throws InterruptedException, ExecutionException, TimeoutException {
        final ForkJoinTask<?> task = this.originPool.submit(() -> requestPartitions.parallelStream()
                .map(partition -> this.fetchSinglePartitionFromOrigin(partition,
                        ((AsyncBatchServiceCaller<?, ?>) this.batchServiceCaller()).refreshStrategy()))
                .forEach(responseProcessor));
        task.get(this.originTimeoutMillis(), TimeUnit.MILLISECONDS);
    }

    /** Returns the timeout of the cache look-ups in milliseconds. */
    private long cacheTimeoutMillis() {
        // NOTE(review): the cast is probably redundant if the accessor already returns the
        // async type - confirm against the super class and drop it.
        return ((AsyncPartialCacheSupport<?, ?, ?, ?>) this.partialCacheSupport()).timeoutMillis();
    }

    /** Returns the timeout of the origin calls in milliseconds. */
    private long originTimeoutMillis() {
        // NOTE(review): same as above, the cast is probably redundant.
        return ((AsyncBatchServiceCaller<?, ?>) this.batchServiceCaller()).timeoutMillis();
    }

    /** Re-asserts the interrupt flag of the current thread if the exception signals an interruption. */
    private static void restoreInterruptFlag(@NotNull final Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

