package ru.tinkoff.kora.jms;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.jms.telemetry.JmsConsumerTelemetry;
import ru.tinkoff.kora.jms.telemetry.JmsConsumerTelemetryFactory;
import ru.tinkoff.kora.logging.common.arg.StructuredArgument;

/* loaded from: input_file:ru/tinkoff/kora/jms/JmsMessageListenerContainer.class */
/**
 * Consumes messages from a single JMS queue on a fixed pool of worker threads.
 *
 * <p>{@link #init()} starts {@code config.threads()} workers. Each worker opens its own
 * connection and transacted session, polls the queue named by {@code config.queueName()} and
 * passes every received message to the configured {@link JmsMessageListener}. The session is
 * committed after the listener returns normally and rolled back when processing fails.
 * {@link #release()} stops the workers.
 */
public class JmsMessageListenerContainer implements Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(JmsMessageListenerContainer.class);

    /** Per-queue counters used to give every worker thread a unique name suffix. */
    private static final ConcurrentHashMap<String, AtomicInteger> threadCounters = new ConcurrentHashMap<>();

    /** Pause before a worker opens a new connection after a failure. */
    private static final long RECONNECT_DELAY_MILLIS = 60_000L;
    /** Pause after a poll that returned no message. */
    private static final long IDLE_POLL_DELAY_MILLIS = 1_000L;
    /** How long {@link #release()} waits for the workers to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    private final ConnectionFactory connectionFactory;
    private final JmsListenerContainerConfig config;
    private final JmsMessageListener messageListener;
    private final JmsConsumerTelemetry telemetry;
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private volatile ExecutorService executorService;

    /**
     * @param connectionFactory factory used by every worker to open its own connection
     * @param jmsListenerContainerConfig supplies the queue name, the worker count and the telemetry settings
     * @param jmsMessageListener callback invoked for every received message
     * @param jmsConsumerTelemetryFactory creates the telemetry used to track each message
     */
    public JmsMessageListenerContainer(ConnectionFactory connectionFactory, JmsListenerContainerConfig jmsListenerContainerConfig, JmsMessageListener jmsMessageListener, JmsConsumerTelemetryFactory jmsConsumerTelemetryFactory) {
        this.connectionFactory = connectionFactory;
        this.config = jmsListenerContainerConfig;
        this.messageListener = jmsMessageListener;
        this.telemetry = jmsConsumerTelemetryFactory.get(jmsListenerContainerConfig.telemetry(), jmsListenerContainerConfig.queueName());
    }

    /**
     * Marks the container as started and launches the worker threads. Does nothing when the
     * container is already started; starts no workers when the configured thread count is zero.
     */
    public void init() {
        if (!this.isStarted.compareAndSet(false, true)) {
            return;
        }
        int threads = this.config.threads();
        if (threads == 0) {
            return;
        }
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        this.executorService = executor;
        for (int i = 0; i < threads; i++) {
            executor.submit(this::connectLoop);
        }
    }

    /**
     * Marks the container as stopped, shuts the worker pool down and waits up to
     * {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds for the workers to finish. Does nothing when the
     * container is not started.
     */
    public void release() {
        if (!this.isStarted.compareAndSet(true, false) || this.config.threads() == 0) {
            return;
        }
        ExecutorService executor = this.executorService;
        this.executorService = null;
        if (executor == null) {
            // init() flipped the flag but never managed to create the pool.
            return;
        }
        executor.shutdownNow();
        try {
            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.warn("Jms listener threads for queue {} did not stop within {} seconds", this.config.queueName(), SHUTDOWN_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible to whoever is driving the shutdown.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Worker body: while the container is started, opens a connection and a transacted session,
     * runs {@link #pollLoop(Session)} on it and, after a failure, pauses before reconnecting.
     */
    private void connectLoop() {
        String queueName = this.config.queueName();
        int threadIndex = threadCounters.computeIfAbsent(queueName, name -> new AtomicInteger()).getAndIncrement();
        Thread.currentThread().setName("jms-" + queueName + "-" + threadIndex);
        log.info("listening...");
        while (this.isStarted.get()) {
            try {
                log.trace("Trying new connection");
                // Resources are closed in reverse order: session first, then connection.
                try (Connection connection = this.connectionFactory.createConnection();
                     Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
                    connection.start();
                    pollLoop(session);
                }
            } catch (JMSException e) {
                log.info("Jms exception caught while processing message: {}", e.toString(), e);
                pauseBeforeReconnect();
            } catch (Exception e) {
                log.warn("Unexpected exception in jms listener loop", e);
                pauseBeforeReconnect();
            }
        }
        log.info("Consumer stopped");
    }

    /** Sleeps for the reconnect delay; an interrupt merely ends the pause early. */
    private void pauseBeforeReconnect() {
        try {
            Thread.sleep(RECONNECT_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // The interrupt flag is deliberately not restored: the caller re-checks isStarted,
            // and leaving the flag set could disturb any later blocking JMS call on this thread.
            log.trace("Jms thread interrupted");
        }
    }

    /**
     * Polls the configured queue on the given session until the container is stopped.
     *
     * @param session transacted session used to create the consumer, commit and roll back
     * @throws JMSException when a JMS call fails; the session is rolled back first (a rollback
     *     failure is attached as suppressed) so that the caller can drop the connection and reconnect
     */
    private void pollLoop(Session session) throws JMSException {
        try (MessageConsumer consumer = session.createConsumer(session.createQueue(this.config.queueName()), (String) null)) {
            while (this.isStarted.get()) {
                try {
                    Message message = consumer.receiveNoWait();
                    if (message != null) {
                        handleMessage(session, message);
                    } else {
                        log.trace("No message was received");
                        Thread.sleep(IDLE_POLL_DELAY_MILLIS);
                        session.commit();
                    }
                } catch (JMSException e) {
                    try {
                        session.rollback();
                    } catch (JMSException rollbackFailure) {
                        e.addSuppressed(rollbackFailure);
                    }
                    throw e;
                } catch (InterruptedException e) {
                    // Flag intentionally left cleared so the consumer, session and connection
                    // can be closed without a pending interrupt; the loop re-checks isStarted.
                    log.trace("Jms thread interrupted");
                } catch (Exception e) {
                    log.debug("Exception caught while processing message", e);
                    session.rollback();
                }
            }
            log.trace("Poll loop end");
        }
    }

    /**
     * Passes one message to the listener and commits the session, reporting the outcome to
     * telemetry. {@code Context} and {@code MDC} are cleared afterwards in every case.
     *
     * @param session session the message was received on
     * @param message the received message
     * @throws Exception whatever the listener, the commit or the telemetry raised, after it has
     *     been reported to the telemetry context
     */
    private void handleMessage(Session session, Message message) throws Exception {
        JmsConsumerTelemetry.JmsConsumerTelemetryContext telemetryContext = this.telemetry.get(message);
        try {
            try {
                if (log.isDebugEnabled()) {
                    String body = JmsUtils.text(message);
                    String headers = JmsUtils.dumpHeaders(message).toString();
                    log.debug(StructuredArgument.marker("jmsInputMessage", generator -> {
                        generator.writeStartObject();
                        generator.writeStringField("headers", headers);
                        generator.writeStringField("body", body);
                        generator.writeEndObject();
                    }), "JmsListener.message");
                }
                this.messageListener.onMessage(session, message);
                session.commit();
                telemetryContext.close(null);
            } finally {
                Context.clear();
                MDC.clear();
            }
        } catch (Exception e) {
            telemetryContext.close(e);
            throw e;
        }
    }
}
