package com.ning.metrics.collector.processing.db;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.mogwee.executors.LoggingExecutor;
import com.mogwee.executors.NamedThreadFactory;
import com.ning.arecibo.jmx.Monitored;
import com.ning.arecibo.jmx.MonitoringType;
import com.ning.metrics.collector.binder.config.CollectorConfig;
import com.ning.metrics.collector.processing.EventSpoolProcessor;
import com.ning.metrics.collector.processing.SerializationType;
import com.ning.metrics.collector.processing.db.model.FeedEvent;
import com.ning.metrics.collector.processing.db.model.FeedEventData;
import com.ning.metrics.collector.processing.db.model.Subscription;
import com.ning.metrics.collector.processing.quartz.FeedEventCleanUpJob;
import com.ning.metrics.collector.processing.quartz.FeedUpdateQuartzJob;
import com.ning.metrics.serialization.event.Event;
import com.ning.metrics.serialization.event.EventDeserializer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.TriggerBuilder;
import org.skife.config.TimeSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/metrics/collector/processing/db/FeedEventSpoolProcessor.class */
public class FeedEventSpoolProcessor implements EventSpoolProcessor {
    /** Collector configuration; read for filter settings, shutdown timeout and the cleanup cron expression. */
    private final CollectorConfig config;
    /** Source of the subscriptions matched against each feed event's topics. */
    private final SubscriptionStorage subscriptionStorage;
    /** Destination for buffered feed events. */
    private final FeedEventStorage feedEventStorage;
    /** Name reported by {@link #getProcessorName()}. */
    private static final String PROCESSOR_NAME = "DBWriter";
    /** Executor running the background flush task; {@code null} when feed-event storage is not enabled by the filters. */
    private final ExecutorService executorService;
    /** How long {@link #close()} waits for the executor to terminate before forcing shutdown. */
    private final TimeSpan executorShutdownTimeOut;
    /** Quartz scheduler used for feed-update jobs and the feed-event cleanup cron job. */
    private final Scheduler quartzScheduler;
    private static final Logger log = LoggerFactory.getLogger(FeedEventSpoolProcessor.class);
    /** Shared JSON mapper used to decode event payloads into {@link FeedEventData}. */
    private static final ObjectMapper mapper = new ObjectMapper();
    /** Set once the cleanup cron job has been scheduled (or found to exist) by this instance. */
    private final AtomicBoolean isCleanupCronJobScheduled = new AtomicBoolean(false);
    /** Bounded, non-fair hand-off queue (capacity 1000) between event-file processing and the flush task. */
    private final BlockingQueue<FeedEvent> eventStorageBuffer = new ArrayBlockingQueue<FeedEvent>(1000, false);

    /* loaded from: input_file:com/ning/metrics/collector/processing/db/FeedEventSpoolProcessor$FeedEventInserter.class */
    /**
     * Self-rescheduling task: each run flushes the processor's buffered feed events to storage
     * and then submits this same task to the executor again.
     */
    private static class FeedEventInserter implements Runnable {
        /** Executor this task re-submits itself to after every flush. */
        private final ExecutorService executor;
        /** Processor whose buffer is flushed on every run. */
        private final FeedEventSpoolProcessor processor;

        /**
         * @param executorService         executor that receives the re-submitted task
         * @param feedEventSpoolProcessor processor whose {@code flushFeedEventsToDB()} is invoked
         */
        public FeedEventInserter(ExecutorService executorService, FeedEventSpoolProcessor feedEventSpoolProcessor) {
            this.processor = feedEventSpoolProcessor;
            this.executor = executorService;
        }

        /** Flushes once, then queues the next flush. */
        @Override // java.lang.Runnable
        public void run() {
            processor.flushFeedEventsToDB();
            // Re-submit after the flush so runs are chained one after another.
            executor.submit(this);
        }
    }

    /* loaded from: input_file:com/ning/metrics/collector/processing/db/FeedEventSpoolProcessor$FeedEventScheduledCleaner.class */
    /**
     * Runnable that asks the enclosing processor's feed event storage to clean old feed events.
     */
    private class FeedEventScheduledCleaner implements Runnable {
        private FeedEventScheduledCleaner() {
        }

