/*
 * Decompiled with CFR 0.152.
 */
package de.dentrassi.asyncapi.jms;

import de.dentrassi.asyncapi.ListenerHandle;
import de.dentrassi.asyncapi.Message;
import de.dentrassi.asyncapi.Subscribe;
import de.dentrassi.asyncapi.jms.JmsPayloadFormat;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JmsSubscriber<T extends Message<P>, P extends Serializable>
implements Subscribe<T> {
    private static final Logger logger = LoggerFactory.getLogger(JmsSubscriber.class);
    private final Class<T> clazz;
    private final Class<P> payloadClazz;
    private final JmsPayloadFormat payloadFormat;
    private final String topic;
    private final Connection connection;
    private final Executor executor;

    public JmsSubscriber(Class<T> clazz, Class<P> payloadClazz, JmsPayloadFormat payloadFormat, String topic, Connection connection, Executor executor) {
        this.clazz = clazz;
        this.payloadClazz = payloadClazz;
        this.payloadFormat = payloadFormat;
        this.topic = topic;
        this.connection = connection;
        this.executor = executor;
    }

    public ListenerHandle subscribe(Consumer<T> consumer) {
        Objects.requireNonNull(consumer);
        HandleImpl handle = new HandleImpl(consumer);
        this.executor.execute(() -> handle.subscribe());
        return handle;
    }

    private class HandleImpl
    extends CompletableFuture<Void>
    implements ListenerHandle {
        private Session session;
        private MessageConsumer consumer;
        private final Consumer<T> handler;

        public HandleImpl(Consumer<T> consumer) {
            this.handler = consumer;
        }

        public void close() throws Exception {
            try {
                this.get();
            }
            catch (Exception exception) {
                // empty catch block
            }
            LinkedList<Exception> errors = null;
            if (this.consumer != null) {
                try {
                    this.consumer.close();
                }
                catch (Exception e) {
                    if (errors == null) {
                        errors = new LinkedList<Exception>();
                    }
                    errors.add(e);
                }
            }
            if (this.session != null) {
                try {
                    this.session.close();
                }
                catch (Exception e) {
                    if (errors == null) {
                        errors = new LinkedList();
                    }
                    errors.add(e);
                }
            }
            if (errors != null && !errors.isEmpty()) {
                Exception e = (Exception)errors.pollFirst();
                errors.stream().forEach(e::addSuppressed);
                throw e;
            }
        }

        public void subscribe() {
            try {
                this.session = JmsSubscriber.this.connection.createSession(2);
                Topic destination = this.session.createTopic(JmsSubscriber.this.topic);
                this.consumer = this.session.createConsumer((Destination)destination);
                this.consumer.setMessageListener(this::processMessage);
                this.complete(null);
            }
            catch (Exception e) {
                this.completeExceptionally(e);
            }
        }

        protected void processMessage(javax.jms.Message message) {
            logger.debug("Received message: {}", (Object)message);
            try {
                Object m = JmsSubscriber.this.payloadFormat.decode(JmsSubscriber.this.clazz, JmsSubscriber.this.payloadClazz, message);
                if (m != null) {
                    this.handler.accept(m);
                }
                message.acknowledge();
            }
            catch (Exception e) {
                logger.debug("Failed to handle message", (Throwable)e);
                try {
                    this.session.recover();
                }
                catch (JMSException jMSException) {
                    // empty catch block
                }
            }
        }
    }
}

