package com.github.pandaxz.events.holder;

import com.github.pandaxz.events.dto.Change;
import com.github.pandaxz.events.holder.limit.LimitObserver;
import com.github.pandaxz.events.holder.limit.LimitObserverImpl;
import com.github.pandaxz.events.holder.limit.NoLimitObserverImpl;
import com.github.pandaxz.events.holder.statistic.EventQueueStatisticHandler;
import java.io.Closeable;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/pandaxz/events/holder/EventQueue.class */
/**
 * Buffers {@link Change} events in an in-memory {@link LinkedBlockingQueue} and dispatches them
 * to one or more {@link EventHandler}s from a single background poller task.
 *
 * <p>Producers call {@link #add(Change)}; the poller task, started by the constructor, removes
 * events one at a time, hands each to the configured handler(s) and reports progress to the
 * {@link EventQueueStatisticHandler}. Every enqueue is paired with {@code LimitObserver.add()} and
 * every dequeue with {@code LimitObserver.delete()}.
 *
 * <p>{@link #close()} stops accepting new events, waits for the queue to drain and then asks the
 * poller to stop.
 */
public class EventQueue implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(EventQueue.class);

    /** How long the poller waits for an event before re-checking the shutdown flag. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    /** Pause between two "is the queue drained yet?" checks in {@link #close()}. */
    private static final long DRAIN_CHECK_INTERVAL_MILLIS = 1000L;

    private final CompletableFuture<Void> poller;
    private final EventQueueStatisticHandler statisticHandler;
    private final LimitObserver limitObserver;
    private final BlockingQueue<Change<Map<String, String>>> eventQueue = new LinkedBlockingQueue<>();

    /** {@code false} once {@link #close()} has been called; {@link #add(Change)} then rejects events. */
    private volatile boolean isReceiving = true;

    /** {@code false} once the poller should stop; checked between two polls. */
    private volatile boolean isHandling = true;

    /**
     * Creates a queue whose events are handled by a single handler, directly on the poller task.
     *
     * @param eventHandler handler invoked for every event; must not be {@code null}
     * @param executor executor that runs the poller task, or {@code null} to use the default
     *     executor of {@link CompletableFuture#runAsync(Runnable)}
     * @param eventQueueStatisticHandler receiver of queue statistics; must not be {@code null}
     * @param i queue limit; a value greater than zero selects {@link LimitObserverImpl}, any other
     *     value selects {@link NoLimitObserverImpl}
     * @param countLatch latch passed to {@link LimitObserverImpl}; only used when {@code i > 0}
     * @throws NullPointerException if {@code eventHandler} or {@code eventQueueStatisticHandler}
     *     is {@code null}
     */
    public EventQueue(EventHandler eventHandler, Executor executor, EventQueueStatisticHandler eventQueueStatisticHandler, int i, CountLatch countLatch) {
        Objects.requireNonNull(eventHandler, "eventHandler");
        this.statisticHandler = Objects.requireNonNull(eventQueueStatisticHandler, "eventQueueStatisticHandler");
        this.limitObserver = createLimitObserver(i, countLatch);
        // Must be last: the poller task reads the fields assigned above.
        this.poller = startPoller(change -> handleEvent(change, eventHandler), executor);
    }

    /**
     * Creates a queue whose events are fanned out to several handlers. For each event every
     * handler is run asynchronously and the poller waits for all of them before taking the next
     * event.
     *
     * @param set handlers invoked for every event; must not be {@code null}. The set is read on
     *     every event and is not copied.
     * @param executor executor that runs the poller task, or {@code null} to use the default
     *     executor of {@link CompletableFuture#runAsync(Runnable)}
     * @param eventQueueStatisticHandler receiver of queue statistics; must not be {@code null}
     * @param i queue limit; a value greater than zero selects {@link LimitObserverImpl}, any other
     *     value selects {@link NoLimitObserverImpl}
     * @param countLatch latch passed to {@link LimitObserverImpl}; only used when {@code i > 0}
     * @param executor2 executor that runs the individual handlers, or {@code null} to use the
     *     default executor of {@link CompletableFuture#runAsync(Runnable)}
     * @throws NullPointerException if {@code set} or {@code eventQueueStatisticHandler} is
     *     {@code null}
     */
    public EventQueue(Set<EventHandler> set, Executor executor, EventQueueStatisticHandler eventQueueStatisticHandler, int i, CountLatch countLatch, Executor executor2) {
        Objects.requireNonNull(set, "set");
        this.statisticHandler = Objects.requireNonNull(eventQueueStatisticHandler, "eventQueueStatisticHandler");
        this.limitObserver = createLimitObserver(i, countLatch);
        // Must be last: the poller task reads the fields assigned above.
        this.poller = startPoller(change -> handle(set, change, executor2), executor);
    }

    /**
     * Enqueues an event for asynchronous handling.
     *
     * @param change the event to enqueue; must not be {@code null}
     * @throws NullPointerException if {@code change} is {@code null}
     * @throws IllegalStateException if {@link #close()} has already been called
     */
    public void add(Change<Map<String, String>> change) {
        // Fail before touching statistics or the limit observer; the queue itself rejects nulls.
        Objects.requireNonNull(change, "change");
        if (!isReceiving) {
            throw new IllegalStateException("Event queue stop work");
        }
        statisticHandler.eventAddedToQueue(Instant.now(), change);
        limitObserver.add();
        eventQueue.add(change);
    }

    /**
     * Stops accepting new events, waits until the queue is empty and then signals the poller to
     * stop. The poller finishes the event it is currently processing, if any, and exits at its
     * next shutdown check; this method does not wait for that.
     *
     * <p>Waiting ends early if the calling thread is interrupted (the interrupt flag is restored)
     * or if the poller task has already terminated; in both cases queued events stay unhandled.
     *
     * @throws IOException never thrown by this implementation; declared by {@link Closeable}
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        isReceiving = false;
        while (!eventQueue.isEmpty()) {
            if (poller.isDone()) {
                // Nobody is left to drain the queue, so waiting would never end.
                logger.warn("Event queue poller has stopped, {} events will not be handled", eventQueue.size());
                break;
            }
            try {
                Thread.sleep(DRAIN_CHECK_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.info("Event queue is interrupted, probably you lost events");
                break;
            }
        }
        isHandling = false;
    }

    /** Chooses the limit observer: limiting when {@code limit > 0}, otherwise a no-limit one. */
    private static LimitObserver createLimitObserver(int limit, CountLatch countLatch) {
        if (limit > 0) {
            return new LimitObserverImpl(limit, countLatch);
        }
        return new NoLimitObserverImpl();
    }

    /** Starts the poll loop on {@code executor}, or on the default async executor when it is null. */
    private CompletableFuture<Void> startPoller(Consumer<Change<Map<String, String>>> dispatcher, Executor executor) {
        Runnable loop = () -> pollLoop(dispatcher);
        return executor == null ? CompletableFuture.runAsync(loop) : CompletableFuture.runAsync(loop, executor);
    }

    /** Takes events off the queue and processes them until shutdown is requested or the thread is interrupted. */
    private void pollLoop(Consumer<Change<Map<String, String>>> dispatcher) {
        while (isHandling) {
            Change<Map<String, String>> event;
            try {
                // Timed poll instead of take(): an idle poller must notice isHandling == false.
                event = eventQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Event queue poller is interrupted, stopping", e);
                return;
            }
            if (event != null) {
                process(event, dispatcher);
            }
        }
    }

    /** Dispatches one dequeued event, reports statistics and always releases its limit slot. */
    private void process(Change<Map<String, String>> event, Consumer<Change<Map<String, String>>> dispatcher) {
        try {
            try {
                statisticHandler.eventPolledFromQueue(Instant.now(), event);
                dispatcher.accept(event);
                statisticHandler.eventHandled(Instant.now(), event);
            } finally {
                // The event has left the queue whatever happened above, so balance add().
                limitObserver.delete();
            }
        } catch (Exception e) {
            // Exception rather than RuntimeException: the collaborators are declared elsewhere and
            // may declare checked exceptions. One failing event must not stop the poller.
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible so the next poll() ends the loop.
                Thread.currentThread().interrupt();
            }
            logger.error("Error when processing event: {}", event, e);
        }
    }

    /** Runs every handler in {@code set} asynchronously for {@code change} and waits for all of them. */
    private void handle(Set<EventHandler> set, Change<Map<String, String>> change, Executor executor) {
        CompletableFuture<?>[] pending = set.stream()
                .map(eventHandler -> {
                    Runnable task = () -> handleEvent(change, eventHandler);
                    return executor == null ? CompletableFuture.runAsync(task) : CompletableFuture.runAsync(task, executor);
                })
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(pending).join();
    }

    /** Invokes one handler, logs (but does not propagate) its failure and reports the elapsed time. */
    private void handleEvent(Change<Map<String, String>> change, EventHandler eventHandler) {
        // nanoTime is monotonic; wall-clock time may jump and produce wrong or negative durations.
        long startedNanos = System.nanoTime();
        try {
            eventHandler.handle(change);
        } catch (Exception e) {
            logger.error("Error when handling event: {} exception: ", change, e);
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
        statisticHandler.eventHandled(eventHandler.getHandlerName(), Long.valueOf(elapsedMillis), change);
    }
}
