/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.active;

import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;

public abstract class SingleThreadEventProcessor<T>
implements Runnable {
    private static final Logger LOGGER = Logger.getLogger(SingleThreadEventProcessor.class.getName());
    private final String name;
    private final LinkedBlockingQueue<T> eventInbox;
    private final ExecutorService executorService;
    private final Future<?> future;

    public SingleThreadEventProcessor(String threadName) {
        this.name = threadName;
        this.eventInbox = new LinkedBlockingQueue();
        this.executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        this.future = this.executorService.submit(this);
    }

    @Override
    public final void run() {
        LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
        while (!Thread.currentThread().isInterrupted()) {
            try {
                T event = this.eventInbox.take();
                this.handle(event);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Error handling an event", e);
            }
        }
        LOGGER.log(Level.WARNING, "Stopped " + Thread.currentThread().getName());
    }

    protected abstract void handle(T var1) throws Exception;

    public void add(T event) {
        if (!this.eventInbox.add(event)) {
            throw new IllegalStateException();
        }
    }

    public void stop() throws HyracksDataException, InterruptedException {
        this.future.cancel(true);
        this.executorService.shutdown();
        if (!this.executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
            throw HyracksDataException.create((int)3100, (Serializable[])new Serializable[]{this.name});
        }
    }
}

