/*
 * Decompiled with CFR 0.152.
 */
package io.basestar.event;

import io.basestar.event.Event;
import io.basestar.event.Handler;
import io.basestar.event.Pump;
import io.basestar.event.Receiver;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultPump
implements Pump {
    private static final Logger log = LoggerFactory.getLogger(DefaultPump.class);
    private static final int INITIAL_DELAY_MILLIS = 500;
    private static final int MIN_DELAY_MILLIS = 1;
    private static final int MAX_DELAY_MILLIS = 500;
    private static final int SHUTDOWN_WAIT_SECONDS = 5;
    private final Receiver receiver;
    private final Handler<Event> handler;
    private final int minThreads;
    private final int maxThreads;
    private final ScheduledExecutorService executorService;
    private final Counter total = Metrics.counter((String)"events.pump.total", (String[])new String[0]);
    private final Object lock = new Object();
    private final Random random = new Random();
    private volatile int count;

    public DefaultPump(Receiver receiver, Handler<Event> handler, int minThreads, int maxThreads) {
        this.receiver = receiver;
        this.handler = handler;
        this.minThreads = minThreads;
        this.maxThreads = maxThreads;
        this.executorService = Executors.newScheduledThreadPool(minThreads);
        Metrics.gauge((String)"events.pump.threads", (Object)this, t -> t.count);
    }

    @Override
    public void start() {
        for (int i = 0; i != this.minThreads; ++i) {
            this.another(500L + this.delay());
        }
    }

    @Override
    public void flush() {
        Integer results;
        do {
            results = this.receiver.receive(this.handler).join();
            assert (results != null);
        } while (results != 0);
    }

    private void another() {
        this.another(this.delay());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void another(long delay) {
        Object object = this.lock;
        synchronized (object) {
            if (this.count < this.maxThreads) {
                ++this.count;
                this.executorService.schedule(() -> {
                    while (true) {
                        try {
                            while (true) {
                                Integer results = this.receiver.receive(this.handler).join();
                                assert (results != null);
                                this.total.increment((double)results.intValue());
                                Object object = this.lock;
                                synchronized (object) {
                                    if (Thread.interrupted()) {
                                        --this.count;
                                        return;
                                    }
                                    if (results == 0) {
                                        if (this.count > this.minThreads) {
                                            --this.count;
                                            return;
                                        }
                                    } else {
                                        this.another();
                                    }
                                }
                            }
                        }
                        catch (Throwable e) {
                            log.error("Uncaught in event pump", e);
                            continue;
                        }
                        break;
                    }
                }, delay, TimeUnit.MILLISECONDS);
            }
        }
    }

    private long delay() {
        return 1L + (long)this.random.nextInt(499);
    }

    @Override
    public void stop() {
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.executorService.shutdownNow();
                if (!this.executorService.awaitTermination(5L, TimeUnit.SECONDS)) {
                    log.error("Failed to shut down executor service");
                }
            }
        }
        catch (InterruptedException e) {
            this.executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

