package com.github.dbmdz.flusswerk.framework.engine;

import com.github.dbmdz.flusswerk.framework.exceptions.SkipProcessingException;
import com.github.dbmdz.flusswerk.framework.exceptions.StopProcessingException;
import com.github.dbmdz.flusswerk.framework.flow.Flow;
import com.github.dbmdz.flusswerk.framework.model.Message;
import com.github.dbmdz.flusswerk.framework.monitoring.FlusswerkMetrics;
import com.github.dbmdz.flusswerk.framework.rabbitmq.MessageBroker;
import com.github.dbmdz.flusswerk.framework.reporting.ProcessReport;
import com.github.dbmdz.flusswerk.framework.reporting.Tracing;
import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/github/dbmdz/flusswerk/framework/engine/Worker.class */
/**
 * Takes {@link Task}s from a shared priority queue and runs the message of each task through the
 * configured {@link Flow}.
 *
 * <p>Depending on the outcome of the flow, the incoming message is acknowledged, rejected or
 * failed via the {@link MessageBroker}, and the outcome is passed to the {@link ProcessReport}.
 *
 * <p>The worker loops until {@link #stop()} is called or the executing thread is interrupted.
 */
public class Worker implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);
    private final Flow flow;
    private final FlusswerkMetrics metrics;
    private final MessageBroker messageBroker;
    private final ProcessReport processReport;
    private final PriorityBlockingQueue<Task> queue;
    private final Tracing tracing;

    // volatile: stop() may be invoked from a different thread than the one executing run(),
    // and the loop condition must observe that write.
    private volatile boolean running = true;

    /**
     * Creates a worker.
     *
     * @param flow the flow every message is processed with
     * @param flusswerkMetrics receives the active-worker increments/decrements
     * @param messageBroker used to send outgoing messages and to ack, reject or fail the incoming one
     * @param processReport receives the outcome of every processed message
     * @param priorityBlockingQueue the queue tasks are polled from
     * @param tracing holds the tracing information while a message is being processed
     */
    public Worker(Flow flow, FlusswerkMetrics flusswerkMetrics, MessageBroker messageBroker, ProcessReport processReport, PriorityBlockingQueue<Task> priorityBlockingQueue, Tracing tracing) {
        this.flow = flow;
        this.messageBroker = messageBroker;
        this.metrics = flusswerkMetrics;
        this.processReport = processReport;
        this.queue = priorityBlockingQueue;
        this.tracing = tracing;
    }

    /** Repeats {@link #step()} until the worker is stopped or the current thread is interrupted. */
    @Override
    public void run() {
        // The interrupt check keeps the loop from spinning once step() has restored the flag.
        while (this.running && !Thread.currentThread().isInterrupted()) {
            step();
        }
    }

    /**
     * Waits up to one second for a task; if one arrives, processes its message and marks the task
     * as done. Returns without doing anything if no task arrived in time.
     */
    void step() {
        try {
            Task task = this.queue.poll(1L, TimeUnit.SECONDS);
            if (task == null) {
                return;
            }
            executeProcessing(task.getMessage());
            task.done();
        } catch (InterruptedException e) {
            LOGGER.debug("Interrupt while waiting for message", e);
            // Restore the flag so run() and any caller further up can react to the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Processes one message while it is counted as an active worker and its tracing information is
     * registered. Both are undone even if processing throws.
     *
     * @param message the message to process
     */
    void executeProcessing(Message message) {
        this.metrics.incrementActiveWorkers();
        try {
            this.tracing.register(message.getTracing());
            try {
                process(message);
            } finally {
                this.tracing.deregister();
            }
        } finally {
            this.metrics.decrementActiveWorkers();
        }
    }

    /**
     * Runs the flow for {@code message} and handles the result.
     *
     * <ul>
     *   <li>Normal return: the returned messages are sent, the message is acked and success is
     *       reported.
     *   <li>{@link SkipProcessingException}: the messages carried by the exception are sent (those
     *       without tracing information get the current tracing path), the message is acked and
     *       the skip is reported.
     *   <li>{@link StopProcessingException}: the message is failed.
     *   <li>Any other {@link RuntimeException}: the message is rejected.
     * </ul>
     *
     * <p>If sending, acking or reporting throws, the message is failed instead.
     *
     * @param message the incoming message
     */
    public void process(Message message) {
        Collection<? extends Message> outgoingMessages;
        SkipProcessingException skipProcessingException = null;
        try {
            outgoingMessages = this.flow.process(message);
            MDC.put("status", "success");
        } catch (SkipProcessingException e) {
            outgoingMessages = e.getOutgoingMessages();
            // Only messages that do not carry their own tracing inherit the current path.
            outgoingMessages.stream()
                    .filter(Objects::nonNull)
                    .filter(outgoing -> outgoing.getTracing() == null || outgoing.getTracing().isEmpty())
                    .forEach(outgoing -> outgoing.setTracing(this.tracing.tracingPath()));
            skipProcessingException = e;
            MDC.put("status", "skip");
            MDC.put("skipReason", e.getMessage());
        } catch (StopProcessingException e) {
            MDC.put("status", "stop");
            fail(message, e);
            return;
        } catch (RuntimeException e) {
            MDC.put("status", "retry");
            retryOrFail(message, e);
            return;
        }
        try {
            if (!outgoingMessages.isEmpty()) {
                this.messageBroker.send(outgoingMessages);
            }
            this.messageBroker.ack(message);
            if (skipProcessingException != null) {
                this.processReport.reportSkip(message, skipProcessingException);
            } else {
                this.processReport.reportSuccess(message);
            }
        } catch (Exception e) {
            fail(message, new StopProcessingException("Could not finish message handling").causedBy(e));
        }
    }

    /**
     * Rejects the message at the broker and reports a reject if {@code reject} returned
     * {@code true}, otherwise reports a failure after max retries.
     *
     * @param message the message that could not be processed
     * @param runtimeException the exception that caused the rejection
     */
    private void retryOrFail(Message message, RuntimeException runtimeException) {
        try {
            if (this.messageBroker.reject(message)) {
                this.processReport.reportReject(message, runtimeException);
            } else {
                this.processReport.reportFailAfterMaxRetries(message, runtimeException);
            }
        } catch (IOException e) {
            LOGGER.error("Could not reject message {}", message.getEnvelope().getBody(), e);
        }
    }

    /**
     * Reports the failure and then fails the message at the broker.
     *
     * @param message the message that failed
     * @param stopProcessingException the exception describing the failure
     */
    private void fail(Message message, StopProcessingException stopProcessingException) {
        try {
            this.processReport.reportFail(message, stopProcessingException);
            this.messageBroker.fail(message);
        } catch (IOException e) {
            LOGGER.error("Could not fail message {}", message.getEnvelope().getBody(), e);
        }
    }

    /** Asks the worker to leave its loop; a step that is already running is not aborted. */
    public void stop() {
        this.running = false;
        LOGGER.debug("Stopping engine...");
    }
}
