/*
 * Decompiled with CFR 0.152.
 */
package net.solarnetwork.central.biz.dao;

import com.fasterxml.uuid.UUIDComparator;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import net.solarnetwork.central.biz.UserEventAppenderBiz;
import net.solarnetwork.central.common.dao.UserEventAppenderDao;
import net.solarnetwork.central.domain.LogEventInfo;
import net.solarnetwork.central.domain.UserEvent;
import net.solarnetwork.central.support.MqttJsonPublisher;
import net.solarnetwork.service.PingTest;
import net.solarnetwork.service.PingTestResult;
import net.solarnetwork.service.ServiceLifecycleObserver;
import net.solarnetwork.util.ObjectUtils;
import net.solarnetwork.util.StatTracker;
import net.solarnetwork.util.TimeBasedV7UuidGenerator;
import net.solarnetwork.util.UuidGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link UserEventAppenderBiz} that persists user events asynchronously.
 *
 * <p>
 * Each call to {@link #addEvent(Long, LogEventInfo)} places a new
 * {@link UserEvent} on an internal queue and submits this object (as a
 * {@link Runnable}) to the configured {@link ExecutorService}. Each execution
 * of {@link #run()} takes at most one event from the queue, passes it to the
 * configured {@link UserEventAppenderDao}, and then hands it to the optional
 * SolarFlux publisher.
 * </p>
 *
 * <p>
 * The class also acts as a {@link PingTest} that compares the number of events
 * added against the number of events stored, and as a
 * {@link ServiceLifecycleObserver} that shuts the executor down when the
 * service stops.
 * </p>
 */
public class AsyncDaoUserEventAppenderBiz
        implements UserEventAppenderBiz, PingTest, ServiceLifecycleObserver, Runnable {

    /** The {@code queueLagAlertThreshold} property default value. */
    public static final int DEFAULT_QUEUE_LAG_ALERT_THRESHOLD = 500;

    private static final Logger log = LoggerFactory.getLogger(AsyncDaoUserEventAppenderBiz.class);

    /**
     * Orders events by event ID (using {@link UUIDComparator}) and then by
     * user ID when the event IDs compare as equal.
     */
    public static final Comparator<UserEvent> EVENT_SORT = (o1, o2) -> {
        final int byEventId = UUIDComparator.staticCompare(o1.getEventId(), o2.getEventId());
        return (byEventId != 0 ? byEventId : o1.getUserId().compareTo(o2.getUserId()));
    };

    /** Topic function producing {@code user/{userId}/event}. */
    public static final Function<UserEvent, String> SOLARFLUX_TOPIC_FN = event -> "user/"
            + event.getUserId() + "/event";

    /**
     * Topic function producing {@code user/{userId}/event} followed by one
     * {@code /tag} path segment for every tag of the event, in tag order.
     *
     * <p>
     * An event without a tag array yields the same topic as an event with an
     * empty tag array.
     * </p>
     */
    public static final Function<UserEvent, String> SOLARFLUX_TAGGED_TOPIC_FN = event -> {
        final StringBuilder buf = new StringBuilder("user/");
        buf.append(event.getUserId()).append("/event");
        final String[] tags = event.getTags();
        if (tags != null) {
            for (String tag : tags) {
                buf.append('/').append(tag);
            }
        }
        return buf.toString();
    };

    private final ExecutorService executorService;
    private final UserEventAppenderDao dao;
    private final StatTracker stats;
    private final BlockingQueue<UserEvent> queue;
    private final UuidGenerator uuidGenerator;
    private MqttJsonPublisher<UserEvent> solarFluxPublisher;
    private int queueLagAlertThreshold = DEFAULT_QUEUE_LAG_ALERT_THRESHOLD;

    /**
     * Construct with a default queue, statistics tracker, and UUID generator.
     *
     * <p>
     * The queue is a {@link PriorityBlockingQueue} ordered by
     * {@link #EVENT_SORT}, and event IDs come from
     * {@link TimeBasedV7UuidGenerator#INSTANCE_MICROS}.
     * </p>
     *
     * @param executorService
     *        the executor that runs the queue-draining tasks
     * @param dao
     *        the DAO events are added to
     * @throws IllegalArgumentException
     *         if any argument is {@code null}
     */
    public AsyncDaoUserEventAppenderBiz(ExecutorService executorService, UserEventAppenderDao dao) {
        this(executorService, dao, new PriorityBlockingQueue<UserEvent>(64, EVENT_SORT),
                new StatTracker("AsyncDaoUserEventAppender", null, log, 500),
                TimeBasedV7UuidGenerator.INSTANCE_MICROS);
    }

    /**
     * Construct with explicit collaborators.
     *
     * @param executorService
     *        the executor that runs the queue-draining tasks
     * @param dao
     *        the DAO events are added to
     * @param queue
     *        the queue that holds events until a task stores them
     * @param stats
     *        the tracker for the {@link UserEventStats} counters
     * @param uuidGenerator
     *        the generator of event IDs
     * @throws IllegalArgumentException
     *         if any argument is {@code null}
     */
    public AsyncDaoUserEventAppenderBiz(ExecutorService executorService, UserEventAppenderDao dao,
            BlockingQueue<UserEvent> queue, StatTracker stats, UuidGenerator uuidGenerator) {
        this.executorService = ObjectUtils.requireNonNullArgument(executorService, "executorService");
        this.dao = ObjectUtils.requireNonNullArgument(dao, "dao");
        this.queue = ObjectUtils.requireNonNullArgument(queue, "queue");
        this.stats = ObjectUtils.requireNonNullArgument(stats, "stats");
        this.uuidGenerator = ObjectUtils.requireNonNullArgument(uuidGenerator, "uuidGenerator");
    }

    /**
     * Create an event, queue it, and schedule a task to store it.
     *
     * <p>
     * The {@link UserEventStats#EventsAdded} counter is incremented for every
     * created event, including events that end up being dropped because the
     * queue refused them or the executor rejected the task. Dropped events
     * therefore show up as lag in {@link #performPingTest()}.
     * </p>
     *
     * @param userId
     *        the ID of the user the event belongs to
     * @param info
     *        the event details
     * @return the created event, even when it could not be scheduled
     * @throws IllegalArgumentException
     *         if {@code info} is {@code null}
     */
    @Override
    public UserEvent addEvent(Long userId, LogEventInfo info) {
        // validate before doing any work, so no event ID is generated for a bad call
        ObjectUtils.requireNonNullArgument(info, "info");
        final UserEvent event = new UserEvent(userId, uuidGenerator.generate(), info.getTags(),
                info.getMessage(), info.getData());
        final boolean queued = queue.offer(event);
        stats.increment(UserEventStats.EventsAdded);
        if (!queued) {
            // a bounded queue may refuse the event; scheduling a task would be pointless
            log.warn("Discarding UserEvent {} because the queue did not accept it", event);
            return event;
        }
        try {
            executorService.execute(this);
        } catch (RejectedExecutionException e) {
            // no task will run for this event: take it back out of the queue so it is
            // really discarded, instead of being left behind as a stale entry
            queue.remove(event);
            log.warn("Discarding UserEvent {} because of RejectedExecutionException: {}", event,
                    e.getMessage());
        }
        return event;
    }

    /**
     * Take at most one event from the queue, add it to the DAO, and pass it to
     * the SolarFlux publisher if one is configured.
     *
     * <p>
     * A {@link RuntimeException} thrown by the DAO is logged and does not
     * prevent the event from being passed to the publisher.
     * </p>
     */
    @Override
    public void run() {
        final UserEvent event = queue.poll();
        if (event == null) {
            return;
        }
        try {
            dao.add(event);
            stats.increment(UserEventStats.EventsStored);
        } catch (RuntimeException e) {
            log.error("Unable to add event {} to DAO: {}", event, e.getMessage(), e);
        }
        final MqttJsonPublisher<UserEvent> flux = getSolarFluxPublisher();
        if (flux != null) {
            flux.apply(event);
        }
    }

    /** Does nothing. */
    @Override
    public void serviceDidStartup() {
        // nothing to do
    }

    /**
     * Shut down the executor, waiting up to one minute for submitted tasks to
     * complete.
     *
     * <p>
     * If the calling thread is interrupted while waiting, the wait is
     * abandoned and the thread's interrupt status is restored.
     * </p>
     */
    @Override
    public void serviceDidShutdown() {
        if (executorService.isShutdown()) {
            return;
        }
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(1L, TimeUnit.MINUTES)) {
                log.warn("Timeout waiting for UserEvent tasks to complete; {} events remain queued",
                        queue.size());
            }
        } catch (InterruptedException e) {
            // preserve the interrupt for whoever is driving the shutdown
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Get the ping test ID.
     *
     * @return the runtime class name of this object
     */
    @Override
    public String getPingTestId() {
        return getClass().getName();
    }

    /**
     * Get the ping test display name.
     *
     * @return the display name
     */
    @Override
    public String getPingTestName() {
        return "Async DAO User Event Appender";
    }

    /**
     * Get the maximum ping test execution time.
     *
     * @return the maximum time, in milliseconds
     */
    @Override
    public long getPingTestMaximumExecutionMilliseconds() {
        return 1000L;
    }

    /**
     * Compare the added and stored event counters against the configured lag
     * threshold.
     *
     * @return a failed result if the added count exceeds the stored count by
     *         more than {@link #getQueueLagAlertThreshold()}, otherwise a
     *         successful result; both carry all counters as properties
     */
    @Override
    public PingTest.Result performPingTest() throws Exception {
        final NavigableMap<String, Long> allStats = stats.allCounts();
        final long addCount = countOf(allStats, UserEventStats.EventsAdded);
        final long storedCount = countOf(allStats, UserEventStats.EventsStored);
        final long lagDiff = addCount - storedCount;
        final int threshold = queueLagAlertThreshold;
        if (lagDiff > threshold) {
            return new PingTestResult(false,
                    String.format("Queue removal lag %d > %d", lagDiff, threshold), allStats);
        }
        return new PingTestResult(true,
                String.format("Processed %d events; lag %d.", addCount, lagDiff), allStats);
    }

    /** Look up a counter by statistic name, treating a missing counter as zero. */
    private static long countOf(Map<String, Long> counts, UserEventStats stat) {
        final Long count = counts.get(stat.name());
        return (count != null ? count.longValue() : 0L);
    }

    /**
     * Get the queue lag alert threshold.
     *
     * @return the threshold; defaults to
     *         {@link #DEFAULT_QUEUE_LAG_ALERT_THRESHOLD}
     */
    public int getQueueLagAlertThreshold() {
        return queueLagAlertThreshold;
    }

    /**
     * Set the queue lag alert threshold.
     *
     * @param queueLagAlertThreshold
     *        the maximum difference between added and stored event counts
     *        before {@link #performPingTest()} reports a failure
     */
    public void setQueueLagAlertThreshold(int queueLagAlertThreshold) {
        this.queueLagAlertThreshold = queueLagAlertThreshold;
    }

    /**
     * Get the SolarFlux publisher.
     *
     * @return the publisher, or {@code null} if none is configured
     */
    public MqttJsonPublisher<UserEvent> getSolarFluxPublisher() {
        return solarFluxPublisher;
    }

    /**
     * Set the SolarFlux publisher.
     *
     * @param solarFluxPublisher
     *        the publisher to pass processed events to, or {@code null} to
     *        disable publishing
     */
    public void setSolarFluxPublisher(MqttJsonPublisher<UserEvent> solarFluxPublisher) {
        this.solarFluxPublisher = solarFluxPublisher;
    }

    /** Counters tracked by this service. */
    public static enum UserEventStats {

        /** Incremented by {@code addEvent} for every created event. */
        EventsAdded,

        /** Incremented by {@code run} after the DAO accepts an event. */
        EventsStored;

    }
}

