package org.apache.rya.indexing.pcj.fluo.demo;

import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.commons.lang3.StringUtils;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.fluo.demo.Demo;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.openrdf.model.Resource;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.parser.sparql.SPARQLParser;
import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.repository.RepositoryException;

/* loaded from: input_file:org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.class */
/**
 * Interactive demo that loads a set of "historic" statements into Rya, asks Fluo to
 * maintain a SPARQL query as a Precomputed Join (PCJ), logs the exported results, then
 * streams additional statements and logs the results again.
 *
 * <p>The demo pauses between steps and waits for the user to press Enter on standard input.
 */
public class FluoAndHistoricPcjsDemo implements Demo {
    private static final Logger log = Logger.getLogger(FluoAndHistoricPcjsDemo.class);

    // Subjects/objects used to build the demo statements.
    private static final RyaURI alice = new RyaURI("http://Alice");
    private static final RyaURI bob = new RyaURI("http://Bob");
    private static final RyaURI charlie = new RyaURI("http://Charlie");
    private static final RyaURI frank = new RyaURI("http://Frank");
    private static final RyaURI david = new RyaURI("http://David");
    private static final RyaURI eve = new RyaURI("http://Eve");
    private static final RyaURI george = new RyaURI("http://George");
    private static final RyaURI henry = new RyaURI("http://Henry");
    private static final RyaURI irene = new RyaURI("http://Irene");
    private static final RyaURI justin = new RyaURI("http://Justin");
    private static final RyaURI kristi = new RyaURI("http://Kristi");
    private static final RyaURI luke = new RyaURI("http://Luke");
    private static final RyaURI manny = new RyaURI("http://Manny");
    private static final RyaURI nate = new RyaURI("http://Nate");
    private static final RyaURI olivia = new RyaURI("http://Olivia");
    private static final RyaURI paul = new RyaURI("http://Paul");
    private static final RyaURI ross = new RyaURI("http://Ross");
    private static final RyaURI sally = new RyaURI("http://Sally");
    private static final RyaURI tim = new RyaURI("http://Tim");
    private static final RyaURI coffeeShop = new RyaURI("http://CoffeeShop");
    private static final RyaURI burgerShop = new RyaURI("http://BurgerShop");
    private static final RyaURI cupcakeShop = new RyaURI("http://cupcakeShop");

    // Predicates used to build the demo statements.
    private static final RyaURI talksTo = new RyaURI("http://talksTo");
    private static final RyaURI worksAt = new RyaURI("http://worksAt");

    /** The query that is shown to the user and registered as a PCJ. */
    private static final String SPARQL = "SELECT ?patron ?employee WHERE { ?patron <http://talksTo> ?employee. ?employee <http://worksAt> <http://CoffeeShop>. }";

    /** Reads the "press Enter" confirmations from standard input. */
    private final Scanner keyboard = new Scanner(System.in);

