/*
 * Decompiled with CFR 0.152.
 */
package net.fortytwo.stream.sparql.impl.caching;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Level;
import net.fortytwo.stream.BasicStreamProcessor;
import net.fortytwo.stream.BasicSubscription;
import net.fortytwo.stream.caching.Bindings;
import net.fortytwo.stream.caching.Query;
import net.fortytwo.stream.caching.QueryIndex;
import net.fortytwo.stream.model.VariableOrConstant;
import net.fortytwo.stream.sparql.SparqlQuery;
import net.fortytwo.stream.sparql.SparqlStreamProcessor;
import org.openrdf.model.Value;
import org.openrdf.query.BindingSet;
import org.openrdf.query.impl.MapBindingSet;

/**
 * A {@link SparqlStreamProcessor} which keeps its queries and tuples in a {@link QueryIndex}
 * and removes expired entries from that index on a background thread.
 *
 * <p>Whenever a query is registered or a tuple changes the index, the configured
 * {@link CleanupPolicy} is consulted; if it approves, the background thread is asked to call
 * {@code QueryIndex.removeExpired} with the timestamp of the request.
 *
 * <p>Locking: the cleanup-policy bookkeeping (policy, time of last cleanup, "added since last
 * cleanup" counters) is guarded by this object's monitor. The hand-off to the background thread
 * (pending flag, timestamp, shutdown flag) is guarded by a private lock object. The monitor of
 * {@code this} may be held while the private lock is acquired, never the other way round.
 *
 * <p>NOTE(review): the index itself is touched both by callers and by the cleanup thread; this
 * class adds no locking around it, so it is assumed that {@code QueryIndex} tolerates that —
 * confirm against its implementation.
 */
public class CachingSparqlStreamProcessor extends SparqlStreamProcessor<Query<Value, ?>> {
    /** Threshold, in seconds since the last cleanup, used by the default cleanup policy. */
    private static final int DEFAULT_CLEANUP_INTERVAL_SECONDS = 30;

    // NOTE(review): the argument 3 is presumably the tuple size (triples) — confirm against QueryIndex.
    private final QueryIndex<Value, BasicSubscription<SparqlQuery, Query<Value, ?>, BindingSet>> queryIndex = new QueryIndex<>(3);

    /**
     * Callback handed to the index when adding tuples: converts each set of bindings to a
     * {@link BindingSet} and forwards it as a candidate solution for the subscription.
     * Failures are logged rather than propagated, so one bad solution does not abort the add.
     */
    private final BiConsumer<BasicSubscription<SparqlQuery, Query<Value, ?>, BindingSet>, Bindings<Value>> solutionHandler = (subscription, bindings) -> {
        try {
            // NOTE(review): Long.MAX_VALUE presumably means "solution never expires" — confirm.
            this.handleCandidateSolution(subscription, this.toBindingSet(bindings), Long.MAX_VALUE);
        } catch (IOException e) {
            logger.log(Level.WARNING, "failed to handle solution " + bindings, e);
        }
    };

    // --- guarded by "this" ---
    private CleanupPolicy cleanupPolicy = (secondsElapsedSinceLast, queriesAddedSinceLast, statementsAddedSinceLast) ->
            secondsElapsedSinceLast >= DEFAULT_CLEANUP_INTERVAL_SECONDS;
    private long timeOfLastCleanup = 0L;
    private int queriesAddedSinceLastCleanup = 0;
    private int statementsAddedSinceLastCleanup = 0;

    // A dedicated lock object. (A string literal must not be used here: literals are interned,
    // so the monitor would be shared with any other code locking on the same literal.)
    private final Object cleanupLock = new Object();

    // --- guarded by cleanupLock ---
    /** Timestamp to pass to the next {@code removeExpired} call. */
    private long cleanupNow;
    /** True when a cleanup has been requested and not yet picked up by the cleanup thread. */
    private boolean cleanupPending;
    /** True once {@link #shutDown()} has been called; tells the cleanup thread to exit. */
    private boolean shutdownRequested;

