package org.apache.hyracks.api.util;

import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/hyracks/api/util/SingleThreadEventProcessor.class */
public abstract class SingleThreadEventProcessor<T> implements Runnable {
    private static final Logger LOGGER = LogManager.getLogger();
    protected final String name;
    private volatile Thread executorThread;
    private volatile boolean stopped = false;
    private final LinkedBlockingQueue<T> eventInbox = new LinkedBlockingQueue<>();

    public SingleThreadEventProcessor(String str) {
        this.name = str;
        this.executorThread = new Thread(this, str);
        this.executorThread.start();
    }

    @Override // java.lang.Runnable
    public final void run() {
        LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
        while (!this.stopped) {
            try {
                handle(this.eventInbox.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e2) {
                LOGGER.log(Level.ERROR, "Error handling an event", e2);
            }
        }
        LOGGER.log(Level.WARN, "Stopped " + Thread.currentThread().getName());
    }

    protected abstract void handle(T t) throws Exception;

    public void add(T t) {
        if (!this.eventInbox.add(t)) {
            throw new IllegalStateException();
        }
    }

    public void stop() throws HyracksDataException {
        this.stopped = true;
        this.executorThread.interrupt();
        InvokeUtil.doUninterruptibly(() -> {
            this.executorThread.join(1000L);
        });
        int i = 0;
        while (this.executorThread.isAlive()) {
            i++;
            LOGGER.log(Level.WARN, "Failed to stop event processor after {} attempts. Interrupted exception swallowed?", Integer.valueOf(i), ExceptionUtils.fromThreadStack(this.executorThread));
            if (i == 10) {
                throw HyracksDataException.create(100, this.name);
            }
            this.executorThread.interrupt();
            InvokeUtil.doUninterruptibly(() -> {
                this.executorThread.join(1000L);
            });
        }
    }
}
