/*
 * Decompiled with CFR 0.152.
 */
package net.di2e.ecdr.federation;

import ddf.catalog.data.Result;
import ddf.catalog.federation.FederationStrategy;
import ddf.catalog.operation.Query;
import ddf.catalog.operation.QueryRequest;
import ddf.catalog.operation.QueryResponse;
import ddf.catalog.operation.SourceResponse;
import ddf.catalog.operation.impl.ProcessingDetailsImpl;
import ddf.catalog.operation.impl.QueryImpl;
import ddf.catalog.operation.impl.QueryRequestImpl;
import ddf.catalog.operation.impl.QueryResponseImpl;
import ddf.catalog.plugin.PluginExecutionException;
import ddf.catalog.plugin.PostFederatedQueryPlugin;
import ddf.catalog.plugin.PreFederatedQueryPlugin;
import ddf.catalog.plugin.StopProcessingException;
import ddf.catalog.source.Source;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.opengis.filter.Filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FifoFederationStrategy
implements FederationStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(FifoFederationStrategy.class);
    private static final int DEFAULT_MAX_START_INDEX = 50000;
    private int maxStartIndex;
    private ExecutorService queryExecutorService = null;
    private List<PreFederatedQueryPlugin> preQuery;
    private List<PostFederatedQueryPlugin> postQuery;

    public FifoFederationStrategy(ExecutorService queryExecutorService, List<PreFederatedQueryPlugin> preQuery, List<PostFederatedQueryPlugin> postQuery) {
        this.queryExecutorService = queryExecutorService;
        this.preQuery = preQuery;
        this.postQuery = postQuery;
        this.maxStartIndex = 50000;
    }

    public QueryResponse federate(List<Source> sources, QueryRequest queryRequest) {
        Query originalQuery = queryRequest.getQuery();
        int offset = originalQuery.getStartIndex();
        if (offset > this.maxStartIndex) {
            offset = this.maxStartIndex;
        }
        int pageSize = originalQuery.getPageSize();
        QueryResponseImpl queryResponse = new QueryResponseImpl(queryRequest, null);
        HashMap<Source, Future<SourceResponse>> futures = new HashMap<Source, Future<SourceResponse>>();
        Query modifiedQuery = this.getModifiedQuery(originalQuery, sources.size(), offset, pageSize);
        QueryRequestImpl modifiedQueryRequest = new QueryRequestImpl(modifiedQuery, queryRequest.isEnterprise(), (Collection)queryRequest.getSourceIds(), queryRequest.getProperties());
        this.executeSourceQueries(sources, futures, (QueryRequest)modifiedQueryRequest);
        int resultsToSkip = 0;
        if (offset > 1 && sources.size() > 1) {
            resultsToSkip = offset - 1;
        }
        this.queryExecutorService.submit(new FifoQueryMonitor(this.queryExecutorService, futures, queryResponse, modifiedQueryRequest.getQuery(), resultsToSkip));
        return this.executePostFederationPlugins((QueryResponse)queryResponse);
    }

    protected QueryResponse executePostFederationPlugins(QueryResponse queryResponse) {
        try {
            for (PostFederatedQueryPlugin service : this.postQuery) {
                try {
                    queryResponse = service.process(queryResponse);
                }
                catch (PluginExecutionException e) {
                    LOGGER.warn("Error executing PostFederatedQueryPlugin: " + e.getMessage(), (Throwable)e);
                }
            }
        }
        catch (StopProcessingException e) {
            LOGGER.warn("Plugin stopped processing: ", (Throwable)e);
        }
        return queryResponse;
    }

    protected void executeSourceQueries(List<Source> sources, Map<Source, Future<SourceResponse>> futures, QueryRequest modifiedQueryRequest) {
        for (Source source : sources) {
            if (source == null) continue;
            if (!futures.containsKey(source)) {
                try {
                    for (PreFederatedQueryPlugin service : this.preQuery) {
                        try {
                            modifiedQueryRequest = service.process(source, modifiedQueryRequest);
                        }
                        catch (PluginExecutionException e) {
                            LOGGER.warn("Error executing PreFederatedQueryPlugin: " + e.getMessage(), (Throwable)e);
                        }
                    }
                }
                catch (StopProcessingException e) {
                    LOGGER.warn("Plugin stopped processing: ", (Throwable)e);
                }
                futures.put(source, this.queryExecutorService.submit(new CallableSourceResponse(source, modifiedQueryRequest.getQuery(), modifiedQueryRequest.getProperties())));
                continue;
            }
            LOGGER.warn("Duplicate source found with name " + source.getId() + ". Ignoring second one.");
        }
    }

    protected Query getModifiedQuery(Query originalQuery, int numberOfSources, int offset, int pageSize) {
        Query query = null;
        if (offset > 1 && numberOfSources > 1) {
            boolean modifiedOffset = true;
            int modifiedPageSize = offset + pageSize - 1;
            query = new QueryImpl((Filter)originalQuery, 1, modifiedPageSize, originalQuery.getSortBy(), originalQuery.requestsTotalResultsCount(), originalQuery.getTimeoutMillis());
        } else {
            query = originalQuery;
        }
        return query;
    }

    private static final class FifoQueryMonitor
    implements Runnable {
        private QueryResponseImpl returnResults;
        private Map<Source, Future<SourceResponse>> futures;
        private Query query;
        private ExecutorService pool;
        private AtomicInteger sites = new AtomicInteger();
        private AtomicInteger resultsToSkip = null;

        public FifoQueryMonitor(ExecutorService pool, Map<Source, Future<SourceResponse>> futuress, QueryResponseImpl returnResults, Query query, int resultsToSkip) {
            this.pool = pool;
            this.returnResults = returnResults;
            this.query = query;
            this.futures = futuress;
            this.resultsToSkip = new AtomicInteger(resultsToSkip);
        }

        private int updateSites(int addition) {
            return this.sites.addAndGet(addition);
        }

        @Override
        public void run() {
            int pageSize = this.query.getPageSize() > 0 ? this.query.getPageSize() : Integer.MAX_VALUE;
            for (Map.Entry<Source, Future<SourceResponse>> entry : this.futures.entrySet()) {
                Source site = entry.getKey();
                ArrayList siteListObject = (ArrayList)this.returnResults.getProperties().get("site-list");
                if (siteListObject != null && siteListObject instanceof List) {
                    ((List)siteListObject).add(site.getId());
                } else {
                    siteListObject = new ArrayList();
                    ((List)siteListObject).add(site.getId());
                    this.returnResults.getProperties().put("site-list", siteListObject);
                }
                this.updateSites(1);
                this.pool.submit(new SourceQueryThread(site, entry.getValue(), this.returnResults, pageSize));
            }
        }

        private long getTimeRemaining(long deadline) {
            long timeleft = System.currentTimeMillis() > deadline ? 0L : deadline - System.currentTimeMillis();
            return timeleft;
        }

        private class SourceQueryThread
        implements Runnable {
            private long maxResults = 0L;
            Future<SourceResponse> curFuture = null;
            QueryResponseImpl returnResults = null;
            private Source site = null;

            public SourceQueryThread(Source site, Future<SourceResponse> curFuture, QueryResponseImpl returnResults, long maxResults) {
                this.curFuture = curFuture;
                this.returnResults = returnResults;
                this.site = site;
                this.maxResults = maxResults;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                SourceResponse sourceResponse = null;
                Set processingDetails = this.returnResults.getProcessingDetails();
                try {
                    sourceResponse = FifoQueryMonitor.this.query.getTimeoutMillis() < 1L ? this.curFuture.get() : this.curFuture.get(FifoQueryMonitor.this.getTimeRemaining(System.currentTimeMillis() + FifoQueryMonitor.this.query.getTimeoutMillis()), TimeUnit.MILLISECONDS);
                    sourceResponse = this.curFuture.get();
                }
                catch (InterruptedException | ExecutionException | TimeoutException e) {
                    LOGGER.warn("Federated query returned exception " + e.getMessage());
                    processingDetails.add(new ProcessingDetailsImpl(this.site.getId(), e));
                }
                long sourceHits = 0L;
                if (sourceResponse != null) {
                    sourceHits = sourceResponse.getHits();
                    List results = sourceResponse.getResults();
                    int resultsReturned = results.size();
                    HashMap<String, Number> newSourceProperties = new HashMap<String, Number>();
                    newSourceProperties.put("total-hits", sourceHits);
                    newSourceProperties.put("total-results-returned", resultsReturned);
                    QueryResponseImpl queryResponseImpl = this.returnResults;
                    synchronized (queryResponseImpl) {
                        long sentTotal = this.returnResults.getHits();
                        this.returnResults.setHits(sourceHits + sentTotal);
                        for (Result result : results) {
                            if (sentTotal >= this.maxResults) {
                                LOGGER.debug("Received max number of results so ending polling");
                                break;
                            }
                            if (FifoQueryMonitor.this.resultsToSkip.get() == 0) {
                                this.returnResults.addResult(result, false);
                                ++sentTotal;
                                continue;
                            }
                            FifoQueryMonitor.this.resultsToSkip.decrementAndGet();
                            ++sentTotal;
                        }
                        if (sentTotal >= this.maxResults) {
                            this.returnResults.closeResultQueue();
                            LOGGER.debug("sending terminator for fifo federation strategy.");
                        }
                    }
                    this.returnResults.getProperties().put(this.site.getId(), (Serializable)newSourceProperties);
                    Map originalSourceProperties = sourceResponse.getProperties();
                    if (originalSourceProperties != null) {
                        Serializable object = (Serializable)originalSourceProperties.get("elapsed-time");
                        if (object != null && object instanceof Long) {
                            newSourceProperties.put("elapsed-time", (Long)object);
                            originalSourceProperties.remove("elapsed-time");
                            LOGGER.debug("Setting the elapsedTime responseProperty to {} for source {}", (Object)object, (Object)this.site.getId());
                        }
                        this.returnResults.getProperties().putAll(originalSourceProperties);
                    }
                }
                if (FifoQueryMonitor.this.updateSites(-1) == 0) {
                    LOGGER.debug("sending terminator for fifo federation strategy.");
                    this.returnResults.closeResultQueue();
                }
            }
        }
    }

    private static final class CallableSourceResponse
    implements Callable<SourceResponse> {
        private Query query = null;
        private Source source = null;
        private Map<String, Serializable> properties = null;

        public CallableSourceResponse(Source source, Query query, Map<String, Serializable> properties) {
            this.source = source;
            this.query = query;
            this.properties = properties;
        }

        @Override
        public SourceResponse call() throws Exception {
            long startTime = System.currentTimeMillis();
            SourceResponse sourceResponse = this.source.query((QueryRequest)new QueryRequestImpl(this.query, this.properties));
            long elapsedTime = System.currentTimeMillis() - startTime;
            LOGGER.debug("The source {} responded to the query in {} milliseconds", (Object)this.source.getId(), (Object)elapsedTime);
            sourceResponse.getProperties().put("elapsed-time", elapsedTime);
            return sourceResponse;
        }
    }
}

