package org.apache.rya.streams.client.command;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractScheduledService;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.algebra.Reduced;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/streams/client/command/StreamResultsCommand.class */
public class StreamResultsCommand implements RyaStreamsCommand {

    /* loaded from: input_file:org/apache/rya/streams/client/command/StreamResultsCommand$StreamResultsParameters.class */
    private static final class StreamResultsParameters extends RyaStreamsCommand.KafkaParameters {

        @Parameter(names = {"--queryId", "-q"}, required = true, description = "The query whose results will be streamed to the console.")
        private String queryId;

        private StreamResultsParameters() {
        }

        @Override // org.apache.rya.streams.client.RyaStreamsCommand.KafkaParameters
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(super.toString());
            if (!Strings.isNullOrEmpty(this.queryId)) {
                sb.append("\tQuery ID: " + this.queryId);
                sb.append("\n");
            }
            return sb.toString();
        }
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public String getCommand() {
        return "stream-results";
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public String getDescription() {
        return "Stream the results of a query to the console.";
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public String getUsage() {
        JCommander jCommander = new JCommander(new StreamResultsParameters());
        StringBuilder sb = new StringBuilder();
        jCommander.usage(sb);
        return sb.toString();
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public boolean validArguments(String[] strArr) {
        boolean z = true;
        try {
            new JCommander(new StreamResultsParameters(), strArr);
        } catch (ParameterException e) {
            z = false;
        }
        return z;
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public void execute(String[] strArr) throws RyaStreamsCommand.ArgumentsException, RyaStreamsCommand.ExecutionException {
        Objects.requireNonNull(strArr);
        StreamResultsParameters streamResultsParameters = new StreamResultsParameters();
        try {
            new JCommander(streamResultsParameters, strArr);
            KafkaQueryChangeLog make = KafkaQueryChangeLogFactory.make(streamResultsParameters.kafkaIP + ":" + streamResultsParameters.kafkaPort, KafkaTopics.queryChangeLogTopic(streamResultsParameters.ryaInstance));
            try {
                UUID fromString = UUID.fromString(streamResultsParameters.queryId);
                try {
                    Optional optional = new InMemoryQueryRepository(make, AbstractScheduledService.Scheduler.newFixedRateSchedule(0L, 5L, TimeUnit.SECONDS)).get(fromString);
                    if (!optional.isPresent()) {
                        throw new RyaStreamsCommand.ExecutionException("Could not read the results for query with ID " + fromString + " because no such query exists.");
                    }
                    String sparql = ((StreamsQuery) optional.get()).getSparql();
                    final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                    Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.rya.streams.client.command.StreamResultsCommand.1
                        @Override // java.lang.Thread, java.lang.Runnable
                        public void run() {
                            atomicBoolean.set(true);
                        }
                    });
                    try {
                        try {
                            QueryResultStream fromStart = (new SPARQLParser().parseQuery(sparql, (String) null).getTupleExpr() instanceof Reduced ? new KafkaGetQueryResultStream(streamResultsParameters.kafkaIP, streamResultsParameters.kafkaPort, VisibilityStatementDeserializer.class) : new KafkaGetQueryResultStream(streamResultsParameters.kafkaIP, streamResultsParameters.kafkaPort, VisibilityBindingSetDeserializer.class)).fromStart(streamResultsParameters.ryaInstance, fromString);
                            Throwable th = null;
                            while (!atomicBoolean.get()) {
                                try {
                                    try {
                                        Iterator it = fromStart.poll(1000L).iterator();
                                        while (it.hasNext()) {
                                            System.out.println(it.next());
                                        }
                                    } catch (Throwable th2) {
                                        th = th2;
                                        throw th2;
                                    }
                                } finally {
                                }
                            }
                            if (fromStart != null) {
                                if (0 != 0) {
                                    try {
                                        fromStart.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    fromStart.close();
                                }
                            }
                        } catch (Exception e) {
                            System.err.println("Error while reading the results from the stream.");
                            e.printStackTrace();
                            System.exit(1);
                        }
                    } catch (MalformedQueryException e2) {
                        throw new RyaStreamsCommand.ExecutionException("Could not parse the SPARQL for the query: " + sparql, e2);
                    }
                } catch (Exception e3) {
                    throw new RyaStreamsCommand.ExecutionException("Problem encountered while closing the QueryRepository.", e3);
                }
            } catch (IllegalArgumentException e4) {
                throw new RyaStreamsCommand.ArgumentsException("Invalid Query ID " + streamResultsParameters.queryId);
            }
        } catch (ParameterException e5) {
            throw new RyaStreamsCommand.ArgumentsException("Could not stream the query's results because of invalid command line parameters.", e5);
        }
    }
}
