/*
 * Decompiled with CFR 0.152.
 */
package de.digitalcollections.workflow.engine;

import de.digitalcollections.workflow.engine.EngineStats;
import de.digitalcollections.workflow.engine.flow.Flow;
import de.digitalcollections.workflow.engine.messagebroker.MessageBroker;
import de.digitalcollections.workflow.engine.model.Message;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls a {@link MessageBroker} for messages and runs each one through a {@link Flow} on a
 * fixed-size thread pool.
 *
 * <p>A {@link Semaphore} with one permit per worker throttles the polling loop: a permit is
 * taken before every receive attempt and is given back either by the loop itself (nothing was
 * handed to a worker) or by the worker once it has finished processing.
 *
 * <p>{@link #start()} blocks the calling thread; {@link #stop()} is expected to be called from
 * a different thread, which is why the {@code running} flag is {@code volatile}.
 */
public class Engine {
    private static final Logger LOGGER = LoggerFactory.getLogger(Engine.class);
    private static final int DEFAULT_CONCURRENT_WORKERS = 5;
    private final int concurrentWorkers;
    private final MessageBroker messageBroker;
    private final Flow flow;
    private final ExecutorService executorService;
    private final Semaphore semaphore;
    // Written by stop() and read by the polling loop, potentially on different threads.
    private volatile boolean running;
    private final AtomicInteger activeWorkers;

    /**
     * Creates an engine with {@value #DEFAULT_CONCURRENT_WORKERS} concurrent workers.
     *
     * @param messageBroker broker to receive messages from and to send results to
     * @param flow flow applied to every received message
     * @throws IOException declared for compatibility with existing callers
     */
    public Engine(MessageBroker messageBroker, Flow flow) throws IOException {
        this(messageBroker, flow, DEFAULT_CONCURRENT_WORKERS);
    }

    /**
     * Creates an engine with the given number of concurrent workers.
     *
     * @param messageBroker broker to receive messages from and to send results to
     * @param flow flow applied to every received message
     * @param concurrentWorkers size of the worker thread pool and number of semaphore permits
     * @throws IOException declared for compatibility with existing callers
     */
    public Engine(MessageBroker messageBroker, Flow flow, int concurrentWorkers) throws IOException {
        this.messageBroker = messageBroker;
        this.flow = flow;
        this.concurrentWorkers = concurrentWorkers;
        this.executorService = Executors.newFixedThreadPool(concurrentWorkers);
        this.semaphore = new Semaphore(concurrentWorkers);
        this.activeWorkers = new AtomicInteger();
    }

    /**
     * Runs the polling loop on the calling thread until {@link #stop()} is called or the thread
     * is interrupted.
     *
     * <p>Each iteration takes a permit, asks the broker for a message and either submits it to
     * the worker pool or, when the broker returns {@code null}, sleeps for one second. The permit
     * is always returned unless a worker has taken ownership of it, so failures while receiving
     * or submitting cannot starve the loop of permits.
     */
    public void start() {
        LOGGER.debug("Starting engine...");
        this.running = true;
        while (this.running) {
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException e) {
                // No permit was obtained, so there is nothing to release.
                this.handleInterrupt(e);
                break;
            }
            // Becomes true only after a worker has accepted responsibility for the permit.
            boolean permitHandedOff = false;
            try {
                Message message = this.messageBroker.receive();
                if (message == null) {
                    LOGGER.debug("Checking for new message (available semaphores: {}) - Queue is empty", this.semaphore.availablePermits());
                    TimeUnit.SECONDS.sleep(1L);
                    continue;
                }
                LOGGER.debug("Checking for new message (available semaphores: {}), got {}", this.semaphore.availablePermits(), message.getEnvelope().getBody());
                this.executorService.execute(() -> this.runWorker(message));
                permitHandedOff = true;
            }
            catch (IOException e) {
                LOGGER.error("Got some error", e);
            }
            catch (InterruptedException e) {
                this.handleInterrupt(e);
            }
            finally {
                // Covers the empty-queue path, receive failures, interrupts and rejected submissions.
                if (!permitHandedOff) {
                    this.semaphore.release();
                }
            }
        }
    }

    /** Restores the interrupt status, logs the interruption and asks the polling loop to end. */
    private void handleInterrupt(InterruptedException e) {
        Thread.currentThread().interrupt();
        this.running = false;
        LOGGER.error("Got some error", e);
    }

    /**
     * Processes one message on a worker thread, keeping the active-worker count and the permit
     * balanced even if processing throws.
     */
    private void runWorker(Message message) {
        this.activeWorkers.incrementAndGet();
        try {
            this.process(message);
        }
        finally {
            this.activeWorkers.decrementAndGet();
            this.semaphore.release();
        }
    }

    /**
     * Runs the flow for a single message, sends the result if the flow writes data, and
     * acknowledges the message. If that fails with an {@link IOException} or
     * {@link RuntimeException}, the message is rejected instead.
     *
     * @param message the message to process
     */
    void process(Message message) {
        try {
            Message result = this.flow.process(message);
            if (this.flow.writesData()) {
                this.messageBroker.send(result);
            }
            this.messageBroker.ack(message);
        }
        catch (IOException | RuntimeException e) {
            try {
                // The trailing exception has no placeholder, so SLF4J logs it as the throwable.
                LOGGER.debug("Reject message because of processing error: {}", message.getEnvelope().getBody(), e);
                this.messageBroker.reject(message);
            }
            catch (IOException e1) {
                LOGGER.error("Could not reject message {}", message.getEnvelope().getBody(), e1);
            }
        }
    }

    /**
     * Asks the polling loop to end after its current iteration. Work that was already submitted
     * to the worker pool is not cancelled, and the pool itself is not shut down.
     */
    public void stop() {
        this.running = false;
        LOGGER.debug("Stopping engine...");
    }

    /**
     * Returns a snapshot built from the configured worker count, the number of workers
     * currently processing a message, and the number of available semaphore permits.
     *
     * @return the current engine statistics
     */
    public EngineStats getStats() {
        return new EngineStats(this.concurrentWorkers, this.activeWorkers.get(), this.semaphore.availablePermits());
    }
}

