/*
 * Decompiled with CFR 0.152.
 */
package org.apache.eventmesh.http.demo.sub.service;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import javax.annotation.PreDestroy;
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.client.http.consumer.EventMeshHttpConsumer;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.http.demo.pub.eventmeshmessage.AsyncPublishInstance;
import org.apache.eventmesh.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component
public class SubService
implements InitializingBean {
    public static Logger logger = LoggerFactory.getLogger(SubService.class);
    private EventMeshHttpConsumer eventMeshHttpConsumer;
    final Properties properties = Utils.readPropertiesFile("application.properties");
    final List<SubscriptionItem> topicList = Lists.newArrayList((Object[])new SubscriptionItem[]{new SubscriptionItem("TEST-TOPIC-HTTP-ASYNC", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC)});
    final String localIp = IPUtils.getLocalAddress();
    final String localPort = this.properties.getProperty("server.port");
    final String eventMeshIp = this.properties.getProperty("eventmesh.ip");
    final String eventMeshHttpPort = this.properties.getProperty("eventmesh.http.port");
    final String url = "http://" + this.localIp + ":" + this.localPort + "/sub/test";
    final String env = "P";
    final String idc = "FT";
    final String subsys = "1234";
    private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize);

    public void afterPropertiesSet() throws Exception {
        String eventMeshIPPort = this.eventMeshIp + ":" + this.eventMeshHttpPort;
        EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder().liteEventMeshAddr(eventMeshIPPort).consumerGroup("EventMeshTest-consumerGroup").env("P").idc("FT").ip(IPUtils.getLocalAddress()).sys("1234").pid(String.valueOf(ThreadUtils.getPID())).build();
        this.eventMeshHttpConsumer = new EventMeshHttpConsumer(eventMeshClientConfig);
        this.eventMeshHttpConsumer.heartBeat(this.topicList, this.url);
        this.eventMeshHttpConsumer.subscribe(this.topicList, this.url);
        Thread stopThread = new Thread(() -> {
            try {
                this.countDownLatch.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("stopThread start....");
            System.exit(0);
        });
        stopThread.start();
    }

    @PreDestroy
    public void cleanup() {
        logger.info("start destory ....");
        try {
            ArrayList<String> unSubList = new ArrayList<String>();
            for (SubscriptionItem item : this.topicList) {
                unSubList.add(item.getTopic());
            }
            this.eventMeshHttpConsumer.unsubscribe(unSubList, this.url);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        try {
            EventMeshHttpConsumer ignore = this.eventMeshHttpConsumer;
            Iterator<SubscriptionItem> iterator = null;
            if (ignore != null) {
                if (iterator != null) {
                    try {
                        ignore.close();
                    }
                    catch (Throwable throwable) {
                        ((Throwable)((Object)iterator)).addSuppressed(throwable);
                    }
                } else {
                    ignore.close();
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        logger.info("end destory.");
    }

    public void consumeMessage(String msg) {
        logger.info("consume message: {}", (Object)msg);
        this.countDownLatch.countDown();
        logger.info("remaining number: {} of messages to be consumed", (Object)this.countDownLatch.getCount());
    }
}

