package org.apache.hadoop.hdds.server.events;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/server/events/EventQueue.class */
/**
 * Registry that routes fired events to the handlers registered for them.
 *
 * <p>Handlers are grouped per event and per {@link EventExecutor}. When an event is fired, every
 * registered handler is handed to its executor through {@code onMessage}; how and on which thread
 * the handler then runs is decided by the executor implementation.
 *
 * <p>The handler registry is a plain {@link HashMap} and the running/silent flags are plain
 * fields; this class adds no synchronization of its own around them.
 */
public class EventQueue implements EventPublisher, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(EventQueue.class);
    private static final Gson TRACING_SERIALIZER = new GsonBuilder().create();

    /** Token placed between the event name and the handler name in an executor name. */
    private static final String EXECUTOR_NAME_SEPARATOR = "For";

    /** Pause between two idle checks in {@link #processAll(long)}, in milliseconds. */
    private static final long IDLE_POLL_INTERVAL_MS = 100L;

    /** Event -> executor -> handlers served by that executor for the event. */
    private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors = new HashMap<>();

    /** Number of (event, handler) deliveries handed to an executor. */
    private final AtomicLong queuedCount = new AtomicLong(0);

    /** Number of events fired while running, whether or not a handler was registered. */
    private final AtomicLong eventCount = new AtomicLong(0);

    private boolean isRunning = true;
    private boolean isSilent = false;

    /**
     * Registers a handler for an event, served by a new {@code SingleThreadExecutor} named
     * according to {@link #getExecutorName(Event, EventHandler)}.
     *
     * @param event   the event to listen for; its name must not contain {@code "For"}
     * @param handler the handler to invoke; must not be null
     * @throws NullPointerException     if {@code handler} is null
     * @throws IllegalArgumentException if the event name contains {@code "For"}
     */
    public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
        Preconditions.checkNotNull(handler, "Handler should not be null.");
        validateEvent(event);
        if (!this.isRunning) {
            // Bail out before building an executor: the three-argument overload would refuse to
            // register it, so close() could never reach it to shut it down.
            LOG.warn("Not adding handler for {}, EventQueue is not running", event);
            return;
        }
        // SingleThreadExecutor is declared outside this file; the constructor call is kept
        // exactly as the original wrote it.
        addHandler(event, new SingleThreadExecutor(getExecutorName(event, handler)), handler);
    }

    /**
     * Builds the executor name expected for an event/handler pair:
     * camelized event name + {@code "For"} + handler name.
     *
     * @param event        the event whose name forms the prefix
     * @param eventHandler the handler whose class name forms the suffix
     * @return the expected executor name
     */
    public static <PAYLOAD> String getExecutorName(Event<PAYLOAD> event, EventHandler<PAYLOAD> eventHandler) {
        return StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR + generateHandlerName(eventHandler);
    }

    /**
     * Rejects event names that contain the separator used in executor names.
     *
     * @throws IllegalArgumentException if the event name contains {@code "For"}
     */
    private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
        Preconditions.checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR), "Event name should not contain For string.");
    }

    /**
     * Returns the handler's simple class name, or its full class name when the handler is an
     * anonymous class (whose simple name is empty).
     */
    private static <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> eventHandler) {
        Class<?> handlerClass = eventHandler.getClass();
        if (handlerClass.isAnonymousClass()) {
            return handlerClass.getName();
        }
        return handlerClass.getSimpleName();
    }

    /**
     * Registers a handler for an event on an explicitly supplied executor.
     *
     * <p>Only logs a warning and returns when the queue is no longer running.
     *
     * @param event    the event to listen for; its name must not contain {@code "For"}
     * @param executor the executor that will run the handler; its name must equal
     *                 {@link #getExecutorName(Event, EventHandler)} for this event and handler
     * @param handler  the handler to invoke
     * @throws IllegalArgumentException if the event name contains {@code "For"}
     * @throws IllegalStateException    if the executor name does not match the expected format
     */
    public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(EVENT_TYPE event, EventExecutor<PAYLOAD> executor, EventHandler<PAYLOAD> handler) {
        if (!this.isRunning) {
            LOG.warn("Not adding handler for {}, EventQueue is not running", event);
            return;
        }
        validateEvent(event);
        String expectedName = getExecutorName(event, handler);
        Preconditions.checkState(expectedName.equals(executor.getName()), "Event Executor name is not matching the specified format. It should be " + expectedName + " but it is " + executor.getName());
        // computeIfAbsent only allocates the nested containers when they are actually missing.
        this.executors
            .computeIfAbsent(event, key -> new HashMap<>())
            .computeIfAbsent(executor, key -> new ArrayList<>())
            .add(handler);
    }

    /**
     * Hands the payload to every handler registered for the event, via each handler's executor.
     *
     * <p>Only logs a warning when the queue is no longer running. When no handler is registered
     * the event is still counted, and a warning is logged unless the queue is silent.
     *
     * @param event   the event being fired
     * @param payload the payload passed to each handler
     */
    @Override // org.apache.hadoop.hdds.server.events.EventPublisher
    public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(EVENT_TYPE event, PAYLOAD payload) {
        if (!this.isRunning) {
            LOG.warn("Processing of {} is skipped, EventQueue is not running", event);
            return;
        }
        Map<EventExecutor, List<EventHandler>> handlersByExecutor = this.executors.get(event);
        this.eventCount.incrementAndGet();
        if (handlersByExecutor == null) {
            if (!this.isSilent) {
                LOG.warn("No event handler registered for event {}", event);
            }
            return;
        }
        for (Map.Entry<EventExecutor, List<EventHandler>> entry : handlersByExecutor.entrySet()) {
            EventExecutor executor = entry.getKey();
            for (EventHandler<PAYLOAD> handler : entry.getValue()) {
                this.queuedCount.incrementAndGet();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Delivering [event={}] to executor/handler {}: <json>{}</json>", event.getName(), executor.getName(), TRACING_SERIALIZER.toJson(payload).replaceAll("\n", "\\\\n"));
                } else if (LOG.isDebugEnabled()) {
                    // Guard against a null payload so delivery does not depend on the log level.
                    String payloadType = payload == null ? "null" : payload.getClass().getSimpleName();
                    LOG.debug("Delivering [event={}] to executor/handler {}: {}", event.getName(), executor.getName(), payloadType);
                }
                executor.onMessage(handler, payload, this);
            }
        }
    }

    /**
     * Blocks until every registered executor reports that all of its queued events have either
     * succeeded or failed, polling every {@value #IDLE_POLL_INTERVAL_MS} ms.
     *
     * <p>Returns early with a warning if the queue stops running. An interrupt received while
     * waiting does not abort the wait; the thread's interrupt status is restored on exit.
     *
     * @param timeout maximum time to wait, in the unit returned by {@code Time.now()}
     *                (presumably milliseconds - confirm against {@code Time})
     * @throws AssertionError if the executors are still busy once the timeout has elapsed
     */
    @VisibleForTesting
    public void processAll(long timeout) {
        final long startTime = Time.now();
        boolean interrupted = false;
        try {
            while (this.isRunning) {
                boolean allIdle = registeredExecutors().stream().allMatch(
                    executor -> executor.queuedEvents() == executor.successfulEvents() + executor.failedEvents());
                if (allIdle) {
                    return;
                }
                try {
                    Thread.sleep(IDLE_POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted exception while sleeping.", e);
                    // Do not re-set the flag here: every following sleep would throw at once and
                    // turn this loop into a busy spin. It is restored in the finally block.
                    interrupted = true;
                }
                if (Time.now() > startTime + timeout) {
                    long processed = registeredExecutors().stream()
                        .mapToLong(executor -> executor.successfulEvents() + executor.failedEvents())
                        .sum();
                    throw new AssertionError("Messages are not processed in the given timeframe. Queued: " + this.queuedCount.get() + " Processed: " + processed);
                }
            }
            LOG.warn("Processing of event skipped. EventQueue is not running");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stops the queue and closes every registered executor. A failure to close one executor is
     * logged and does not prevent the remaining executors from being closed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.isRunning = false;
        for (EventExecutor executor : registeredExecutors()) {
            try {
                executor.close();
            } catch (Exception e) {
                LOG.error("Can't close the executor {}", executor.getName(), e);
            }
        }
    }

    /**
     * Enables or disables the warning logged when an event is fired without a registered handler.
     *
     * @param silent {@code true} to suppress that warning
     */
    public void setSilent(boolean silent) {
        this.isSilent = silent;
    }

    /**
     * Returns the live executor-to-handlers map registered for the event.
     *
     * @param event the event to look up
     * @return the internal map for the event, or {@code null} if nothing is registered for it
     */
    @VisibleForTesting
    public Map<EventExecutor, List<EventHandler>> getExecutorAndHandler(Event event) {
        return this.executors.get(event);
    }

    /** Collects the distinct executors registered across all events. */
    private Set<EventExecutor> registeredExecutors() {
        return this.executors.values().stream()
            .flatMap(handlersByExecutor -> handlersByExecutor.keySet().stream())
            .collect(Collectors.toSet());
    }
}
