package com.github.dbmdz.flusswerk.framework.engine;

import com.github.dbmdz.flusswerk.framework.jackson.FlusswerkObjectMapper;
import com.github.dbmdz.flusswerk.framework.model.IncomingMessageType;
import com.github.dbmdz.flusswerk.framework.model.Message;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import net.logstash.logback.argument.StructuredArguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/dbmdz/flusswerk/framework/engine/FlusswerkConsumer.class */
public class FlusswerkConsumer extends DefaultConsumer {
    public static final FlusswerkObjectMapper FALLBACK_MAPPER = new FlusswerkObjectMapper(new IncomingMessageType());
    private static final Logger LOGGER = LoggerFactory.getLogger(FlusswerkConsumer.class);
    private final Semaphore availableWorkers;
    private final Channel channel;
    private final FlusswerkObjectMapper flusswerkObjectMapper;
    private final PriorityBlockingQueue<Task> taskQueue;
    private final int priority;
    private final String inputQueue;

    public FlusswerkConsumer(Semaphore semaphore, Channel channel, FlusswerkObjectMapper flusswerkObjectMapper, String str, int i, PriorityBlockingQueue<Task> priorityBlockingQueue) {
        super(channel);
        this.availableWorkers = semaphore;
        this.channel = channel;
        this.flusswerkObjectMapper = flusswerkObjectMapper;
        this.inputQueue = str;
        this.priority = i;
        this.taskQueue = priorityBlockingQueue;
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        try {
            this.availableWorkers.acquire();
            String str2 = new String(bArr, StandardCharsets.UTF_8);
            try {
                Message deserialize = this.flusswerkObjectMapper.deserialize(str2);
                deserialize.getEnvelope().setSource(this.inputQueue);
                deserialize.getEnvelope().setDeliveryTag(envelope.getDeliveryTag());
                PriorityBlockingQueue<Task> priorityBlockingQueue = this.taskQueue;
                int i = this.priority;
                Semaphore semaphore = this.availableWorkers;
                Objects.requireNonNull(semaphore);
                priorityBlockingQueue.put(new Task(deserialize, i, semaphore::release));
            } catch (Exception e) {
                List<String> list = null;
                try {
                    list = FALLBACK_MAPPER.deserialize(str2).getTracing();
                } catch (Exception e2) {
                    LOGGER.error("Deserialize message fallback failed, too", e2);
                }
                if (list != null) {
                    LOGGER.error("Could not deserialize message", StructuredArguments.kv("tracing", list), e);
                }
                this.channel.basicAck(envelope.getDeliveryTag(), false);
                this.availableWorkers.release();
            }
        } catch (InterruptedException e3) {
            LOGGER.warn("FlusswerkConsumer interrupted while waiting for free worker", e3);
            this.channel.basicReject(envelope.getDeliveryTag(), true);
        }
    }

    public String getInputQueue() {
        return this.inputQueue;
    }
}
