/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rya.shell;

import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.shell.SharedShellState;
import org.apache.rya.shell.util.SparqlPrompt;
import org.apache.rya.shell.util.StreamsQueryFormatter;
import org.apache.rya.streams.api.RyaStreamsClient;
import org.apache.rya.streams.api.entity.StreamsQuery;
import org.apache.rya.streams.api.exception.RyaStreamsException;
import org.apache.rya.streams.kafka.KafkaRyaStreamsClientFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;

/**
 * Shell commands that configure and drive the Rya Streams subsystem of the Rya
 * instance the shell is currently connected to.
 *
 * <p>The configuration commands are offered while the shell is connected to a Rya
 * instance; the query commands are offered only once a {@link RyaStreamsClient}
 * has been registered with the {@link SharedShellState}.
 */
@Component
public class RyaStreamsCommands implements CommandMarker {
    /** Name of the command that connects a Rya Streams subsystem to the Rya instance. */
    public static final String STREAMS_CONFIGURE_CMD = "streams-configure";
    /** Name of the command that prints the configured Rya Streams subsystem. */
    public static final String STREAMS_DETAILS_CMD = "streams-details";
    /** Name of the command that adds a SPARQL query. */
    public static final String STREAM_QUERIES_ADD_CMD = "streams-queries-add";
    /** Name of the command that deletes a query. */
    public static final String STREAM_QUERIES_DELETE_CMD = "streams-queries-delete";
    /** Name of the command that starts processing a query. */
    public static final String STREAM_QUERIES_START_CMD = "streams-queries-start";
    /** Name of the command that stops processing a query. */
    public static final String STREAM_QUERIES_STOP_CMD = "streams-queries-stop";
    /** Name of the command that lists the managed queries. */
    public static final String STREAM_QUERIES_LIST_CMD = "streams-queries-list";
    /** Name of the command that prints the details of a single query. */
    public static final String STREAM_QUERIES_DETAILS_CMD = "streams-queries-details";

    private final SharedShellState state;
    private final SparqlPrompt sparqlPrompt;

    /**
     * Creates the command set.
     *
     * @param state the shell state shared between command classes (not null)
     * @param sparqlPrompt the prompt used to read a SPARQL query from the user (not null)
     * @throws NullPointerException if either argument is null
     */
    @Autowired
    public RyaStreamsCommands(SharedShellState state, SparqlPrompt sparqlPrompt) {
        this.state = Objects.requireNonNull(state);
        this.sparqlPrompt = Objects.requireNonNull(sparqlPrompt);
    }

    /**
     * @return {@code true} when the shell is connected to a Rya instance, which is
     *         the precondition for the configure and details commands
     */
    @CliAvailabilityIndicator(value={STREAMS_CONFIGURE_CMD, STREAMS_DETAILS_CMD})
    public boolean areConfigCommandsAvailable() {
        return this.state.getShellState().getConnectionState() == SharedShellState.ConnectionState.CONNECTED_TO_INSTANCE;
    }

    /**
     * @return {@code true} when the shell state holds a Rya Streams client, which
     *         is the precondition for every query command
     */
    @CliAvailabilityIndicator(value={
            STREAM_QUERIES_ADD_CMD,
            STREAM_QUERIES_DELETE_CMD,
            STREAM_QUERIES_START_CMD,
            STREAM_QUERIES_STOP_CMD,
            STREAM_QUERIES_LIST_CMD,
            STREAM_QUERIES_DETAILS_CMD})
    public boolean areQueriesCommandsAvailable() {
        return this.state.getShellState().getRyaStreamsCommands().isPresent();
    }

    /**
     * Points the connected Rya instance at a Rya Streams subsystem backed by the
     * given Kafka broker and registers a client for it with the shell state.
     *
     * <p>Any previously registered client is closed first. A failure to close it is
     * reported on standard error but does not abort the command.
     *
     * @param kafkaHostname the hostname of the Kafka broker
     * @param kafkaPort the port of the Kafka broker
     * @return a message describing whether a subsystem was configured for the first
     *         time or an existing one was replaced
     * @throws RuntimeException if the Rya instance's details could not be updated
     */
    @CliCommand(value={STREAMS_CONFIGURE_CMD}, help="Connect a Rya Streams subsystem to a Rya Instance.")
    public String configureRyaStreams(
            @CliOption(key={"kafkaHostname"}, mandatory=true, help="The hostname of the Kafka Broker.") String kafkaHostname,
            @CliOption(key={"kafkaPort"}, mandatory=true, help="The port of the Kafka Broker.") int kafkaPort) {
        // Release the client of the previously configured subsystem, if there is one.
        com.google.common.base.Optional<RyaStreamsClient> oldClient = this.state.getShellState().getRyaStreamsCommands();
        if (oldClient.isPresent()) {
            try {
                oldClient.get().close();
            }
            catch (Exception e) {
                // Best effort only: keep going, but make the problem visible. The
                // newline keeps the stack trace from running into the warning text.
                System.err.println("Warning: Could not close the old Rya Streams Client.");
                e.printStackTrace();
            }
        }

        String ryaInstance = (String)this.state.getShellState().getRyaInstanceName().get();
        RyaClient ryaClient = (RyaClient)this.state.getShellState().getConnectedCommands().get();

        // Record the new subsystem in the instance's Rya Details.
        try {
            RyaDetails.RyaStreamsDetails streamsDetails = new RyaDetails.RyaStreamsDetails(kafkaHostname, kafkaPort);
            ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, streamsDetails);
        }
        catch (RyaClientException e) {
            throw new RuntimeException("Could not update the Rya instance's Rya Details to include the new information. This command failed to complete.", e);
        }

