/*
 * Decompiled with CFR 0.152.
 */
package cn.herodotus.stirrup.transform.emqx.config;

import cn.herodotus.stirrup.message.mqtt.annotation.ConditionalOnMqttEnabled;
import cn.herodotus.stirrup.message.mqtt.config.MessageMqttConfiguration;
import cn.herodotus.stirrup.transform.emqx.aggregator.MonitorAggregatingMessageCorrelationIdFactory;
import cn.herodotus.stirrup.transform.emqx.converter.EmqxStatsIndicatorConverter;
import cn.herodotus.stirrup.transform.emqx.outbound.Influxdb2StoringMessageHandler;
import cn.herodotus.stirrup.transform.emqx.outbound.MonitorAggregatingMessageHandler;
import cn.herodotus.stirrup.tsdb.influxdb2.annotation.ConditionalOnInfluxdb2Enabled;
import cn.herodotus.stirrup.tsdb.influxdb2.configuration.TsdbInfluxdb2Configuration;
import cn.herodotus.stirrup.tsdb.influxdb2.pool.Influxdb2ClientObjectPool;
import jakarta.annotation.PostConstruct;
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Spring configuration that wires an MQTT v5 inbound adapter, subscribed to the
 * {@code $SYS/#} topic filter, into an integration flow made of an aggregating
 * handler followed by an InfluxDB 2 storing handler.
 *
 * <p>The configuration is only considered when both the InfluxDB 2 and the MQTT
 * conditions match, and it imports {@link MessageMqttConfiguration} and
 * {@link TsdbInfluxdb2Configuration}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnInfluxdb2Enabled
@ConditionalOnMqttEnabled
@Import({MessageMqttConfiguration.class, TsdbInfluxdb2Configuration.class})
public class EmqxSystemTopicToInfluxdb2FlowConfiguration {

    private static final Logger log = LoggerFactory.getLogger(EmqxSystemTopicToInfluxdb2FlowConfiguration.class);

    /** Topic filters the inbound adapter is created with. */
    private static final String[] EMQX_MONITOR_TOPICS = {"$SYS/#"};

    /**
     * Emits a debug line once this configuration instance has been constructed.
     */
    @PostConstruct
    public void postConstruct() {
        log.debug("[Herodotus] |- SDK [Emqx System Topic To Influxdb2 Flow] Auto Configure.");
    }

    /**
     * Creates the publish-subscribe channel registered under the bean name
     * {@code emqxDefaultMonitorMqttInboundChannel}.
     *
     * @return a new publish-subscribe message channel
     */
    @Bean("emqxDefaultMonitorMqttInboundChannel")
    public MessageChannel emqxMonitorInboundChannel() {
        MessageChannel inboundChannel = MessageChannels.publishSubscribe().getObject();
        return inboundChannel;
    }

    /**
     * Creates the correlation id factory that is handed to the aggregating handler
     * of {@link #emqxSysTopicToInfluxdbFlow}.
     *
     * @return a new {@link MonitorAggregatingMessageCorrelationIdFactory}
     */
    @Bean
    public MonitorAggregatingMessageCorrelationIdFactory monitorAggregatingMessageCorrelationIdFactory() {
        // Instantiate first so the trace line is only written after successful construction.
        MonitorAggregatingMessageCorrelationIdFactory correlationIdFactory =
                new MonitorAggregatingMessageCorrelationIdFactory();
        log.trace("[Herodotus] |- Bean [Monitor Aggregating Message Correlation Id Factory] Auto Configure.");
        return correlationIdFactory;
    }

    /**
     * Builds the integration flow that starts at an MQTT v5 message-driven adapter
     * and passes messages through a {@link MonitorAggregatingMessageHandler} and
     * then an {@link Influxdb2StoringMessageHandler}.
     *
     * @param clientManager the MQTT client manager the inbound adapter is created with
     * @param messageChannel the channel qualified as
     *        {@code emqxDefaultMonitorMqttInboundChannel}; set as the adapter's output channel
     * @param monitorAggregatingMessageCorrelationIdFactory factory passed to the aggregating handler
     * @param influxdb2ClientObjectPool client pool passed to the storing handler
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow emqxSysTopicToInfluxdbFlow(
            ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager,
            @Qualifier("emqxDefaultMonitorMqttInboundChannel") MessageChannel messageChannel,
            MonitorAggregatingMessageCorrelationIdFactory monitorAggregatingMessageCorrelationIdFactory,
            Influxdb2ClientObjectPool influxdb2ClientObjectPool) {
        // Inbound side: String payloads, no manual acknowledgements, output to the supplied channel.
        Mqttv5PahoMessageDrivenChannelAdapter inboundAdapter =
                new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, EMQX_MONITOR_TOPICS);
        inboundAdapter.setPayloadType(String.class);
        inboundAdapter.setManualAcks(false);
        inboundAdapter.setOutputChannel(messageChannel);

        IntegrationFlowBuilder flow = IntegrationFlow.from(inboundAdapter);

        // First stage: aggregating handler driven by the correlation id factory.
        MessageHandler aggregatingHandler =
                new MonitorAggregatingMessageHandler(monitorAggregatingMessageCorrelationIdFactory);
        flow = flow.handle(aggregatingHandler);

        // Second stage: storing handler backed by the InfluxDB 2 client pool and a stats converter.
        MessageHandler storingHandler =
                new Influxdb2StoringMessageHandler(influxdb2ClientObjectPool, new EmqxStatsIndicatorConverter());
        flow = flow.handle(storingHandler);

        return flow.get();
    }
}

