/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.server.events;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.server.events.EventExecutor;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event executor that runs every handler invocation on one dedicated thread.
 *
 * <p>Messages are wrapped into tasks and submitted to a fixed-size (1/1)
 * {@link ThreadPoolExecutor} backed by an unbounded
 * {@link LinkedBlockingQueue}, so handlers run one at a time in submission
 * order.
 *
 * <p>The instance registers itself as a metrics source on construction and
 * removes that registration again in {@link #close()}.
 *
 * @param <T> the payload type of the messages handled by this executor
 */
@Metrics(context="EventQueue")
public class SingleThreadExecutor<T>
implements EventExecutor<T> {
    /** Prefix used for both the metrics source name and the worker thread name. */
    public static final String THREAD_NAME_PREFIX = "EventQueue";
    private static final Logger LOG = LoggerFactory.getLogger(SingleThreadExecutor.class);
    private final String name;
    private final ThreadPoolExecutor executor;
    // The counters below are never assigned in this class; presumably the
    // metrics system injects them while processing register(...) in the
    // constructor -- TODO confirm against the metrics2 annotation handling.
    /** Incremented once per call to {@link #onMessage}, before the task is submitted. */
    @Metric
    private MutableCounterLong queued;
    /** Incremented after a handler returns without throwing an {@link Exception}. */
    @Metric
    private MutableCounterLong done;
    /** Incremented when a handler throws an {@link Exception}. */
    @Metric
    private MutableCounterLong failed;

    /**
     * Creates the executor, registers its metrics source and prepares the
     * single-threaded pool.
     *
     * @param name executor name; appended to {@link #THREAD_NAME_PREFIX} to
     *             form the metrics source name and (with a dash) the worker
     *             thread name
     */
    public SingleThreadExecutor(String name) {
        this.name = name;
        DefaultMetricsSystem.instance().register(THREAD_NAME_PREFIX + name, "Event Executor metrics ", this);
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(THREAD_NAME_PREFIX + "-" + name);
            return thread;
        });
    }

    /**
     * Schedules {@code handler.onMessage(message, publisher)} on the worker
     * thread.
     *
     * <p>An {@link Exception} thrown by the handler is logged and counted as
     * a failure; it is not propagated to the caller of this method.
     *
     * @param handler   the handler to invoke
     * @param message   the payload passed to the handler
     * @param publisher the publisher passed through to the handler
     */
    @Override
    public void onMessage(EventHandler<T> handler, T message, EventPublisher publisher) {
        this.queued.incr();
        this.executor.execute(() -> {
            try {
                handler.onMessage(message, publisher);
                this.done.incr();
            }
            catch (Exception ex) {
                // The exception is the trailing argument without a matching
                // placeholder, so SLF4J logs it with its stack trace.
                LOG.error("Error on execution message {}", message, ex);
                this.failed.incr();
            }
        });
    }

    /**
     * @return the number of handler invocations that ended with an exception
     */
    @Override
    public long failedEvents() {
        return this.failed.value();
    }

    /**
     * @return the number of handler invocations that completed normally
     */
    @Override
    public long successfulEvents() {
        return this.done.value();
    }

    /**
     * @return the number of messages passed to {@link #onMessage} so far
     */
    @Override
    public long queuedEvents() {
        return this.queued.value();
    }

    /**
     * Initiates shutdown of the worker pool and removes the metrics source
     * registered by the constructor.
     *
     * <p>{@link ThreadPoolExecutor#shutdown()} does not wait for already
     * queued tasks to finish. The counter objects remain readable through the
     * accessor methods after the source has been unregistered.
     */
    @Override
    public void close() {
        this.executor.shutdown();
        // Mirror the register(...) call in the constructor; without this the
        // metrics source stays registered after the executor is closed.
        DefaultMetricsSystem.instance().unregisterSource(THREAD_NAME_PREFIX + this.name);
    }

    /**
     * @return the name supplied at construction time
     */
    @Override
    public String getName() {
        return this.name;
    }
}

