/*
 * Decompiled with CFR 0.152.
 */
package net.fortytwo.smsn.p2p.sparql;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.fortytwo.smsn.p2p.Connection;
import net.fortytwo.smsn.p2p.sparql.SimpleJSONRDFFormat;
import net.fortytwo.stream.BasicStreamProcessor;
import net.fortytwo.stream.BasicSubscription;
import net.fortytwo.stream.StreamProcessor;
import net.fortytwo.stream.sparql.RDFStreamProcessor;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.openrdf.model.Statement;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.SimpleValueFactory;
import org.openrdf.query.BindingSet;

/**
 * A stream processor which does not evaluate queries itself, but forwards SPARQL subscriptions
 * and RDF data over a {@link Connection} as JSON messages, and dispatches the query results
 * received over that connection to the locally registered result consumers.
 *
 * <p>Operations which would alter the state of the remote side beyond adding queries and data
 * ({@link #clear()}, {@link #unregister}, {@link #renew}, {@link #setClock}) are not supported
 * and throw {@link UnsupportedOperationException}.
 *
 * <p>NOTE(review): the internal registries are plain {@link HashMap}s. If the connection invokes
 * the result handler on a different thread than the one which creates subscriptions, access
 * would need to be synchronized -- confirm against the {@code Connection} implementation.
 */