    /**
     * Runs the demo end to end.
     *
     * @param miniAccumuloCluster not used by this demo
     * @param connector passed to the PCJ storage and to the Fluo PCJ creation
     * @param str passed as the second argument to {@code AccumuloPcjStorage} and as the last
     *        argument of {@code withRyaIntegration}; presumably the Rya instance name / table
     *        prefix (TODO confirm against the caller)
     * @param ryaSailRepository not used by this demo
     * @param repositoryConnection connection the statements are added to
     * @param miniFluo used to wait until the Fluo observers have finished
     * @param fluoClient client the streamed statements are inserted through
     * @throws Demo.DemoExecutionException if statements cannot be loaded, the PCJ cannot be
     *         created, or its results cannot be read
     */
    @Override
    public void execute(MiniAccumuloCluster miniAccumuloCluster, Connector connector, String str, RyaSailRepository ryaSailRepository, RepositoryConnection repositoryConnection, MiniFluo miniFluo, FluoClient fluoClient) throws Demo.DemoExecutionException {
        log.setLevel(Level.INFO);

        // Historic statements that match patterns of the demo query.
        Set<RyaStatement> relevantHistoric = Sets.newHashSet(
                new RyaStatement(eve, talksTo, charlie),
                new RyaStatement(david, talksTo, alice),
                new RyaStatement(alice, worksAt, coffeeShop),
                new RyaStatement(bob, worksAt, coffeeShop));
        log.info("We add some Statements that are relevant to the query we will compute:");
        prettyLogStatements(relevantHistoric);
        waitForEnter();

        // Historic statements that do not contribute to any result of the demo query.
        log.info("We also some more Satements that aren't realted to the query we will compute");
        Set<RyaStatement> otherHistoric = Sets.newHashSet(
                new RyaStatement(henry, worksAt, burgerShop),
                new RyaStatement(irene, worksAt, burgerShop),
                new RyaStatement(justin, worksAt, burgerShop),
                new RyaStatement(kristi, worksAt, burgerShop),
                new RyaStatement(luke, worksAt, burgerShop),
                new RyaStatement(manny, worksAt, cupcakeShop),
                new RyaStatement(nate, worksAt, cupcakeShop),
                new RyaStatement(olivia, worksAt, cupcakeShop),
                new RyaStatement(paul, worksAt, cupcakeShop),
                new RyaStatement(ross, worksAt, cupcakeShop),
                new RyaStatement(henry, talksTo, irene),
                new RyaStatement(henry, talksTo, justin),
                new RyaStatement(kristi, talksTo, irene),
                new RyaStatement(luke, talksTo, irene),
                new RyaStatement(sally, talksTo, paul),
                new RyaStatement(sally, talksTo, ross),
                new RyaStatement(sally, talksTo, kristi),
                new RyaStatement(tim, talksTo, nate),
                new RyaStatement(tim, talksTo, paul),
                new RyaStatement(tim, talksTo, kristi));
        log.info("Theese statements will also be inserted into the core Rya tables:");
        prettyLogStatements(otherHistoric);
        waitForEnter();

        log.info("Loading the historic statements into Rya...");
        loadDataIntoRya(repositoryConnection, relevantHistoric);
        loadDataIntoRya(repositoryConnection, otherHistoric);
        log.info("");

        log.info("The following SPARQL query will be loaded into the Fluo application for incremental updates:");
        prettyLogSparql(SPARQL);
        waitForEnter();

        log.info("Telling Fluo to maintain the query and import the historic Statement Pattern matches.");
        AccumuloPcjStorage pcjStorage = new AccumuloPcjStorage(connector, str);
        final String pcjId;
        try {
            pcjId = pcjStorage.createPcj(SPARQL);
            new CreateFluoPcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, connector, str);
        } catch (MalformedQueryException | PcjException | RyaDAOException | UnsupportedQueryException e) {
            throw new Demo.DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. Exiting.", e);
        }

        log.info("Waiting for the fluo application to finish exporting the initial results...");
        miniFluo.waitForObservers();
        log.info("Historic result exporting finished.");
        log.info("");
        logPcjResults(pcjStorage, pcjId);
        waitForEnter();

        // New statements that produce additional results once streamed in.
        RyaStatement newLeft = new RyaStatement(george, talksTo, frank);
        RyaStatement newRight = new RyaStatement(frank, worksAt, coffeeShop);
        RyaStatement joinsWithHistoricTalksTo = new RyaStatement(eve, talksTo, bob);
        RyaStatement joinsWithHistoricWorksAt = new RyaStatement(charlie, worksAt, coffeeShop);
        Set<RyaStatement> relevantStreamed = Sets.newHashSet(newLeft, newRight, joinsWithHistoricTalksTo, joinsWithHistoricWorksAt);
        log.info("We stream these relevant Statements into Fluo and the core Rya tables:");
        log.info(prettyFormat(newLeft) + "          - Part of a new result");
        log.info(prettyFormat(newRight) + "      - Other part of a new result");
        log.info(prettyFormat(joinsWithHistoricTalksTo) + "               - Joins with a historic <http://talksTo> statement");
        log.info(prettyFormat(joinsWithHistoricWorksAt) + "    - Joins with a historic <http://worksA>t statement");
        waitForEnter();

        // The last entry repeats the one before it; kept as-is (presumably collapsed by the
        // set if RyaStatement defines equals/hashCode — TODO confirm).
        Set<RyaStatement> otherStreamed = Sets.newHashSet(
                new RyaStatement(alice, talksTo, tim),
                new RyaStatement(bob, talksTo, tim),
                new RyaStatement(charlie, talksTo, tim),
                new RyaStatement(frank, talksTo, tim),
                new RyaStatement(david, talksTo, tim),
                new RyaStatement(eve, talksTo, sally),
                new RyaStatement(george, talksTo, sally),
                new RyaStatement(henry, talksTo, sally),
                new RyaStatement(irene, talksTo, sally),
                new RyaStatement(justin, talksTo, sally),
                new RyaStatement(kristi, talksTo, manny),
                new RyaStatement(luke, talksTo, manny),
                new RyaStatement(manny, talksTo, paul),
                new RyaStatement(nate, talksTo, manny),
                new RyaStatement(olivia, talksTo, manny),
                new RyaStatement(paul, talksTo, kristi),
                new RyaStatement(ross, talksTo, kristi),
                new RyaStatement(sally, talksTo, kristi),
                new RyaStatement(olivia, talksTo, kristi),
                new RyaStatement(olivia, talksTo, kristi));
        log.info("We also stream these irrelevant Statements into Fluo and the core Rya tables:");
        prettyLogStatements(otherStreamed);
        waitForEnter();