        // Make the new subsystem's client available to the query commands.
        RyaStreamsClient newClient = KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort);
        this.state.connectedToRyaStreams(newClient);

        if (oldClient.isPresent()) {
            return "The Rya Streams subsystem that this Rya instance uses has been changed. Any queries that were maintained by the previous subsystem will need to be migrated to the new one.";
        }
        return "The Rya Instance has been updated to use the provided Rya Streams subsystem. Rya Streams commands are now avaiable while connected to this instance.";
    }

    /**
     * Reports which Kafka broker the connected Rya instance's Rya Streams subsystem uses.
     *
     * @return the Kafka hostname and port, or an explanatory message when the
     *         instance has no Rya Details or no Rya Streams configuration
     * @throws RuntimeException if the Rya Details could not be fetched
     */
    @CliCommand(value={STREAMS_DETAILS_CMD}, help="Print information about which Rya Streams subsystem the Rya instance is connected to.")
    public String printRyaStreamsDetails() {
        String ryaInstance = (String)this.state.getShellState().getRyaInstanceName().get();
        RyaClient client = (RyaClient)this.state.getShellState().getConnectedCommands().get();
        try {
            com.google.common.base.Optional<RyaDetails> details = client.getGetInstanceDetails().getDetails(ryaInstance);
            if (!details.isPresent()) {
                return "This instance does not have any Rya Details, so it is unable to be connected to the Rya Streams subsystem.";
            }

            com.google.common.base.Optional<RyaDetails.RyaStreamsDetails> streamsDetails = details.get().getRyaStreamsDetails();
            if (!streamsDetails.isPresent()) {
                return "This instance of Rya has not been configured to use a Rya Streams subsystem.";
            }

            RyaDetails.RyaStreamsDetails configured = streamsDetails.get();
            return "Kafka Hostname: " + configured.getHostname() + ", Kafka Port: " + configured.getPort();
        }
        catch (RyaClientException e) {
            throw new RuntimeException("Could not fetch the Rya Details for this Rya instance.", e);
        }
    }

    /**
     * Prompts for a SPARQL query and adds it to the Rya Streams subsystem.
     *
     * @param inactive when {@code true} the query is added without being run
     * @return a message containing the new query's ID, or an empty string when the
     *         prompt yielded no SPARQL
     * @throws RuntimeException if the prompt or the subsystem reports a failure
     */
    @CliCommand(value={STREAM_QUERIES_ADD_CMD}, help="Add a SPARQL query to the Rya Streams subsystem.")
    public String addQuery(
            @CliOption(key={"inactive"}, mandatory=false, unspecifiedDefaultValue="false", specifiedDefaultValue="true", help="Setting this flag will add the query, but not run it. (default: false)") boolean inactive) {
        RyaStreamsClient streamsClient = this.requireStreamsClient();
        try {
            com.google.common.base.Optional<String> sparql = this.sparqlPrompt.getSparql();
            if (!sparql.isPresent()) {
                return "";
            }

            // The subsystem takes an "is active" flag, which is the inverse of the CLI option.
            StreamsQuery streamsQuery = streamsClient.getAddQuery().addQuery(sparql.get(), !inactive);
            return "The added query's ID is " + streamsQuery.getQueryId();
        }
        catch (IOException | RyaStreamsException e) {
            throw new RuntimeException("Unable to add the SPARQL query to the Rya Streams subsystem.", e);
        }
    }

    /**
     * Deletes a query from the Rya Streams subsystem.
     *
     * @param queryId the textual UUID of the query to remove
     * @return a confirmation message
     * @throws IllegalArgumentException if {@code queryId} is not a valid UUID string
     * @throws RuntimeException if the subsystem could not delete the query
     */
    @CliCommand(value={STREAM_QUERIES_DELETE_CMD}, help="Delete a SPARQL query from the Rya Streams subsystem.")
    public String deleteQuery(
            @CliOption(key={"queryId"}, mandatory=true, help="The ID of the query to remove.") String queryId) {
        RyaStreamsClient streamsClient = this.requireStreamsClient();
        UUID id = UUID.fromString(queryId);
        try {
            streamsClient.getDeleteQuery().delete(id);
        }
        catch (RyaStreamsException e) {
            throw new RuntimeException("Could not delete the query from the Rya Streams subsystem.", e);
        }
        return "The query has been deleted.";
    }

    /**
     * Asks the Rya Streams subsystem to start processing a query that is not yet active.
     *
     * @param queryId the textual UUID of the query to start
     * @return a message stating that the query was started or was already running
     * @throws IllegalArgumentException if {@code queryId} is not a valid UUID string
     * @throws RuntimeException if no query has that ID or the subsystem reports a failure
     */
    @CliCommand(value={STREAM_QUERIES_START_CMD}, help="Start processing a SPARQL query using the Rya Streams subsystem.")
    public String startQuery(
            @CliOption(key={"queryId"}, mandatory=true, help="The ID of the query to start processing.") String queryId) {
        RyaStreamsClient streamsClient = this.requireStreamsClient();
        try {
            UUID id = UUID.fromString(queryId);
            Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
            if (!streamsQuery.isPresent()) {
                throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
            }
            if (streamsQuery.get().isActive()) {
                return "That query is already running.";
            }

            streamsClient.getStartQuery().start(id);
            return "The query will be processed by the Rya Streams subsystem.";
        }
        catch (RyaStreamsException e) {
            throw new RuntimeException("Unable to start the Query.", e);
        }
    }

    /**
     * Asks the Rya Streams subsystem to stop processing a query that is currently active.
     *
     * @param queryId the textual UUID of the query to stop
     * @return a message stating that the query was stopped or was already stopped
     * @throws IllegalArgumentException if {@code queryId} is not a valid UUID string
     * @throws RuntimeException if no query has that ID or the subsystem reports a failure
     */
    @CliCommand(value={STREAM_QUERIES_STOP_CMD}, help="Stop processing a SPARQL query using the Rya Streams subsystem.")
    public String stopQuery(
            @CliOption(key={"queryId"}, mandatory=true, help="The ID of the query to stop processing.") String queryId) {
        RyaStreamsClient streamsClient = this.requireStreamsClient();
        try {
            UUID id = UUID.fromString(queryId);
            Optional<StreamsQuery> streamsQuery = streamsClient.getGetQuery().getQuery(id);
            if (!streamsQuery.isPresent()) {
                throw new RuntimeException("No Rya Streams query exists for ID " + queryId);
            }
            if (!streamsQuery.get().isActive()) {
                return "That query is already stopped.";
            }

            streamsClient.getStopQuery().stop(id);
            return "The query will no longer be processed by the Rya Streams subsystem.";
        }
        catch (RyaStreamsException e) {
            // This is the stop path, so the failure must not be reported as a start failure.
            throw new RuntimeException("Unable to stop the Query.", e);
        }
    }

    /**
     * Lists every query managed by the configured Rya Streams subsystem.
     *
     * @return the queries rendered by {@link StreamsQueryFormatter}
     * @throws RuntimeException if the queries could not be fetched or formatted
     */
    @CliCommand(value={STREAM_QUERIES_LIST_CMD}, help="List the queries that are being managed by the configured Rya Streams subsystem.")
    public String listQueries() {
        RyaStreamsClient streamsClient = this.requireStreamsClient();
        try {
            Set<StreamsQuery> queries = streamsClient.getListQueries().all();
            return StreamsQueryFormatter.format(queries);
        }
        catch (RyaStreamsException e) {
            throw new RuntimeException("Unable to fetch the queries from the Rya Streams subsystem.", e);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to print the query to the console.", e);
        }
    }

    /**
     * Prints the details of a single query managed by the Rya Streams subsystem.
     *
     * @param queryId the textual UUID of the query to describe
     * @return the query rendered by {@link StreamsQueryFormatter}, or a message
     *         stating that no query has the given ID
     * @throws IllegalArgumentException if {@code queryId} is not a valid UUID string
     * @throws RuntimeException if the query could not be fetched or formatted
     */
    @CliCommand(value={STREAM_QUERIES_DETAILS_CMD}, help="Print detailed information about a specific query managed by the Rya Streams subsystem.")
    public String printQueryDetails(
            @CliOption(key={"queryId"}, mandatory=true, help="The ID of the query whose details will be printed.") String queryId) {
        RyaStreamsClient streamsClient = this.requireStreamsClient();
        UUID id = UUID.fromString(queryId);
        try {
            Optional<StreamsQuery> query = streamsClient.getGetQuery().getQuery(id);
            if (!query.isPresent()) {
                return "There is no query with the specified ID.";
            }
            return StreamsQueryFormatter.format(query.get());
        }
        catch (RyaStreamsException e) {
            throw new RuntimeException("Unable to fetch the query from the Rya Streams subsystem.", e);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to print the query to the console.", e);
        }
    }

    /**
     * Fetches the Rya Streams client held by the shell state. The query commands
     * call this unconditionally; {@link #areQueriesCommandsAvailable()} is the
     * check that a client is present.
     */
    private RyaStreamsClient requireStreamsClient() {
        return this.state.getShellState().getRyaStreamsCommands().get();
    }
}

