/*
 * Decompiled with CFR 0.152.
 */
package net.fortytwo.smsn.p2p.sparql;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.fortytwo.smsn.SemanticSynchrony;
import net.fortytwo.smsn.p2p.Connection;
import net.fortytwo.smsn.p2p.ConnectionHost;
import net.fortytwo.smsn.p2p.MessageHandler;
import net.fortytwo.smsn.p2p.sparql.SimpleJSONRDFFormat;
import net.fortytwo.stream.StreamProcessor;
import net.fortytwo.stream.sparql.SparqlStreamProcessor;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.openrdf.model.Statement;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.SimpleValueFactory;
import org.openrdf.query.BindingSet;

public class QueryEngineWrapper {
    private static final Logger logger = Logger.getLogger(QueryEngineWrapper.class.getName());
    private final SparqlStreamProcessor processor;
    private final MessageHandler datasetHandler;
    private final SimpleJSONRDFFormat jsonrdfFormat;
    private final Map<String, Connection> connectionsByQueryId;
    private final ConnectionHost.Notifier notifier;

    public QueryEngineWrapper(SparqlStreamProcessor processor) {
        this.processor = processor;
        this.jsonrdfFormat = new SimpleJSONRDFFormat((ValueFactory)SimpleValueFactory.getInstance());
        this.connectionsByQueryId = new HashMap<String, Connection>();
        this.datasetHandler = message -> {
            try {
                this.handleDatasetMessage(message);
            }
            catch (SimpleJSONRDFFormat.ParseError e) {
                logger.log(Level.WARNING, "invalid dataset message: " + message, e);
            }
        };
        this.notifier = this::newConnection;
    }

    public ConnectionHost.Notifier getNotifier() {
        return this.notifier;
    }

    private void newConnection(Connection c) {
        c.registerHandler("sparql-query", message -> {
            if (SemanticSynchrony.getConfiguration().isVerbose()) {
                logger.info("received query message from " + c.getSocket().getRemoteSocketAddress() + ": " + message);
            }
            try {
                this.handleQueryMessage(c, message);
            }
            catch (IOException | StreamProcessor.IncompatibleQueryException | StreamProcessor.InvalidQueryException e) {
                logger.log(Level.WARNING, "error raised by query engine", e);
            }
        });
        c.registerHandler("rdf-data", this.datasetHandler);
    }

    private void handleQueryMessage(Connection c, JSONObject message) throws StreamProcessor.InvalidQueryException, IOException, StreamProcessor.IncompatibleQueryException {
        int ttl;
        String query;
        String queryId;
        if (!c.isActive()) {
            logger.severe("can't handle query message; connection is not active");
            return;
        }
        try {
            queryId = message.getString("id");
            query = message.getString("query");
            ttl = message.getInt("ttl");
        }
        catch (JSONException e) {
            logger.log(Level.WARNING, "invalid query message: " + message, e);
            return;
        }
        if (this.connectionsByQueryId.keySet().contains(queryId)) {
            logger.warning("ignoring query with duplicate id '" + queryId + "'");
            return;
        }
        this.connectionsByQueryId.put(queryId, c);
        this.processor.addQuery(ttl, query, (solution, expirationTime) -> {
            try {
                JSONObject bindings = this.jsonrdfFormat.toJSON((BindingSet)solution, (Long)expirationTime);
                this.sendQueryResultMessage(c, queryId, bindings, (Long)expirationTime);
            }
            catch (JSONException e) {
                logger.log(Level.SEVERE, "error in creating query result JSON message", e);
            }
            catch (IOException e) {
                logger.log(Level.WARNING, "failed to send query result due to I/O error", e);
            }
        });
    }

    private void sendQueryResultMessage(Connection c, String queryId, JSONObject bindings, long expirationTime) throws JSONException, IOException {
        if (!c.isActive()) {
            logger.severe("can't send query result; connection is not active");
            return;
        }
        JSONObject j = new JSONObject();
        j.put("id", (Object)queryId);
        j.put("mapping", (Object)bindings);
        j.put("expirationTime", expirationTime);
        if (SemanticSynchrony.getConfiguration().isVerbose()) {
            logger.info("sending query result message to " + c.getSocket().getRemoteSocketAddress() + ": " + j);
        }
        c.sendNow("sparql-result", j);
    }

    private void handleDatasetMessage(JSONObject message) throws SimpleJSONRDFFormat.ParseError {
        if (SemanticSynchrony.getConfiguration().isVerbose()) {
            logger.info("received dataset message: " + message);
        }
        try {
            JSONArray dataset = message.getJSONArray("dataset");
            int ttl = message.getInt("ttl");
            int length = dataset.length();
            Statement[] a = new Statement[length];
            for (int i = 0; i < length; ++i) {
                a[i] = this.jsonrdfFormat.toStatement(dataset.getJSONArray(i));
            }
            this.processor.addInputs(ttl, a);
        }
        catch (JSONException e) {
            throw new SimpleJSONRDFFormat.ParseError(e);
        }
    }
}

