package org.apache.hadoop.hdds.server.events;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.utils.MetricsUtil;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event executor that spreads incoming events over a fixed list of work
 * queues, each drained by its own single-threaded pool.
 *
 * <p>The queue for an event is chosen from {@code hashCode()} of the event, so
 * events with equal hash codes always land on the same queue and are therefore
 * handled sequentially by the same worker.
 *
 * <p>The work queues, the thread pools and the {@code executorMap} are supplied
 * by the caller and may be shared by several instances of this class. A
 * draining task looks up the executor responsible for an event in
 * {@code executorMap} using the event's class name. Note that {@link #close()}
 * shuts down every supplied pool and clears the whole map, not only this
 * instance's entry.
 *
 * @param <P> type of the events handled by this executor
 * @param <Q> element type of the work queues; events are cast to it unchecked
 */
// The context literal equals EVENT_QUEUE; an annotation on the class cannot
// portably reference the private constant, so keep the two in sync.
@Metrics(context = "EventQueue")
public class FixedThreadPoolWithAffinityExecutor<P, Q> implements EventExecutor<P> {
    private static final String EVENT_QUEUE = "EventQueue";
    private static final Logger LOG = LoggerFactory.getLogger(FixedThreadPoolWithAffinityExecutor.class);

    /** Event class name to executor; supplied by the caller and possibly shared. */
    private final Map<String, FixedThreadPoolWithAffinityExecutor> executorMap;
    private final String name;
    private final EventHandler<P> eventHandler;
    private final EventPublisher eventPublisher;
    private final List<BlockingQueue<Q>> workQueues;
    private final List<ThreadPoolExecutor> executors;

    // The counters below are never assigned in this class; presumably the
    // metrics system injects them when the instance is registered in the
    // constructor -- TODO confirm against MetricsUtil.registerDynamic.
    @Metric
    private MutableCounterLong queued;

    @Metric
    private MutableCounterLong done;

    @Metric
    private MutableCounterLong failed;

    @Metric
    private MutableCounterLong scheduled;

    @Metric
    private MutableCounterLong dropped;

    @Metric
    private MutableCounterLong longWaitInQueue;

    @Metric
    private MutableCounterLong longTimeExecution;

    /** Cleared by {@link #close()}; polled by the tasks this instance submitted. */
    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    // Both thresholds are written through the setters and read by pool worker
    // threads, so they are volatile to make updates visible to the workers.
    private volatile long queueWaitThreshold = 60000;
    private volatile long execWaitThreshold = 120000;

    /**
     * Long-running task that drains one work queue and hands every event to
     * the executor registered for the event's class.
     *
     * @param <P> element type of the drained queue
     */
    public static class ContainerReportProcessTask<P> implements Runnable {
        private final BlockingQueue<P> queue;
        private final AtomicBoolean isRunning;
        private final Map<String, FixedThreadPoolWithAffinityExecutor> executorMap;

        /**
         * @param blockingQueue queue to drain
         * @param atomicBoolean flag polled between events; the task ends once it is {@code false}
         * @param map event class name to the executor that handles such events
         */
        public ContainerReportProcessTask(BlockingQueue<P> blockingQueue, AtomicBoolean atomicBoolean, Map<String, FixedThreadPoolWithAffinityExecutor> map) {
            this.queue = blockingQueue;
            this.isRunning = atomicBoolean;
            this.executorMap = map;
        }

