package org.apache.rya.api.client.accumulo;

import com.google.common.base.Optional;
import java.util.Objects;
import java.util.Properties;
import org.apache.accumulo.core.client.Connector;
import org.apache.fluo.api.client.FluoClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.client.CreatePeriodicPCJ;
import org.apache.rya.api.client.GetInstanceDetails;
import org.apache.rya.api.client.InstanceDoesNotExistException;
import org.apache.rya.api.client.RyaClientException;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.periodic.notification.notification.CommandNotification;
import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.repository.RepositoryException;
import org.openrdf.sail.SailException;

/* loaded from: input_file:org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.class */
public class AccumuloCreatePeriodicPCJ extends AccumuloCommand implements CreatePeriodicPCJ {
    private final GetInstanceDetails getInstanceDetails;

    public AccumuloCreatePeriodicPCJ(AccumuloConnectionDetails accumuloConnectionDetails, Connector connector) {
        super(accumuloConnectionDetails, connector);
        this.getInstanceDetails = new AccumuloGetInstanceDetails(accumuloConnectionDetails, connector);
    }

    @Override // org.apache.rya.api.client.CreatePeriodicPCJ
    public String createPeriodicPCJ(String str, String str2, String str3, String str4) throws RyaClientException {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Optional<RyaDetails> details = this.getInstanceDetails.getDetails(str);
        if (!details.isPresent()) {
            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", str));
        }
        RyaDetails.PCJIndexDetails pCJIndexDetails = details.get().getPCJIndexDetails();
        if (!pCJIndexDetails.isEnabled()) {
            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", str));
        }
        Optional<RyaDetails.PCJIndexDetails.FluoDetails> fluoDetails = pCJIndexDetails.getFluoDetails();
        if (!fluoDetails.isPresent()) {
            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", str));
        }
        try {
            return updateFluoAppAndRegisterWithKafka(str, fluoDetails.get().getUpdateAppName(), str2, str3, str4);
        } catch (RyaDAOException | CreatePeriodicQuery.PeriodicQueryCreationException | PcjException | MalformedQueryException | QueryEvaluationException | RepositoryException | SailException e) {
            throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
        } catch (UnsupportedQueryException e2) {
            throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node or an invalid ExportStrategy for the given QueryType.  Projection queries can be exported to either Rya or Kafka,unless they contain an aggregation, in which case they can only be exported to Kafka.  Construct queries can be exportedto Rya and Kafka, and Periodic queries can only be exported to Rya.");
        }
    }

    private String updateFluoAppAndRegisterWithKafka(String str, String str2, String str3, String str4, String str5) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException, CreatePeriodicQuery.PeriodicQueryCreationException {
        Objects.requireNonNull(str3);
        Objects.requireNonNull(str4);
        Objects.requireNonNull(str5);
        AccumuloPeriodicQueryResultStorage accumuloPeriodicQueryResultStorage = new AccumuloPeriodicQueryResultStorage(getConnector(), str);
        AccumuloConnectionDetails accumuloConnectionDetails = super.getAccumuloConnectionDetails();
        FluoClient connect = new FluoClientFactory().connect(accumuloConnectionDetails.getUsername(), new String(accumuloConnectionDetails.getPassword()), accumuloConnectionDetails.getInstanceName(), accumuloConnectionDetails.getZookeepers(), str2);
        Throwable th = null;
        try {
            String queryId = new CreatePeriodicQuery(connect, accumuloPeriodicQueryResultStorage).withRyaIntegration(str3, new KafkaNotificationRegistrationClient(str4, createProducer(str5)), getConnector(), str).getQueryId();
            if (connect != null) {
                if (0 != 0) {
                    try {
                        connect.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    connect.close();
                }
            }
            return queryId;
        } catch (Throwable th3) {
            if (connect != null) {
                if (0 != 0) {
                    try {
                        connect.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    connect.close();
                }
            }
            throw th3;
        }
    }

    private static KafkaProducer<String, CommandNotification> createProducer(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
        return new KafkaProducer<>(properties);
    }
}