        /** Delegates to {@code FeedEventStorage.cleanOldFeedEvents()}. */
        @Override // java.lang.Runnable
        public void run() {
            final FeedEventStorage storage = FeedEventSpoolProcessor.this.feedEventStorage;
            storage.cleanOldFeedEvents();
        }
    }

    @Inject
    public FeedEventSpoolProcessor(CollectorConfig collectorConfig, SubscriptionStorage subscriptionStorage, FeedEventStorage feedEventStorage, Scheduler scheduler) throws SchedulerException {
        this.config = collectorConfig;
        this.subscriptionStorage = subscriptionStorage;
        this.feedEventStorage = feedEventStorage;
        this.executorShutdownTimeOut = collectorConfig.getSpoolWriterExecutorShutdownTime();
        this.quartzScheduler = scheduler;
        if (!Splitter.on(collectorConfig.getFilters()).omitEmptyStrings().splitToList(collectorConfig.getFiltersEventType()).contains(DBStorageTypes.FEED_EVENT.getDbStorageType())) {
            this.executorService = null;
            return;
        }
        this.executorService = new LoggingExecutor(1, 1, Long.MAX_VALUE, TimeUnit.DAYS, new ArrayBlockingQueue(2), new NamedThreadFactory("FeedEvents-Storage-Threads"), new ThreadPoolExecutor.CallerRunsPolicy());
        this.executorService.submit(new FeedEventInserter(this.executorService, this));
        if (scheduler.isStarted()) {
            return;
        }
        scheduler.start();
        scheduleFeedEventCleanupCronJob();
    }