        /**
         * Polls the queue until the running flag is cleared or the thread is
         * interrupted. Events whose class has no registered executor are
         * logged and discarded.
         */
        @Override
        public void run() {
            while (this.isRunning.get()) {
                try {
                    // Short timeout so that a cleared running flag is noticed promptly.
                    P report = this.queue.poll(1L, TimeUnit.MILLISECONDS);
                    if (report == null) {
                        continue;
                    }
                    FixedThreadPoolWithAffinityExecutor executor = this.executorMap.get(report.getClass().getName());
                    if (executor == null) {
                        LOG.warn("Executor for report is not found");
                        continue;
                    }
                    deliver(executor, report);
                    // The handler may have swallowed an interrupt but left the flag set.
                    if (Thread.currentThread().isInterrupted()) {
                        LOG.warn("Interrupt of execution of Reports");
                        return;
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Interrupt of execution of Reports");
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Runs the executor's handler for one event and updates the executor's
         * counters. Exceptions thrown by the handler are logged and counted as
         * failures; they do not end the task.
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private void deliver(FixedThreadPoolWithAffinityExecutor executor, P report) {
            long createTime = 0;
            String eventId = "";
            if (report instanceof IEventInfo) {
                IEventInfo info = (IEventInfo) report;
                createTime = info.getCreateTime();
                eventId = info.getEventId();
            }

            // Assumes getCreateTime() is on the same clock as Time.monotonicNow()
            // -- TODO confirm. A create time of 0 disables both checks.
            long dequeuedAt = Time.monotonicNow();
            if (createTime != 0 && dequeuedAt - createTime > executor.queueWaitThreshold) {
                executor.longWaitInQueue.incr();
                LOG.warn("Event remained in queue for long time {} millisec, {}", dequeuedAt - createTime, eventId);
            }

            executor.scheduled.incr();
            try {
                executor.eventHandler.onMessage(report, executor.eventPublisher);
                executor.done.incr();
                // Measured from event creation, i.e. queue wait plus handler time.
                long finishedAt = Time.monotonicNow();
                if (createTime != 0 && finishedAt - createTime > executor.execWaitThreshold) {
                    executor.longTimeExecution.incr();
                    LOG.warn("Event taken long execution time {} millisec, {}", finishedAt - createTime, eventId);
                }
            } catch (Exception e) {
                LOG.error("Error on execution message {}", report, e);
                executor.failed.incr();
            }
        }
    }

    /**
     * Optional capability of a work queue. When a queue implements it,
     * {@link #onMessage} adds the returned value to the dropped-events counter
     * after every enqueue.
     */
    public interface IQueueMetrics {
        /**
         * @param str simple class name of the event type being queried
         * @return the drop count for that type; going by the name, the queue
         *         also resets its internal count (implementation not shown here)
         */
        int getAndResetDropCount(String str);
    }

    /**
     * Creates the executor, registers it in {@code map} under the name of
     * {@code cls}, starts a draining task on every pool that is currently
     * idle, and registers the metrics source.
     *
     * @param str executor name, also used as suffix of the metrics source name
     * @param eventHandler handler invoked for every dequeued event
     * @param list work queues; queue {@code i} is drained by executor {@code i}
     * @param eventPublisher publisher passed through to the handler
     * @param cls event class whose name is the key in {@code map}
     * @param list2 thread pools, at least one per work queue
     * @param map shared registry of event class name to executor
     * @throws IllegalArgumentException if there are fewer pools than queues
     */
    public FixedThreadPoolWithAffinityExecutor(String str, EventHandler<P> eventHandler, List<BlockingQueue<Q>> list, EventPublisher eventPublisher, Class<P> cls, List<ThreadPoolExecutor> list2, Map<String, FixedThreadPoolWithAffinityExecutor> map) {
        // Fail before touching shared state; otherwise a short pool list would
        // leave this instance registered in the map with only some tasks started.
        if (list2.size() < list.size()) {
            throw new IllegalArgumentException("Need one executor per work queue: "
                + list.size() + " queues but " + list2.size() + " executors");
        }
        this.name = str;
        this.eventHandler = eventHandler;
        this.workQueues = list;
        this.eventPublisher = eventPublisher;
        this.executors = list2;
        this.executorMap = map;
        map.put(cls.getName(), this);

        for (int i = 0; i < list.size(); i++) {
            ThreadPoolExecutor pool = list2.get(i);
            // A pool with an active task is assumed to be draining its queue
            // already (started by another instance sharing the pools).
            if (pool.getActiveCount() == 0) {
                pool.submit(new ContainerReportProcessTask<>(list.get(i), this.isRunning, map));
            }
        }
        MetricsUtil.registerDynamic(this, EVENT_QUEUE + str, "Event Executor metrics ", EVENT_QUEUE);
    }

    /**
     * Sets the age above which a dequeued event is counted and logged as
     * having waited long in the queue.
     *
     * @param j threshold, in the unit of {@code Time.monotonicNow()} (logged as milliseconds)
     */
    public void setQueueWaitThreshold(long j) {
        this.queueWaitThreshold = j;
    }

    /**
     * Sets the age (measured from event creation) above which a handled event
     * is counted and logged as a long execution.
     *
     * @param j threshold, in the unit of {@code Time.monotonicNow()} (logged as milliseconds)
     */
    public void setExecWaitThreshold(long j) {
        this.execWaitThreshold = j;
    }

    /**
     * Same as {@link #initializeExecutorPool(String, List)} with an empty
     * thread-name prefix.
     */
    public static <Q> List<ThreadPoolExecutor> initializeExecutorPool(List<BlockingQueue<Q>> list) {
        return initializeExecutorPool("", list);
    }

    /**
     * Creates one single-threaded pool per work queue. Worker threads are
     * daemons named {@code <str>FixedThreadPoolWithAffinityExecutor-<i>-<n>}.
     *
     * @param str prefix for the worker thread names
     * @param list work queues; only the size of the list is used
     * @return a new mutable list with {@code list.size()} pools
     */
    public static <Q> List<ThreadPoolExecutor> initializeExecutorPool(String str, List<BlockingQueue<Q>> list) {
        final int poolCount = list.size();
        List<ThreadPoolExecutor> pools = new ArrayList<>(poolCount);
        for (int i = 0; i < poolCount; i++) {
            pools.add(new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadFactoryBuilder()
                    .setDaemon(true)
                    .setNameFormat(str + "FixedThreadPoolWithAffinityExecutor-" + i + "-%d")
                    .build()));
        }
        return pools;
    }

    /**
     * Enqueues the event on the work queue selected by its hash code; the
     * handler and publisher arguments are ignored in favour of the ones given
     * to the constructor.
     *
     * @param eventHandler unused
     * @param p event to enqueue
     * @param eventPublisher unused
     * @throws IllegalStateException if this executor has no work queues, or
     *         (from {@code BlockingQueue.add}) if the selected queue is full
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(EventHandler<P> eventHandler, P p, EventPublisher eventPublisher) {
        final int queueCount = this.workQueues.size();
        if (queueCount == 0) {
            throw new IllegalStateException("No work queues configured for executor " + this.name);
        }
        this.queued.incr();
        // floorMod is always in [0, queueCount) and uses every queue for any
        // queue count; for power-of-two counts it equals hash & (count - 1).
        BlockingQueue<Q> target = this.workQueues.get(Math.floorMod(p.hashCode(), queueCount));
        // The queues are declared with their own element type Q; the caller is
        // responsible for pairing P and Q so that this cast is valid.
        target.add((Q) p);
        if (target instanceof IQueueMetrics) {
            this.dropped.incr(((IQueueMetrics) target).getAndResetDropCount(p.getClass().getSimpleName()));
        }
    }

    /** @return number of events whose handler threw an exception */
    @Override
    public long failedEvents() {
        return this.failed.value();
    }

    /** @return number of events whose handler returned normally */
    @Override
    public long successfulEvents() {
        return this.done.value();
    }

    /** @return number of events passed to {@link #onMessage} */
    @Override
    public long queuedEvents() {
        return this.queued.value();
    }

    /** @return number of events dequeued and handed to the handler */
    @Override
    public long scheduledEvents() {
        return this.scheduled.value();
    }

    /** @return accumulated drop count reported by {@link IQueueMetrics} queues */
    @Override
    public long droppedEvents() {
        return this.dropped.value();
    }

    /** @return number of events older than the queue-wait threshold when dequeued */
    @Override
    public long longWaitInQueueEvents() {
        return this.longWaitInQueue.value();
    }

    /** @return number of events older than the execution threshold when their handler finished */
    @Override
    public long longTimeExecutionEvents() {
        return this.longTimeExecution.value();
    }

    /**
     * Clears the running flag, shuts down every supplied pool, clears the
     * whole (possibly shared) executor map and unregisters the metrics source.
     */
    @Override
    public void close() {
        this.isRunning.set(false);
        for (ThreadPoolExecutor pool : this.executors) {
            pool.shutdown();
        }
        this.executorMap.clear();
        DefaultMetricsSystem.instance().unregisterSource(EVENT_QUEUE + this.name);
    }

    /** @return the name given to the constructor */
    @Override
    public String getName() {
        return this.name;
    }
}