public class ProxySparqlStreamProcessor
extends RDFStreamProcessor<String, Query> {
    private static final Logger logger = Logger.getLogger(ProxySparqlStreamProcessor.class.getName());

    /** Message tag under which RDF datasets are sent (see {@link #addStatements}). */
    public static final String TAG_RDF_DATA = "rdf-data";
    /** Message tag under which query subscriptions are sent. */
    public static final String TAG_SPARQL_QUERY = "sparql-query";
    /** Message tag for which this processor registers its result handler. */
    public static final String TAG_SPARQL_RESULT = "sparql-result";

    /** Result message field holding the JSON object which is converted to a binding set. */
    public static final String MAPPING = "mapping";
    /** Dataset message field holding the JSON array of statements. */
    public static final String DATASET = "dataset";
    /** Result message field holding the expiration time passed on to the result consumer. */
    public static final String EXPIRATION_TIME = "expirationTime";
    /** Subscription message field holding the SPARQL query string. */
    public static final String QUERY = "query";
    /** Field holding the subscription id, in both subscription and result messages. */
    public static final String QUERY_ID = "id";
    /** Field name constant; not referenced within this class. */
    public static final String SOLUTION = "solution";
    /** Field holding the time-to-live, in both subscription and dataset messages. */
    public static final String TTL = "ttl";

    private final Connection connection;
    private final SimpleJSONRDFFormat jsonrdfFormat;
    // Queries by subscription id; replayed by notifyConnectionOpen().
    private final Map<String, Query> queriesById;
    // Result consumers by subscription id; looked up when a result message arrives.
    private final Map<String, BiConsumer<BindingSet, Long>> handlers;

    /**
     * Creates a proxy on top of the given connection and registers a handler for
     * {@link #TAG_SPARQL_RESULT} messages on it.
     *
     * @param connection the connection used to send queries and data and to receive results
     */
    public ProxySparqlStreamProcessor(Connection connection) {
        this.connection = connection;
        this.jsonrdfFormat = new SimpleJSONRDFFormat((ValueFactory)SimpleValueFactory.getInstance());
        this.queriesById = new HashMap<String, Query>();
        this.handlers = new HashMap<String, BiConsumer<BindingSet, Long>>();
        connection.registerHandler(TAG_SPARQL_RESULT, result -> {
            try {
                this.handleSparqlResultJSON(result);
            }
            catch (SimpleJSONRDFFormat.ParseError e) {
                // Attach the cause to the log record rather than printing it to stderr,
                // so that it goes through the same logging configuration as the message.
                logger.log(Level.WARNING, "invalid SPARQL query result: " + result, e);
            }
        });
    }

    /**
     * Not supported by the proxy.
     *
     * @throws UnsupportedOperationException always
     */
    public void clear() {
        throw new UnsupportedOperationException("don't have rights to clear remote query engine");
    }

    /**
     * Sends a subscription message for every query registered so far. Judging by its name,
     * this is meant to be called when the connection has been (re)opened.
     *
     * @throws IOException if a subscription message cannot be sent
     */
    public void notifyConnectionOpen() throws IOException {
        for (Map.Entry<String, Query> e : this.queriesById.entrySet()) {
            Query query = e.getValue();
            this.sendSubscriptionMessage(query.queryStr, e.getKey(), query.ttl);
        }
    }

    /**
     * Creates a subscription for the given query, registers its result consumer, and, if the
     * connection is currently active, sends the subscription message immediately. If the
     * connection is not active, the query is only stored; {@link #notifyConnectionOpen()}
     * sends all stored queries.
     *
     * @param ttl the time-to-live sent along with the query
     * @param sparqlQuery the SPARQL query string
     * @param consumer receives each binding set and its expiration time
     * @return the new subscription
     * @throws IllegalStateException if the subscription message cannot be sent; in that case
     *         the query and its consumer are not left registered
     */
    // The raw BasicSubscription construction is kept from the original (decompiled) code;
    // the unchecked conversion is confined to this method.
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected BasicSubscription<String, Query, BindingSet> createSubscription(int ttl, String sparqlQuery, BiConsumer<BindingSet, Long> consumer) {
        Query query = new Query();
        query.queryStr = sparqlQuery;
        query.ttl = ttl;
        BasicSubscription sub = new BasicSubscription((Object)sparqlQuery, (Object)query, consumer, (BasicStreamProcessor)this);
        String id = sub.getId();

        // Register both the query and its consumer BEFORE sending, so that a result which
        // arrives right after the subscription message already finds its handler.
        this.queriesById.put(id, query);
        this.handlers.put(id, consumer);

        if (this.connection.isActive()) {
            try {
                this.sendSubscriptionMessage(sparqlQuery, id, ttl);
            }
            catch (IOException e) {
                // Roll back: the caller never receives this subscription, so it must not be
                // replayed by notifyConnectionOpen() nor keep a consumer registered.
                this.queriesById.remove(id);
                this.handlers.remove(id);
                throw new IllegalStateException("failed to create subscription", e);
            }
        }
        return sub;
    }

    /**
     * Does nothing; this implementation never adds a tuple locally.
     *
     * @return always {@code false}
     */
    protected boolean addTuple(Value[] tuple, int ttl, long now) {
        return false;
    }

    /**
     * Not supported by the proxy.
     *
     * @throws UnsupportedOperationException always
     */
    public void unregister(BasicSubscription<String, Query, BindingSet> subscription) {
        throw new UnsupportedOperationException("not yet possible to cancel subscriptions through the proxy");
    }

    /**
     * Returns the query string unchanged; no parsing or validation is performed here.
     *
     * @param queryStr the SPARQL query string
     * @return {@code queryStr} itself
     */
    protected String parseQuery(String queryStr) throws StreamProcessor.InvalidQueryException, StreamProcessor.IncompatibleQueryException {
        return queryStr;
    }

    /**
     * Not supported by the proxy.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean renew(BasicSubscription<String, Query, BindingSet> subscription, int i) {
        throw new UnsupportedOperationException("not yet possible to renew subscriptions through the proxy");
    }

    /**
     * Serializes the given statements to JSON and sends them as a {@link #TAG_RDF_DATA}
     * message.
     *
     * @param ttl the time-to-live sent along with the statements
     * @param statements the statements to send
     * @throws IOException if the message cannot be built (wrapping the {@link JSONException})
     *         or cannot be sent
     */
    public void addStatements(int ttl, Statement ... statements) throws IOException {
        try {
            JSONArray a = this.jsonrdfFormat.statementsToJSON(statements);
            this.sendDatasetMessage(a, ttl);
        }
        catch (JSONException e) {
            throw new IOException(e);
        }
    }

    /**
     * Not supported by the proxy.
     *
     * @throws UnsupportedOperationException always
     */
    public void setClock(Supplier<Long> clock) {
        throw new UnsupportedOperationException("sorry, the clock cannot be set by proxy");
    }

    /**
     * Dispatches a received result message to the consumer registered under its query id.
     * Results for ids without a registered consumer are dropped.
     *
     * @param result the JSON result message
     * @throws SimpleJSONRDFFormat.ParseError if a required field is missing or malformed, or
     *         if the mapping cannot be converted to a binding set
     */
    private void handleSparqlResultJSON(JSONObject result) throws SimpleJSONRDFFormat.ParseError {
        try {
            String queryId = result.getString(QUERY_ID);
            BiConsumer<BindingSet, Long> handler = this.handlers.get(queryId);
            if (null == handler) {
                // Make the drop observable when fine-grained logging is enabled.
                logger.fine("ignoring SPARQL result for unknown query id: " + queryId);
                return;
            }
            JSONObject mapping = result.getJSONObject(MAPPING);
            Long expirationTime = result.getLong(EXPIRATION_TIME);
            BindingSet bindingSet = this.jsonrdfFormat.toBindingSet(mapping);
            handler.accept(bindingSet, expirationTime);
        }
        catch (JSONException e) {
            throw new SimpleJSONRDFFormat.ParseError(e);
        }
    }

    /**
     * Builds a subscription message (id, query, ttl) and hands it to
     * {@code Connection.sendBuffered} under {@link #TAG_SPARQL_QUERY}.
     *
     * @throws IOException if the message cannot be built (wrapping the {@link JSONException})
     *         or cannot be sent
     */
    private void sendSubscriptionMessage(String query, String queryId, long ttl) throws IOException {
        try {
            JSONObject j = new JSONObject();
            j.put(QUERY_ID, (Object)queryId);
            j.put(QUERY, (Object)query);
            j.put(TTL, ttl);
            this.connection.sendBuffered(TAG_SPARQL_QUERY, j);
        }
        catch (JSONException e) {
            throw new IOException(e);
        }
    }

    /**
     * Builds a dataset message (statements, ttl) and hands it to {@code Connection.sendNow}
     * under {@link #TAG_RDF_DATA}.
     */
    private void sendDatasetMessage(JSONArray statements, long ttl) throws JSONException, IOException {
        JSONObject j = new JSONObject();
        j.put(DATASET, (Object)statements);
        j.put(TTL, ttl);
        this.connection.sendNow(TAG_RDF_DATA, j);
    }

    /** A query string together with the time-to-live it was subscribed with. */
    public static class Query {
        /** The SPARQL query string. */
        public String queryStr;
        /** The time-to-live which is sent along with the query. */
        public long ttl;
    }
}

