package com.github.dbmdz.flusswerk.framework.engine;

import com.github.dbmdz.flusswerk.framework.rabbitmq.ChannelListener;
import com.github.dbmdz.flusswerk.framework.rabbitmq.RabbitClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/dbmdz/flusswerk/framework/engine/Engine.class */
public class Engine implements ChannelListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(Engine.class);
    private final ExecutorService executorService;
    private final List<Worker> workers;
    private final List<FlusswerkConsumer> consumers;
    private final RabbitClient rabbitClient;
    private final Semaphore startOnlyOnce;

    public Engine(RabbitClient rabbitClient, List<FlusswerkConsumer> list, List<Worker> list2) {
        this(rabbitClient, list, list2, Executors.newFixedThreadPool(list2.size()));
    }

    public Engine(RabbitClient rabbitClient, List<FlusswerkConsumer> list, List<Worker> list2, ExecutorService executorService) {
        this.rabbitClient = rabbitClient;
        this.executorService = executorService;
        this.workers = list2;
        this.consumers = list;
        this.startOnlyOnce = new Semaphore(1);
    }

    public void start() {
        if (!this.startOnlyOnce.tryAcquire()) {
            LOGGER.error("Engine had already been started once. Starting again is not possible.");
            return;
        }
        LOGGER.debug("Starting worker threads");
        for (Worker worker : this.workers) {
            LOGGER.debug("Starting worker {}", worker);
            this.executorService.execute(worker);
        }
        LOGGER.debug("Starting consumers");
        Iterator<FlusswerkConsumer> it = this.consumers.iterator();
        while (it.hasNext()) {
            this.rabbitClient.consume(it.next(), false);
        }
        this.rabbitClient.addChannelListener(this);
    }

    public void stop() {
        this.consumers.forEach(flusswerkConsumer -> {
            try {
                this.rabbitClient.cancel(flusswerkConsumer.getConsumerTag());
            } catch (IOException e) {
                LOGGER.error("Could not cancel consumer", e);
            }
        });
        PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
        ArrayList arrayList = new ArrayList();
        priorityBlockingQueue.drainTo(arrayList);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            long deliveryTag = ((Task) it.next()).getMessage().getEnvelope().getDeliveryTag();
            try {
                this.rabbitClient.nack(deliveryTag, false, true);
            } catch (IOException e) {
                LOGGER.error("Could not NACK message with delivery tag {}", Long.valueOf(deliveryTag), e);
            }
        }
        this.workers.forEach((v0) -> {
            v0.stop();
        });
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(5L, TimeUnit.MINUTES)) {
                LOGGER.error("Not all workers did terminate during shutdown window of 5 minutes");
            }
        } catch (InterruptedException e2) {
            LOGGER.error("Timeout awaiting worker shutdown after 5 minutes", e2);
        }
    }

    @Override // com.github.dbmdz.flusswerk.framework.rabbitmq.ChannelListener
    public void handleReset() {
        LOGGER.debug("Register consumers again after channel reset");
        Iterator<FlusswerkConsumer> it = this.consumers.iterator();
        while (it.hasNext()) {
            this.rabbitClient.consume(it.next(), false);
        }
    }
}