        // NOTE(review): only the relevant statements are actually loaded below; the
        // "irrelevant" set above is displayed but never inserted. Behavior kept unchanged.
        loadDataIntoRya(repositoryConnection, relevantStreamed);
        loadDataIntoFluo(fluoClient, relevantStreamed);

        log.info("Waiting for the fluo application to finish exporting the newly streamed results...");
        miniFluo.waitForObservers();
        log.info("Streamed result exporting finished.");
        log.info("");
        logPcjResults(pcjStorage, pcjId);
        log.info("");
    }

    /**
     * Logs every result currently stored for a PCJ, closing the result iterator afterwards
     * whether or not iteration succeeds.
     *
     * @param pcjStorage storage the results are listed from
     * @param pcjId identifies the PCJ whose results are logged
     * @throws Demo.DemoExecutionException if the results cannot be listed, read or closed
     */
    private static void logPcjResults(PrecomputedJoinStorage pcjStorage, String pcjId) throws Demo.DemoExecutionException {
        log.info("The following Binding Sets were exported to the PCJ with ID '" + pcjId + "' in Rya:");
        try (PrecomputedJoinStorage.CloseableIterator<?> results = pcjStorage.listResults(pcjId)) {
            while (results.hasNext()) {
                log.info("    " + results.next());
            }
        } catch (Exception e) {
            // Broad catch is required: closing the iterator is declared to throw Exception.
            throw new Demo.DemoExecutionException("Could not fetch the PCJ's reuslts from Accumulo. Exiting.", e);
        }
    }

    /** Prompts the user and blocks until a line is read from standard input. */
    private void waitForEnter() {
        log.info("");
        log.info("Press [Enter] to continue the demo.");
        this.keyboard.nextLine();
    }

    /**
     * Logs a SPARQL query one rendered line per log entry.
     *
     * @param str the query to log
     */
    private static void prettyLogSparql(String str) {
        try {
            for (String line : prettyFormatSparql(str)) {
                log.info(line);
            }
        } catch (Exception ignored) {
            // Pretty printing is best effort; fall back to logging the raw query text.
            log.info(str);
        }
    }

    /**
     * Inserts each statement into the Fluo application through the given client.
     *
     * @param fluoClient client used for the inserts
     * @param set statements to insert
     */
    private static void loadDataIntoFluo(FluoClient fluoClient, Set<RyaStatement> set) {
        InsertTriples insertTriples = new InsertTriples();
        for (RyaStatement statement : set) {
            insertTriples.insert(fluoClient, statement, Optional.absent());
        }
    }

    /**
     * Formats a statement as {@code <subject> <predicate> <object>}.
     *
     * @param ryaStatement the statement to format
     * @return the three data values, each wrapped in angle brackets and separated by spaces
     */
    private static String prettyFormat(RyaStatement ryaStatement) {
        return "<" + ryaStatement.getSubject().getData() + "> <" + ryaStatement.getPredicate().getData() + "> <" + ryaStatement.getObject().getData() + ">";
    }

    /**
     * Logs every statement of the set on its own indented line.
     *
     * @param set statements to log
     */
    private static void prettyLogStatements(Set<RyaStatement> set) {
        for (RyaStatement statement : set) {
            log.info("    " + prettyFormat(statement));
        }
    }

    /**
     * Parses and re-renders a SPARQL query, then splits the rendered text on newlines.
     *
     * @param str the query to format
     * @return the rendered query, one array element per line
     * @throws Exception if the query cannot be parsed or rendered
     */
    private static String[] prettyFormatSparql(String str) throws Exception {
        return StringUtils.split(new SPARQLQueryRenderer().render(new SPARQLParser().parseQuery(str, (String) null)), '\n');
    }

    /**
     * Converts each statement and adds it to the repository connection.
     *
     * @param repositoryConnection connection the statements are added to
     * @param set statements to add
     * @throws Demo.DemoExecutionException if any statement cannot be added
     */
    private static void loadDataIntoRya(RepositoryConnection repositoryConnection, Set<RyaStatement> set) throws Demo.DemoExecutionException {
        for (RyaStatement statement : set) {
            try {
                repositoryConnection.add(RyaToRdfConversions.convertStatement(statement));
            } catch (RepositoryException e) {
                throw new Demo.DemoExecutionException("Could not load one of the historic statements into Rya, so the demo can not continue. Exiting.", e);
            }
        }
    }
}
