/*
 * Decompiled with CFR 0.152.
 */
package de.dentrassi.asyncapi.jms;

import de.dentrassi.asyncapi.Message;
import de.dentrassi.asyncapi.Publish;
import de.dentrassi.asyncapi.jms.JmsPayloadFormat;
import de.dentrassi.asyncapi.jms.JmsSubscriber;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

public abstract class AbstractJmsServiceImpl {
    private final Connection connection;
    private final Executor executor;
    private final JmsPayloadFormat payloadFormat;
    private final Function<String, String> topicMapper;

    public AbstractJmsServiceImpl(Connection connection, Executor executor, JmsPayloadFormat payloadFormat, String baseTopic) {
        this.connection = connection;
        this.executor = executor;
        this.payloadFormat = payloadFormat;
        this.topicMapper = baseTopic == null || baseTopic.isEmpty() ? topic -> topic : topic -> baseTopic + "." + topic;
    }

    protected <T extends Message<?>> Publish<T> createPublisher(final String localTopicName) {
        return new Publish<T>(){

            public CompletionStage<?> publish(T message) {
                return AbstractJmsServiceImpl.this.publishMessage(AbstractJmsServiceImpl.this.fullTopic(localTopicName), (Message<?>)message);
            }
        };
    }

    protected CompletionStage<?> publishMessage(String topic, Message<?> message) {
        CompletableFuture future = new CompletableFuture();
        this.executor.execute(() -> {
            try {
                this.processPublishMessage(topic, message);
                future.complete(null);
            }
            catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    protected void processPublishMessage(String topic, Message<?> message) throws Exception {
        try (Session session = this.connection.createSession();){
            Topic destination = session.createTopic(topic);
            javax.jms.Message jmsMessage = this.payloadFormat.encode(session, message);
            try (MessageProducer producer = session.createProducer((Destination)destination);){
                producer.send(jmsMessage);
            }
        }
    }

    protected <M extends Message<P>, P extends Serializable> JmsSubscriber<M, P> createSubscriber(String localTopicName, Class<M> clazz, Class<P> payloadClazz) {
        return new JmsSubscriber<M, P>(clazz, payloadClazz, this.payloadFormat, this.fullTopic(localTopicName), this.connection, this.executor);
    }

    protected String fullTopic(String topic) {
        return this.topicMapper.apply(topic);
    }
}