    /**
     * Creates the processor, clears its state and starts the background cleanup thread.
     * The thread runs until {@link #shutDown()} is called or the processor becomes inactive.
     */
    public CachingSparqlStreamProcessor() {
        this.clear();
        Thread cleanupThread = new Thread(this::runCleanupLoop, "CachingSparqlStreamProcessor-ttl-cleanup");
        // Daemon, so that a processor which is never shut down cannot keep the JVM alive.
        cleanupThread.setDaemon(true);
        cleanupThread.start();
    }

    /**
     * Body of the cleanup thread: waits for a cleanup request, then removes expired entries
     * from the index. Exits on shutdown, on interruption, when the processor is no longer
     * active, or (after logging) on any unexpected failure.
     */
    private void runCleanupLoop() {
        try {
            while (true) {
                long now;
                synchronized (this.cleanupLock) {
                    // Guarded wait: protects against spurious wakeups and against requests
                    // which arrive while this thread is busy rather than waiting.
                    while (!this.cleanupPending && !this.shutdownRequested) {
                        this.cleanupLock.wait();
                    }
                    if (this.shutdownRequested) {
                        return;
                    }
                    this.cleanupPending = false;
                    now = this.cleanupNow;
                }
                if (!this.isActive()) {
                    return;
                }
                // Performed outside the lock so that requesters are never blocked by a cleanup.
                this.queryIndex.removeExpired(now);
            }
        } catch (InterruptedException e) {
            logger.warning("interrupted while waiting on TTL cleanup lock");
            // Preserve the interrupt status for whoever owns this thread, and stop.
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            logger.log(Level.SEVERE, "TTL cleanup thread failed", t);
        }
    }

    /**
     * Replaces the policy which decides when expired entries are removed.
     *
     * @param cleanupPolicy the new policy; consulted on every subsequent cleanup check
     */
    public synchronized void setCleanupPolicy(CleanupPolicy cleanupPolicy) {
        this.cleanupPolicy = cleanupPolicy;
    }

    /**
     * Clears the query index and the inherited counters.
     */
    public void clear() {
        this.queryIndex.clear();
        this.clearCounters();
    }

    /**
     * Stops the background cleanup thread and then shuts down the superclass.
     */
    @Override
    public void shutDown() {
        synchronized (this.cleanupLock) {
            // An explicit flag is needed: the woken thread may otherwise still observe the
            // processor as active (super.shutDown() has not run yet) and go back to waiting.
            this.shutdownRequested = true;
            this.cleanupLock.notifyAll();
        }
        super.shutDown();
    }

    /**
     * Records newly added queries/statements and asks the cleanup policy whether a cleanup is
     * due; if so, resets the bookkeeping and hands the request to the cleanup thread.
     *
     * @param now             current time in milliseconds
     * @param queriesAdded    number of queries added since the previous call
     * @param statementsAdded number of statements added since the previous call
     */
    private synchronized void checkCleanup(long now, int queriesAdded, int statementsAdded) {
        this.queriesAddedSinceLastCleanup += queriesAdded;
        this.statementsAddedSinceLastCleanup += statementsAdded;

        // Clamp instead of blindly narrowing: with timeOfLastCleanup == 0 and an epoch-based
        // clock, a plain (int) cast would eventually overflow to a negative value.
        long elapsedSeconds = (now - this.timeOfLastCleanup) / 1000L;
        int seconds = (int) Math.max((long) Integer.MIN_VALUE, Math.min((long) Integer.MAX_VALUE, elapsedSeconds));

        if (!this.cleanupPolicy.doCleanup(seconds, this.queriesAddedSinceLastCleanup, this.statementsAddedSinceLastCleanup)) {
            return;
        }
        this.timeOfLastCleanup = now;
        this.queriesAddedSinceLastCleanup = 0;
        this.statementsAddedSinceLastCleanup = 0;
        synchronized (this.cleanupLock) {
            // Timestamp and flag are published under the same lock the cleanup thread reads them with.
            this.cleanupNow = now;
            this.cleanupPending = true;
            this.cleanupLock.notifyAll();
        }
    }