    /**
     * Reads every event from {@code file} and, for each event whose name matches the feed-event
     * storage type (case-insensitively), queues one {@link FeedEvent} per matching subscription.
     *
     * <p>The event payload is decoded as JSON into {@link FeedEventData}. For suppress-type
     * events subscriptions are looked up by topic prefix, otherwise by exact topic; duplicates
     * across topics are collapsed through a set. The input stream is always closed before this
     * method returns.
     *
     * @param str               not used by this implementation
     * @param serializationType provides the deserializer for the file's contents
     * @param file              spool file to read
     * @param str2              not used by this implementation
     * @throws IOException if the file cannot be opened, read or decoded
     */
    @Override // com.ning.metrics.collector.processing.EventSpoolProcessor
    public void processEventFile(String str, SerializationType serializationType, File file, String str2) throws IOException {
        final FileInputStream spoolStream = new FileInputStream(file);
        try {
            final EventDeserializer deSerializer = serializationType.getDeSerializer(spoolStream);
            while (deSerializer.hasNextEvent()) {
                final Event nextEvent = deSerializer.getNextEvent();
                // Guarded so the message is not formatted when debug logging is off.
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Recieved DB Event to store with name as %s ", nextEvent.getName()));
                }
                if (!nextEvent.getName().equalsIgnoreCase(DBStorageTypes.FEED_EVENT.getDbStorageType())) {
                    continue;
                }
                if (log.isDebugEnabled()) {
                    log.debug(String.format("DB Event body to store is %s", nextEvent.getData()));
                }

                final FeedEventData feedEventData = mapper.readValue(nextEvent.getData().toString(), FeedEventData.class);
                final boolean suppress = Objects.equal(FeedEventData.EVENT_TYPE_SUPPRESS, feedEventData.getEventType());

                // A set, so a subscription matching several topics yields a single feed event.
                final HashSet<Subscription> subscriptions = new HashSet<Subscription>();
                for (String topic : feedEventData.getTopics()) {
                    subscriptions.addAll(suppress ? this.subscriptionStorage.loadByStartsWithTopic(topic) : this.subscriptionStorage.loadByTopic(topic));
                }

                for (Subscription subscription : subscriptions) {
                    addToBuffer(nextEvent.getName(), new FeedEvent(feedEventData, subscription.getChannel(), subscription.getId(), subscription.getMetadata()));
                }
            }
        } finally {
            // Release the file handle on every path. A close failure is only logged so it
            // neither masks an earlier exception nor fails a file that was fully processed.
            try {
                spoolStream.close();
            } catch (IOException closeFailure) {
                log.warn(String.format("Could not close event file %s", file), closeFailure);
            }
        }
    }

    /**
     * Puts a feed event on the storage buffer, blocking while the buffer is full.
     *
     * <p>If the thread is interrupted while waiting, the event is not added; a warning is logged
     * and the thread's interrupt status is restored so callers can observe the interruption.
     *
     * @param str       event name, used only in the warning message
     * @param feedEvent event to enqueue
     */
    private void addToBuffer(String str, FeedEvent feedEvent) {
        try {
            this.eventStorageBuffer.put(feedEvent);
        } catch (InterruptedException e) {
            log.warn(String.format("Could not add event %s to the buffer", str), e);
            // Catching InterruptedException clears the flag; set it again rather than hiding the request.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drains the storage buffer in batches of up to 1000 events, inserts each batch through the
     * feed event storage and schedules a feed-update job for the ids returned by the insert.
     *
     * <p>If nothing was drained at all, the calling thread sleeps for five seconds before
     * returning. Any exception raised while draining, inserting or scheduling is logged as a
     * warning and not propagated.
     */
    public void flushFeedEventsToDB() {
        try {
            final List<FeedEvent> batch = Lists.newArrayListWithCapacity(this.eventStorageBuffer.size());
            boolean insertedAnything = false;

            int drained = this.eventStorageBuffer.drainTo(batch, 1000);
            while (drained > 0) {
                insertedAnything = true;
                final List<String> insertedIds = this.feedEventStorage.insert(batch);
                log.info(String.format("Inserted %d events successfully!", drained));
                // Reuse the same list for the next batch.
                batch.clear();
                scheduleFeedCollectionJob(insertedIds);
                drained = this.eventStorageBuffer.drainTo(batch, 1000);
            }

            if (insertedAnything) {
                return;
            }
            // Buffer was empty: back off before the next flush attempt.
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        } catch (Exception unexpected) {
            log.warn("unexpected exception trying to insert events!", unexpected);
        }
    }

    /**
     * Schedules a {@link FeedUpdateQuartzJob} for the given feed event ids, provided the Quartz
     * scheduler is started; otherwise does nothing.
     *
     * <p>The job and its trigger get fresh random-UUID based names, and the ids are passed to the
     * job under the {@code "feedEventIdList"} data-map key. Scheduler failures are logged as a
     * warning and not propagated.
     *
     * @param list ids of the feed events that were just inserted
     */
    private void scheduleFeedCollectionJob(List<String> list) {
        try {
            if (!this.quartzScheduler.isStarted()) {
                return;
            }

            final String triggerName = UUID.randomUUID().toString() + "_feedUpdateTrigger";
            final String jobName = UUID.randomUUID().toString() + "_feedUpdateJob";

            final SimpleTrigger updateTrigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerName, "feedUpdateGroup")
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withMisfireHandlingInstructionFireNow())
                    .build();

            final JobDataMap jobData = new JobDataMap();
            jobData.put("feedEventIdList", list);

            this.quartzScheduler.scheduleJob(
                    JobBuilder.newJob(FeedUpdateQuartzJob.class)
                            .withIdentity(jobName, "feedUpdateJobGroup")
                            .usingJobData(jobData)
                            .build(),
                    updateTrigger);
        } catch (SchedulerException e) {
            log.warn("unexpected exception trying to schedule Quartz job for feed preparation of the inserted events!", e);
        }
    }

    /**
     * Releases the storages and then, whether or not that succeeded, shuts down the flush
     * executor, flushes any events still buffered and shuts down the Quartz scheduler.
     *
     * <p>The shutdown steps run exactly once in a {@code finally} block; an exception thrown by
     * either storage {@code cleanUp()} still propagates to the caller after they have run.
     */
    @Override // com.ning.metrics.collector.processing.EventSpoolProcessor
    public void close() {
        try {
            // NOTE(review): the storages are cleaned up before the final flush below, which still
            // calls feedEventStorage.insert(); confirm that insert works after cleanUp().
            this.feedEventStorage.cleanUp();
            this.subscriptionStorage.cleanUp();
        } finally {
            shutdownExecutorService();
            flushRemainingEvents();
            shutdownQuartzScheduler();
        }
    }

    /** Stops the flush executor, waiting up to the configured timeout before forcing shutdown. */
    private void shutdownExecutorService() {
        log.info("Shutting Down Executor Service for Feed Event Storage");
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                this.executorService.awaitTermination(this.executorShutdownTimeOut.getPeriod(), this.executorShutdownTimeOut.getUnit());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Force termination of whatever is still running after the wait.
            this.executorService.shutdownNow();
        }
        log.info("Executor Service for Feed Event Storage Shut Down success!");
    }

    /** Flushes on the calling thread whatever is still in the buffer after the executor has stopped. */
    private void flushRemainingEvents() {
        if (!this.eventStorageBuffer.isEmpty()) {
            log.info("Flushing remaining events to database");
            flushFeedEventsToDB();
        }
    }

    /** Removes the cleanup cron job if present and shuts the Quartz scheduler down; failures are logged. */
    private void shutdownQuartzScheduler() {
        log.info("Shutting Down Quartz Scheduler");
        try {
            if (!this.quartzScheduler.isShutdown()) {
                final JobKey cleanupJobKey = new JobKey("feedEventCleanupCronJob", "feedEventCleanupCronJobGroup");
                if (this.quartzScheduler.checkExists(cleanupJobKey)) {
                    this.quartzScheduler.deleteJob(cleanupJobKey);
                }
                this.quartzScheduler.shutdown(true);
            }
        } catch (SchedulerException e) {
            log.error("Unexpected error while shutting down Quartz Scheduler!", e);
        }
        log.info("Quartz Scheduler shutdown success");
    }

    /**
     * Schedules the feed-event cleanup cron job when the scheduler is started and this instance
     * has not already done so.
     *
     * <p>The job is only registered if no job with the cleanup key exists yet; in either case
     * the instance is then marked as having its cleanup job scheduled. The cron expression comes
     * from the collector configuration.
     *
     * @throws SchedulerException if the scheduler cannot be queried or the job cannot be scheduled
     */
    private void scheduleFeedEventCleanupCronJob() throws SchedulerException {
        final boolean needsScheduling = this.quartzScheduler.isStarted() && !this.isCleanupCronJobScheduled.get();
        if (needsScheduling) {
            final JobKey cleanupJobKey = new JobKey("feedEventCleanupCronJob", "feedEventCleanupCronJobGroup");
            final boolean alreadyRegistered = this.quartzScheduler.checkExists(cleanupJobKey);
            if (!alreadyRegistered) {
                final CronScheduleBuilder cleanupSchedule = CronScheduleBuilder
                        .cronSchedule(this.config.getFeedEventsCleanupCronExpression())
                        .withMisfireHandlingInstructionDoNothing();
                this.quartzScheduler.scheduleJob(
                        JobBuilder.newJob(FeedEventCleanUpJob.class)
                                .withIdentity(cleanupJobKey)
                                .build(),
                        TriggerBuilder.newTrigger()
                                .withIdentity("feedEventCleanupCronTrigger", "feedEventCleanupCronTriggerGroup")
                                .withSchedule(cleanupSchedule)
                                .build());
            }
            this.isCleanupCronJobScheduled.set(true);
        }
    }

    /**
     * @return the fixed name of this processor, {@code "DBWriter"}
     */
    @Override // com.ning.metrics.collector.processing.EventSpoolProcessor
    public String getProcessorName() {
        return FeedEventSpoolProcessor.PROCESSOR_NAME;
    }

    /**
     * @return the number of feed events currently held in the storage buffer
     */
    @Monitored(description = "Number of events in buffer", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
    public long getEventsInBuffer() {
        final int buffered = this.eventStorageBuffer.size();
        return buffered;
    }
}
