package org.apache.rya.periodic.notification.application;

import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.NodeBin;
import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;

/* loaded from: input_file:org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.class */
/**
 * Static factory that wires together the components of a
 * {@link PeriodicNotificationApplication} from a
 * {@link PeriodicNotificationApplicationConfiguration}.
 *
 * <p>Three in-memory queues connect the components built here:
 * <ul>
 *   <li>a {@link TimestampedNotification} queue shared by the coordinator and the processor,</li>
 *   <li>a {@link NodeBin} queue shared by the processor and the pruner,</li>
 *   <li>a {@link BindingSetRecord} queue shared by the processor and the exporter.</li>
 * </ul>
 */
public class PeriodicNotificationApplicationFactory {

    /**
     * Builds a fully wired {@link PeriodicNotificationApplication}.
     *
     * <p>Side effect: the coordinator is started before this method returns (see
     * {@link #addRegisteredNotices(NotificationCoordinatorExecutor, Snapshot)}).
     *
     * @param periodicNotificationApplicationConfiguration supplies the Kafka, Fluo and Accumulo
     *        connection settings and the thread count for each component
     * @return the assembled application
     * @throws PeriodicApplicationException if connecting to Accumulo fails; the original Accumulo
     *         exception is attached as the cause
     */
    public static PeriodicNotificationApplication getPeriodicApplication(PeriodicNotificationApplicationConfiguration periodicNotificationApplicationConfiguration) throws PeriodicApplicationException {
        Properties consumerProps = getKafkaConsumerProperties(periodicNotificationApplicationConfiguration);
        Properties producerProps = getKafkaProducerProperties(periodicNotificationApplicationConfiguration);

        // Typed queues so that wiring a queue to the wrong stage is a compile error rather than
        // an unchecked warning.
        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
        BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();

        try {
            PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(periodicNotificationApplicationConfiguration);
            FluoClient fluo = FluoClientFactory.getFluoClient(
                    periodicNotificationApplicationConfiguration.getFluoAppName(),
                    Optional.of(periodicNotificationApplicationConfiguration.getFluoTableName()),
                    periodicNotificationApplicationConfiguration);
            NotificationCoordinatorExecutor coordinator = getCoordinator(
                    periodicNotificationApplicationConfiguration.getCoordinatorThreads(), notifications);

            // The snapshot is only needed while the registered notifications are processed, so
            // release it as soon as that call returns instead of leaking it.
            // NOTE(review): assumes processRegisteredNotifications does not retain the snapshot
            // after returning - confirm against PeriodicNotificationProvider.
            try (Snapshot snapshot = fluo.newSnapshot()) {
                addRegisteredNotices(coordinator, snapshot);
            }

            KafkaExporterExecutor exporter = getExporter(
                    periodicNotificationApplicationConfiguration.getExporterThreads(), producerProps, bindingSets);

            return PeriodicNotificationApplication.builder()
                    .setCoordinator(coordinator)
                    .setProvider(getProvider(
                            periodicNotificationApplicationConfiguration.getProducerThreads(),
                            periodicNotificationApplicationConfiguration.getNotificationTopic(),
                            coordinator,
                            consumerProps))
                    .setExporter(exporter)
                    .setProcessor(getProcessor(
                            storage,
                            notifications,
                            bins,
                            bindingSets,
                            periodicNotificationApplicationConfiguration.getProcessorThreads()))
                    .setPruner(getPruner(
                            storage,
                            fluo,
                            periodicNotificationApplicationConfiguration.getPrunerThreads(),
                            bins))
                    .build();
        } catch (AccumuloException | AccumuloSecurityException e) {
            // Keep the same message as before, but attach the cause so the underlying Accumulo
            // stack trace is not lost.
            PeriodicApplicationException failure = new PeriodicApplicationException(e.getMessage());
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Starts the coordinator and then passes it, together with the snapshot, to a new
     * {@link PeriodicNotificationProvider} via {@code processRegisteredNotifications}.
     *
     * @param notificationCoordinatorExecutor coordinator to start and hand to the provider
     * @param snapshot Fluo snapshot handed to the provider; the caller owns and closes it
     */
    private static void addRegisteredNotices(NotificationCoordinatorExecutor notificationCoordinatorExecutor, Snapshot snapshot) {
        notificationCoordinatorExecutor.start();
        PeriodicNotificationProvider recoveryProvider = new PeriodicNotificationProvider();
        recoveryProvider.processRegisteredNotifications(notificationCoordinatorExecutor, snapshot);
    }

    /**
     * Creates the coordinator.
     *
     * @param i value read from {@code getCoordinatorThreads()} by the caller
     * @param blockingQueue notification queue given to the coordinator
     * @return a new {@link PeriodicNotificationCoordinatorExecutor}
     */
    private static NotificationCoordinatorExecutor getCoordinator(int i, BlockingQueue<TimestampedNotification> blockingQueue) {
        return new PeriodicNotificationCoordinatorExecutor(i, blockingQueue);
    }

    /**
     * Creates the exporter together with the Kafka producer it uses.
     *
     * @param i value read from {@code getExporterThreads()} by the caller
     * @param properties Kafka producer properties
     * @param blockingQueue binding set record queue given to the exporter
     * @return a new {@link KafkaExporterExecutor}
     */
    private static KafkaExporterExecutor getExporter(int i, Properties properties, BlockingQueue<BindingSetRecord> blockingQueue) {
        // Diamond instead of a raw KafkaProducer: key/value types are inferred from the
        // serializers and checked against the executor's constructor.
        return new KafkaExporterExecutor(
                new KafkaProducer<>(properties, new StringSerializer(), new BindingSetSerDe()),
                i,
                blockingQueue);
    }

    /**
     * Creates the pruner.
     *
     * @param periodicQueryResultStorage result storage given to the pruner
     * @param fluoClient Fluo client given to the pruner
     * @param i value read from {@code getPrunerThreads()} by the caller
     * @param blockingQueue node bin queue given to the pruner
     * @return a new {@link PeriodicQueryPrunerExecutor}
     */
    private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage periodicQueryResultStorage, FluoClient fluoClient, int i, BlockingQueue<NodeBin> blockingQueue) {
        return new PeriodicQueryPrunerExecutor(periodicQueryResultStorage, fluoClient, i, blockingQueue);
    }

    /**
     * Creates the processor, which is handed all three queues.
     *
     * @param periodicQueryResultStorage result storage given to the processor
     * @param blockingQueue notification queue (shared with the coordinator)
     * @param blockingQueue2 node bin queue (shared with the pruner)
     * @param blockingQueue3 binding set record queue (shared with the exporter)
     * @param i value read from {@code getProcessorThreads()} by the caller
     * @return a new {@link NotificationProcessorExecutor}
     */
    private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicQueryResultStorage, BlockingQueue<TimestampedNotification> blockingQueue, BlockingQueue<NodeBin> blockingQueue2, BlockingQueue<BindingSetRecord> blockingQueue3, int i) {
        return new NotificationProcessorExecutor(periodicQueryResultStorage, blockingQueue, blockingQueue2, blockingQueue3, i);
    }

