package org.aanguita.jacuzzi.queues;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.aanguita.jacuzzi.concurrency.monitor.Monitor;
import org.aanguita.jacuzzi.concurrency.monitor.StateSolver;

/* loaded from: input_file:org/aanguita/jacuzzi/queues/OnDemandQueueProcessor.class */
public class OnDemandQueueProcessor<T> implements StateSolver {
    private static final int DEFAULT_QUEUE_CAPACITY = 1024;
    private static final boolean MESSAGE_FAIRNESS = true;
    private final ArrayBlockingQueue<T> eventQueue;
    private final List<Monitor> monitors;
    private final Consumer<T> consumer;

    public OnDemandQueueProcessor(Consumer<T> consumer) {
        this(consumer, DEFAULT_QUEUE_CAPACITY);
    }

    public OnDemandQueueProcessor(Consumer<T> consumer, int i) {
        this(consumer, i, MESSAGE_FAIRNESS);
    }

    public OnDemandQueueProcessor(Consumer<T> consumer, int i, int i2) {
        if (i2 < MESSAGE_FAIRNESS) {
            throw new IllegalArgumentException("maxThreads must be a positive integer, received " + i2);
        }
        this.eventQueue = new ArrayBlockingQueue<>(i, true);
        this.monitors = initializeMonitors(i2);
        this.consumer = consumer;
    }

    private List<Monitor> initializeMonitors(int i) {
        return (List) IntStream.range(0, i).mapToObj(i2 -> {
            return new Monitor(this);
        }).collect(Collectors.toList());
    }

    public void addEvent(T t) {
        try {
            this.eventQueue.put(t);
        } catch (InterruptedException e) {
            addEvent(t);
        }
        wakeUpAMonitor();
    }

    private void wakeUpAMonitor() {
        for (Monitor monitor : this.monitors) {
            if (monitor.isStateSolved()) {
                monitor.stateChange();
                return;
            }
        }
        this.monitors.get(0).stateChange();
    }

    @Override // org.aanguita.jacuzzi.concurrency.monitor.StateSolver
    public boolean solveState() {
        T poll;
        synchronized (this) {
            poll = this.eventQueue.poll();
        }
        if (poll == null) {
            return true;
        }
        this.consumer.accept(poll);
        return false;
    }

    public void blockUntilEventsAreSolved() {
        this.monitors.forEach((v0) -> {
            v0.blockUntilStateIsSolved();
        });
    }

    public void blockUntilEventsAreSolved(long j) throws TimeoutException {
        Iterator<Monitor> it = this.monitors.iterator();
        while (it.hasNext()) {
            it.next().blockUntilStateIsSolved(j);
        }
    }

    public void stop() {
        this.monitors.forEach((v0) -> {
            v0.stop();
        });
    }
}