    /**
     * Removes the subscription's query from the index.
     *
     * @param subscription the subscription whose query is to be removed
     */
    public void unregister(BasicSubscription<SparqlQuery, Query<Value, ?>, BindingSet> subscription) {
        this.queryIndex.remove((Query)subscription.getQuery());
    }

    /**
     * Renews the subscription's query in the index, if this processor is active.
     *
     * @param subscription the subscription whose query is to be renewed
     * @param ttl          the new time-to-live, passed through to the index
     * @return {@code true} if the processor was active and the renewal was forwarded to the
     *         index, {@code false} otherwise
     */
    public boolean renew(BasicSubscription<SparqlQuery, Query<Value, ?>, BindingSet> subscription, int ttl) {
        if (!this.isActive()) {
            return false;
        }
        this.queryIndex.renew((Query)subscription.getQuery(), ttl, this.getNow());
        return true;
    }

    /**
     * Passes the pattern of each of the query's patterns to the visitor.
     */
    @Override
    protected void visitQueryPatterns(Query<Value, ?> query, Consumer<VariableOrConstant<String, Value>[]> visitor) {
        for (Query.PatternInQuery p : query.getPatterns()) {
            visitor.accept(p.getPattern());
        }
    }

    /**
     * Links the subscription to its query, adds the query to the index and runs a cleanup check.
     */
    @Override
    protected void register(BasicSubscription<SparqlQuery, Query<Value, ?>, BindingSet> subscription) {
        Query query = (Query)subscription.getQuery();
        this.setQuerySubscription(query, subscription);
        this.queryIndex.add(query);
        this.checkCleanup(this.getNow(), 1, 0);
    }

    /**
     * Adds a tuple to the index, delivering any resulting solutions through the solution
     * handler, and runs a cleanup check if the index reported a change.
     *
     * @return whether the index reported a change
     */
    @Override
    protected boolean addTupleInternal(Value[] tuple, int ttl, long now) {
        boolean changed = this.queryIndex.add((Object[])tuple, this.solutionHandler, ttl, now);
        if (changed) {
            this.checkCleanup(this.getNow(), 0, 1);
        }
        return changed;
    }

    /**
     * Builds a {@link Query} from the given patterns and expiration time and wraps it, together
     * with the SPARQL query and the consumer, in a new subscription bound to this processor.
     */
    @Override
    protected BasicSubscription<SparqlQuery, Query<Value, ?>, BindingSet> createSubscriptionInternal(SparqlQuery sparqlQuery, List<VariableOrConstant<String, Value>[]> patterns, long expirationTime, BiConsumer<BindingSet, Long> consumer) {
        Query query = new Query(patterns, expirationTime);
        return new BasicSubscription((Object)sparqlQuery, (Object)query, consumer, (BasicStreamProcessor)this);
    }

    /** Attaches the subscription to the query. */
    private void setQuerySubscription(Query<Value, ?> query, BasicSubscription<SparqlQuery, Query<Value, ?>, BindingSet> subscription) {
        query.setSubscription(subscription);
    }

    /**
     * Copies every entry of the given bindings into a new {@link MapBindingSet}.
     */
    private BindingSet toBindingSet(Bindings<Value> bindings) {
        MapBindingSet bs = new MapBindingSet();
        for (Map.Entry e : bindings.entrySet()) {
            bs.addBinding((String)e.getKey(), (Value)e.getValue());
        }
        return bs;
    }

    /**
     * Decides when expired entries should be removed from the index.
     */
    @FunctionalInterface
    public interface CleanupPolicy {
        /**
         * @param secondsElapsedSinceLast  whole seconds since the last cleanup was triggered
         * @param queriesAddedSinceLast    queries registered since the last cleanup was triggered
         * @param statementsAddedSinceLast statements added since the last cleanup was triggered
         * @return {@code true} if a cleanup should be triggered now
         */
        boolean doCleanup(int secondsElapsedSinceLast, int queriesAddedSinceLast, int statementsAddedSinceLast);
    }
}