    /**
     * Creates the Kafka-backed notification provider.
     *
     * @param i value read from {@code getProducerThreads()} by the caller
     * @param str value read from {@code getNotificationTopic()} by the caller
     * @param notificationCoordinatorExecutor coordinator given to the provider
     * @param properties Kafka consumer properties
     * @return a new {@link KafkaNotificationProvider}
     */
    private static KafkaNotificationProvider getProvider(int i, String str, NotificationCoordinatorExecutor notificationCoordinatorExecutor, Properties properties) {
        return new KafkaNotificationProvider(str, new StringDeserializer(), new CommandNotificationSerializer(), properties, notificationCoordinatorExecutor, i);
    }

    /**
     * Connects to Accumulo with the configured instance name, ZooKeeper list, user and password,
     * and wraps the resulting connector in an {@link AccumuloPeriodicQueryResultStorage} using
     * the configured table prefix.
     *
     * @param periodicNotificationApplicationConfiguration source of the Accumulo settings
     * @return the Accumulo-backed result storage
     * @throws AccumuloException if obtaining the connector fails
     * @throws AccumuloSecurityException if obtaining the connector fails for security reasons
     */
    private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration periodicNotificationApplicationConfiguration) throws AccumuloException, AccumuloSecurityException {
        ZooKeeperInstance instance = new ZooKeeperInstance(
                periodicNotificationApplicationConfiguration.getAccumuloInstance(),
                periodicNotificationApplicationConfiguration.getAccumuloZookeepers());
        return new AccumuloPeriodicQueryResultStorage(
                instance.getConnector(
                        periodicNotificationApplicationConfiguration.getAccumuloUser(),
                        new PasswordToken(periodicNotificationApplicationConfiguration.getAccumuloPassword())),
                periodicNotificationApplicationConfiguration.getTablePrefix());
    }

    /**
     * Builds the Kafka consumer properties: bootstrap servers, client id and group id come from
     * the configuration; {@code auto.offset.reset} is fixed to {@code earliest} and
     * {@code metadata.max.age.ms} to {@code 30000}.
     *
     * @param periodicNotificationApplicationConfiguration source of the Kafka settings
     * @return a new, populated {@link Properties}
     */
    private static Properties getKafkaConsumerProperties(PeriodicNotificationApplicationConfiguration periodicNotificationApplicationConfiguration) {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", periodicNotificationApplicationConfiguration.getBootStrapServers());
        consumerProps.setProperty("client.id", periodicNotificationApplicationConfiguration.getNotificationClientId());
        consumerProps.setProperty("group.id", periodicNotificationApplicationConfiguration.getNotificationGroupId());
        consumerProps.setProperty("auto.offset.reset", "earliest");
        consumerProps.setProperty("metadata.max.age.ms", "30000");
        return consumerProps;
    }

    /**
     * Builds the Kafka producer properties; only {@code bootstrap.servers} is set.
     *
     * @param periodicNotificationApplicationConfiguration source of the bootstrap server list
     * @return a new, populated {@link Properties}
     */
    private static Properties getKafkaProducerProperties(PeriodicNotificationApplicationConfiguration periodicNotificationApplicationConfiguration) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", periodicNotificationApplicationConfiguration.getBootStrapServers());
        return producerProps;
    }
}
