package org.apache.fluo.core.worker;

import com.codahale.metrics.Gauge;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.FluoExecutors;
import org.apache.fluo.core.util.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/fluo/core/worker/NotificationProcessor.class */
public class NotificationProcessor implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(NotificationProcessor.class);
    private NotificationTracker tracker;
    private ThreadPoolExecutor executor;
    private Environment env;
    private Observers observers;
    private PriorityBlockingQueue<Runnable> queue;

    /* loaded from: input_file:org/apache/fluo/core/worker/NotificationProcessor$FutureNotificationTask.class */
    private static class FutureNotificationTask extends FutureTask<Void> implements Comparable<FutureNotificationTask> {
        private final Notification notification;

        public FutureNotificationTask(Notification notification, NotificationFinder notificationFinder, WorkTaskAsync workTaskAsync) {
            super(new NotificationProcessingTask(notification, notificationFinder, workTaskAsync), null);
            this.notification = notification;
        }

        @Override // java.lang.Comparable
        public int compareTo(FutureNotificationTask futureNotificationTask) {
            return Long.compare(this.notification.getTimestamp(), futureNotificationTask.notification.getTimestamp());
        }

        @Override // java.util.concurrent.FutureTask
        protected void setException(Throwable th) {
            super.setException(th);
            System.err.println("Uncaught Exception ");
            th.printStackTrace();
        }
    }

    /* loaded from: input_file:org/apache/fluo/core/worker/NotificationProcessor$NotificationProcessingTask.class */
    private static class NotificationProcessingTask implements Runnable {
        Notification notification;
        NotificationFinder notificationFinder;
        WorkTaskAsync workTask;

        NotificationProcessingTask(Notification notification, NotificationFinder notificationFinder, WorkTaskAsync workTaskAsync) {
            this.notification = notification;
            this.notificationFinder = notificationFinder;
            this.workTask = workTaskAsync;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (this.notificationFinder.shouldProcess(this.notification)) {
                    this.workTask.run();
                }
            } catch (Exception e) {
                NotificationProcessor.log.error("Failed to process work " + Hex.encNonAscii(this.notification), (Throwable) e);
            }
        }
    }

    /* loaded from: input_file:org/apache/fluo/core/worker/NotificationProcessor$NotificationTracker.class */
    private class NotificationTracker {
        private Map<RowColumn, Future<?>> queuedWork;
        private long sizeInBytes;
        private static final long MAX_SIZE = 16777216;

        private NotificationTracker() {
            this.queuedWork = new HashMap();
            this.sizeInBytes = 0L;
        }

        private long size(RowColumn rowColumn) {
            Column column = rowColumn.getColumn();
            return rowColumn.getRow().length() + column.getFamily().length() + column.getQualifier().length() + column.getVisibility().length();
        }

        public synchronized boolean add(RowColumn rowColumn, Future<?> future) {
            if (this.queuedWork.containsKey(rowColumn)) {
                return false;
            }
            while (this.sizeInBytes > MAX_SIZE) {
                try {
                    wait(1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            if (this.queuedWork.containsKey(rowColumn)) {
                return false;
            }
            this.queuedWork.put(rowColumn, future);
            this.sizeInBytes += size(rowColumn);
            return true;
        }

        public synchronized void remove(RowColumn rowColumn) {
            if (this.queuedWork.remove(rowColumn) != null) {
                this.sizeInBytes -= size(rowColumn);
                notify();
            }
        }

        public synchronized void clear() {
            Iterator<Future<?>> it = this.queuedWork.values().iterator();
            while (it.hasNext()) {
                it.next().cancel(false);
            }
            this.queuedWork.clear();
            this.sizeInBytes = 0L;
            notify();
        }

        public boolean requeue(RowColumn rowColumn, FutureTask<?> futureTask) {
            if (!this.queuedWork.containsKey(rowColumn)) {
                return false;
            }
            this.queuedWork.put(rowColumn, futureTask);
            return true;
        }
    }

    public NotificationProcessor(Environment environment) {
        int workerThreads = environment.getConfiguration().getWorkerThreads();
        this.env = environment;
        this.queue = new PriorityBlockingQueue<>();
        this.executor = FluoExecutors.newFixedThreadPool(workerThreads, this.queue, "ntfyProc");
        this.tracker = new NotificationTracker();
        this.observers = new Observers(environment);
        environment.getSharedResources().getMetricRegistry().register(environment.getMetricNames().getNotificationQueued(), new Gauge<Integer>() { // from class: org.apache.fluo.core.worker.NotificationProcessor.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.codahale.metrics.Gauge
            public Integer getValue() {
                return Integer.valueOf(NotificationProcessor.this.queue.size());
            }
        });
    }

    public boolean addNotification(NotificationFinder notificationFinder, Notification notification) {
        FutureNotificationTask futureNotificationTask = new FutureNotificationTask(notification, notificationFinder, new WorkTaskAsync(this, notificationFinder, this.env, notification, this.observers));
        if (!this.tracker.add(notification.getRowColumn(), futureNotificationTask)) {
            return false;
        }
        try {
            this.executor.execute(futureNotificationTask);
            return true;
        } catch (RejectedExecutionException e) {
            this.tracker.remove(notification.getRowColumn());
            throw e;
        }
    }

    public void requeueNotification(NotificationFinder notificationFinder, Notification notification) {
        FutureNotificationTask futureNotificationTask = new FutureNotificationTask(notification, notificationFinder, new WorkTaskAsync(this, notificationFinder, this.env, notification, this.observers));
        if (this.tracker.requeue(notification.getRowColumn(), futureNotificationTask)) {
            try {
                this.executor.execute(futureNotificationTask);
            } catch (RejectedExecutionException e) {
                this.tracker.remove(notification.getRowColumn());
                throw e;
            }
        }
    }

    public void notificationProcessed(Notification notification) {
        this.tracker.remove(notification.getRowColumn());
    }

    public int size() {
        return this.queue.size();
    }

    public void clear() {
        this.tracker.clear();
        this.executor.purge();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.executor.shutdownNow();
        this.observers.close();
        do {
            try {
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } while (!this.executor.awaitTermination(1L, TimeUnit.SECONDS));
    }
}
