package com.mt.common.port.adapter.messaging;

import com.mt.common.CommonConstant;
import com.mt.common.domain.CommonDomainRegistry;
import com.mt.common.domain.model.domain_event.EventStreamService;
import com.mt.common.domain.model.domain_event.StoredEvent;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * {@link EventStreamService} implementation that publishes and consumes {@link StoredEvent}s
 * through RabbitMQ, using the exchange named by {@link CommonConstant#EXCHANGE_NAME}.
 *
 * <p>Routing keys have the shape {@code <str>.<internal|external>.<topic>}. Events are sent as the
 * UTF-8 bytes of the string produced by the custom object serializer obtained from
 * {@link CommonDomainRegistry}.
 *
 * <p>Broker failures ({@link IOException}, {@link TimeoutException}) are logged and not rethrown
 * by either method.
 */
@Component
public class RabbitMQEventStreamService implements EventStreamService {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQEventStreamService.class);

    /** Broker host; every connection made by this class targets it. */
    private static final String BROKER_HOST = "localhost";

    /**
     * Declares a durable queue, binds it to the exchange once per entry of {@code strArr} and
     * starts an auto-acknowledging consumer that hands each deserialized event to {@code consumer}.
     *
     * <p>On success the connection is intentionally left open so that the consumer keeps receiving
     * deliveries. If any setup step fails, the error is logged and the connection is aborted so it
     * does not leak.
     *
     * @param str      first segment of every routing key (presumably the publishing application's
     *                 name; confirm against callers)
     * @param z        {@code true} to bind to {@code internal} routing keys, {@code false} for
     *                 {@code external}
     * @param str2     queue name to use, or {@code null} to derive one from a generated unique id
     *                 rendered in base 36
     * @param consumer callback invoked with each event read from the queue
     * @param strArr   routing-key suffixes to bind; one binding is created per element
     */
    @Override // com.mt.common.domain.model.domain_event.EventStreamService
    public void subscribe(String str, boolean z, @Nullable String str2, Consumer<StoredEvent> consumer, String... strArr) {
        String routingKeyPrefix = routingKeyPrefix(str, z);
        String queueName = str2 != null
            ? str2
            : Long.toString(Long.valueOf(CommonDomainRegistry.getUniqueIdGeneratorService().id()), 36);
        DeliverCallback onDelivery = (consumerTag, delivery) -> {
            log.debug("mq message received");
            String payload = new String(delivery.getBody(), StandardCharsets.UTF_8);
            consumer.accept((StoredEvent) CommonDomainRegistry.getCustomObjectSerializer().deserialize(payload, StoredEvent.class));
            log.debug("mq message consumed");
        };
        Connection connection = null;
        try {
            connection = newConnectionFactory().newConnection();
            Channel channel = connection.createChannel();
            // durable = true, exclusive = false, autoDelete = false, no extra arguments
            channel.queueDeclare(queueName, true, false, false, null);
            for (String topic : strArr) {
                channel.queueBind(queueName, CommonConstant.EXCHANGE_NAME, routingKeyPrefix + topic);
            }
            // autoAck = true; consumer cancellation is deliberately ignored
            channel.basicConsume(queueName, true, onDelivery, consumerTag -> {
            });
        } catch (IOException | TimeoutException e) {
            log.error("unable create queue for {} with routing key {} and queue name {}", str, routingKeyPrefix, queueName, e);
            if (connection != null) {
                // Setup failed part-way: release the connection (and its channels) instead of
                // leaking it. abort() discards any errors raised while closing.
                connection.abort();
            }
        }
    }

    /**
     * Publishes {@code storedEvent} to the topic exchange under the routing key
     * {@code <str>.<internal|external>.<str2>}.
     *
     * <p>A new connection and channel are opened for the call and are always closed before
     * returning, whether or not publishing succeeded. Broker failures are logged, not rethrown.
     *
     * @param str         first segment of the routing key (presumably the publishing application's
     *                    name; confirm against callers)
     * @param z           {@code true} to publish under {@code internal}, {@code false} for
     *                    {@code external}
     * @param str2        last segment of the routing key
     * @param storedEvent event to serialize and publish
     */
    @Override // com.mt.common.domain.model.domain_event.EventStreamService
    public void next(String str, boolean z, String str2, StoredEvent storedEvent) {
        String routingKey = routingKeyPrefix(str, z) + str2;
        try (Connection connection = newConnectionFactory().newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(CommonConstant.EXCHANGE_NAME, "topic");
            byte[] body = CommonDomainRegistry.getCustomObjectSerializer().serialize(storedEvent).getBytes(StandardCharsets.UTF_8);
            channel.basicPublish(CommonConstant.EXCHANGE_NAME, routingKey, null, body);
        } catch (IOException | TimeoutException e) {
            log.error("unable to publish message to rabbitmq", e);
        }
    }

    /** Builds the {@code <name>.<internal|external>.} prefix shared by all routing keys. */
    private static String routingKeyPrefix(String name, boolean internal) {
        return name + "." + (internal ? "internal" : "external") + ".";
    }

    /** Creates a connection factory pointing at {@link #BROKER_HOST}. */
    private static ConnectionFactory newConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(BROKER_HOST);
        return factory;
    }
}
