package org.apache.inlong.sdk.sort.manager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.api.TopicFetcher;
import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
import org.apache.inlong.sdk.sort.api.TopicManager;
import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
import org.apache.inlong.sdk.sort.util.PeriodicTask;
import org.apache.inlong.sdk.sort.util.StringUtil;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.class */
/**
 * {@link TopicManager} that keeps one group of multi-topic fetchers per message-queue cluster.
 *
 * <p>A background {@link PeriodicTask} queries the current {@link ConsumeConfig} for the sort task
 * and hands the resulting topic list to {@link #handleUpdatedConsumeConfig(List)}, which groups the
 * topics by type (Kafka / Pulsar / Tube) and cluster id. For a cluster that already has fetchers the
 * fetchers are told about the new topic list; otherwise new fetchers are created and registered.
 *
 * <p>{@link #addTopic(InLongTopic)} and {@link #removeTopic(InLongTopic, boolean)} are no-ops in this
 * implementation and always return {@code null}.
 */
public class InlongMultiTopicManager extends TopicManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongMultiTopicManager.class);

    /** Fetchers grouped by cluster id, one map per message-queue type. */
    private final Map<String, List<TopicFetcher>> pulsarFetchers;
    private final Map<String, List<TopicFetcher>> kafkaFetchers;
    private final Map<String, List<TopicFetcher>> tubeFetchers;

    /** Every fetcher created by this manager, keyed by {@link TopicFetcher#getFetchKey()}. */
    private final Map<String, TopicFetcher> allFetchers;

    /**
     * Topic names from the most recent non-empty config update. The reference is replaced wholesale
     * on each update; volatile so readers of {@link #getManagedInLongTopics()} see the latest set.
     */
    private volatile Set<String> allTopics;

    private final PeriodicTask updateMetaDataWorker;

    /**
     * Set while {@link #offlineAllTopicsAndPartitions()} is running so the periodic update skips its
     * work. Volatile because it is written there and read in {@code doWork()}
     * (NOTE(review): presumably on the PeriodicTask's own thread - confirm).
     */
    private volatile boolean stopAssign;

    /** Number of fetchers created per cluster (per topic for Tube), taken from the client config. */
    private final int consumerSize;

    /**
     * Periodic job that pulls the current consume config and applies it to the enclosing manager.
     */
    private class UpdateMetaDataThread extends PeriodicTask {
        public UpdateMetaDataThread(long j, TimeUnit timeUnit) {
            super(j, timeUnit, InlongMultiTopicManager.this.context.getConfig());
        }

        /**
         * Queries the consume config for the sort task and forwards its topics to
         * {@link InlongMultiTopicManager#handleUpdatedConsumeConfig(List)}. Does nothing while
         * assignment is stopped or when no {@link QueryConsumeConfig} is available. Request count,
         * request latency and failures (a {@code null} config) are reported to the state counter.
         */
        @Override // org.apache.inlong.sdk.sort.util.PeriodicTask
        protected void doWork() {
            this.logger.debug("InLongTopicManagerImpl doWork");
            if (InlongMultiTopicManager.this.stopAssign) {
                this.logger.warn("assign is stopped");
                return;
            }
            if (InlongMultiTopicManager.this.queryConsumeConfig == null) {
                this.logger.error("subscribedMetaDataInfo is null");
                return;
            }
            long startMillis = System.currentTimeMillis();
            InlongMultiTopicManager.this.context.getDefaultStateCounter().addRequestManagerTimes(1L);
            ConsumeConfig consumeConfig = InlongMultiTopicManager.this.queryConsumeConfig
                    .queryCurrentConsumeConfig(InlongMultiTopicManager.this.context.getConfig().getSortTaskId());
            InlongMultiTopicManager.this.context.getDefaultStateCounter()
                    .addRequestManagerTimeCost(System.currentTimeMillis() - startMillis);
            if (consumeConfig != null) {
                InlongMultiTopicManager.this.handleUpdatedConsumeConfig(consumeConfig.getTopics());
            } else {
                this.logger.warn("subscribedInfo is null");
                InlongMultiTopicManager.this.context.getDefaultStateCounter().addRequestManagerFailTimes(1L);
            }
        }
    }

    /**
     * Creates the manager and immediately starts the periodic metadata update task.
     *
     * @param clientContext      client context supplying the configuration (max consumer size,
     *                           update interval, sort task id) and the state counter
     * @param queryConsumeConfig source of the current consume config
     */
    public InlongMultiTopicManager(ClientContext clientContext, QueryConsumeConfig queryConsumeConfig) {
        super(clientContext, queryConsumeConfig);
        this.pulsarFetchers = new ConcurrentHashMap<>();
        this.kafkaFetchers = new ConcurrentHashMap<>();
        this.tubeFetchers = new ConcurrentHashMap<>();
        this.allFetchers = new ConcurrentHashMap<>();
        this.allTopics = new HashSet<>();
        this.stopAssign = false;
        this.consumerSize = clientContext.getConfig().getMaxConsumerSize();
        this.updateMetaDataWorker = new UpdateMetaDataThread(
                clientContext.getConfig().getUpdateMetaDataIntervalSec(), TimeUnit.SECONDS);
        // Start the worker last: every field it reads must already be assigned.
        this.updateMetaDataWorker.start("sortsdk_multi_topic_manager_" + clientContext.getConfig().getSortTaskId()
                + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
        LOGGER.info("create InlongMultiTopicManager success");
    }

    /**
     * Stops the periodic update task, then closes and forgets every fetcher.
     *
     * @return always {@code true}
     */
    @Override // org.apache.inlong.sdk.sort.api.Cleanable
    public boolean clean() {
        LOGGER.info("start clean {}", this.context.getConfig().getSortTaskId());
        close();
        offlineAllTopicsAndPartitions();
        LOGGER.info("end clean {}", this.context.getConfig().getSortTaskId());
        return true;
    }

    /**
     * Not implemented by this manager.
     *
     * @return always {@code null}
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher addTopic(InLongTopic inLongTopic) {
        return null;
    }

    /**
     * Not implemented by this manager.
     *
     * @return always {@code null}
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher removeTopic(InLongTopic inLongTopic, boolean z) {
        return null;
    }

    /**
     * Looks up a fetcher by its fetch key.
     *
     * @param str fetch key, as returned by {@link TopicFetcher#getFetchKey()}
     * @return the registered fetcher, or {@code null} if none is registered under that key
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher getFetcher(String str) {
        return this.allFetchers.get(str);
    }

    /**
     * @return a live view of all registered fetchers (backed by the internal map)
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public Collection<TopicFetcher> getAllFetchers() {
        return this.allFetchers.values();
    }

    /**
     * @return the topic names from the most recent non-empty config update
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public Set<String> getManagedInLongTopics() {
        return this.allTopics;
    }

    /**
     * Closes every registered fetcher and clears all fetcher maps.
     *
     * <p>Assignment is suspended while fetchers are being closed and re-enabled afterwards. A failure
     * to close one fetcher is logged and does not prevent the remaining fetchers from being closed.
     * The maps are cleared exactly once, in the {@code finally} block, whether or not an exception
     * occurred.
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public void offlineAllTopicsAndPartitions() {
        String sortTaskId = this.context.getConfig().getSortTaskId();
        try {
            LOGGER.info("start offline {}", sortTaskId);
            this.stopAssign = true;
            for (Map.Entry<String, TopicFetcher> entry : this.allFetchers.entrySet()) {
                TopicFetcher fetcher = entry.getValue();
                boolean closed = false;
                if (fetcher != null) {
                    try {
                        closed = fetcher.close();
                    } catch (Exception e) {
                        LOGGER.error("got exception when close fetcher={}", fetcher.getTopics(), e);
                    }
                }
                LOGGER.info("close fetcher={} {}", entry.getKey(), closed);
            }
        } catch (Exception e) {
            // Cleanup is deliberately left to the finally block so it runs (and logs) only once.
            LOGGER.error("got exception when offline topics and partitions, ", e);
        } finally {
            this.allFetchers.clear();
            this.kafkaFetchers.clear();
            this.pulsarFetchers.clear();
            this.tubeFetchers.clear();
            this.stopAssign = false;
            LOGGER.info("close finished {}", sortTaskId);
        }
    }

    /**
     * Stops the periodic metadata update task. Fetchers are left untouched.
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public void close() {
        if (this.updateMetaDataWorker != null) {
            this.updateMetaDataWorker.stop();
        }
    }

    /**
     * Applies a freshly queried topic assignment.
     *
     * <p>Replaces the managed topic-name set, then for each topic type (Kafka, Pulsar, Tube, in that
     * order) groups the matching topics by cluster id and updates or creates that cluster's fetchers.
     * A {@code null} or empty list is ignored, leaving the current state as is.
     *
     * @param list topics currently assigned to this sort task
     */
    public void handleUpdatedConsumeConfig(List<InLongTopic> list) {
        if (CollectionUtils.isEmpty(list)) {
            LOGGER.warn("assignedTopics is null or empty, do nothing");
            return;
        }
        this.allTopics = list.stream()
                .map(InLongTopic::getTopic)
                .collect(Collectors.toSet());
        groupByCluster(list, InlongTopicTypeEnum.KAFKA).forEach(this::updateKafkaFetcher);
        groupByCluster(list, InlongTopicTypeEnum.PULSAR).forEach(this::updatePulsarFetcher);
        groupByCluster(list, InlongTopicTypeEnum.TUBE).forEach(this::updateTubeFetcher);
    }

    /** Selects the topics of the given type (case-insensitive match on type name), keyed by cluster id. */
    private static Map<String, List<InLongTopic>> groupByCluster(List<InLongTopic> topics,
            InlongTopicTypeEnum type) {
        return topics.stream()
                .filter(topic -> type.getName().equalsIgnoreCase(topic.getTopicType()))
                .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId()));
    }

    /** Pushes the topic list to already-existing fetchers; returns false if there are none yet. */
    private static boolean refreshExisting(List<TopicFetcher> fetchers, List<InLongTopic> topics) {
        if (CollectionUtils.isEmpty(fetchers)) {
            return false;
        }
        fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
        return true;
    }

    /** Adds a newly created fetcher to its cluster list and to the global fetch-key index. */
    private void register(List<TopicFetcher> clusterFetchers, TopicFetcher fetcher) {
        clusterFetchers.add(fetcher);
        this.allFetchers.put(fetcher.getFetchKey(), fetcher);
    }

    /**
     * Updates the Kafka fetchers of one cluster, creating {@code consumerSize} of them on first use.
     * Bootstrap servers are taken from the first topic of the group.
     *
     * @param str  cluster id
     * @param list Kafka topics of that cluster (non-empty, as produced by grouping)
     */
    private void updateKafkaFetcher(String str, List<InLongTopic> list) {
        List<TopicFetcher> clusterFetchers = this.kafkaFetchers.computeIfAbsent(str, key -> new ArrayList<>());
        if (refreshExisting(clusterFetchers, list)) {
            return;
        }
        String bootstraps = list.get(0).getInLongCluster().getBootstraps();
        TopicFetcherBuilder builder = TopicFetcherBuilder.newKafkaBuilder()
                .bootstrapServers(bootstraps)
                .topic(list)
                .context(this.context);
        LOGGER.info("create new kafka multi topic consumer for bootstrap {}, size is {}",
                bootstraps, this.consumerSize);
        for (int i = 0; i < this.consumerSize; i++) {
            // Register right away so a failure on a later subscribe() cannot orphan this fetcher.
            register(clusterFetchers, builder.subscribe());
        }
    }

    /**
     * Updates the Pulsar fetchers of one cluster, creating {@code consumerSize} of them on first use.
     * Each new fetcher gets its own {@link PulsarClient}, built from the service URL and token of the
     * first topic of the group. A client that fails to build is logged and skipped.
     *
     * @param str  cluster id
     * @param list Pulsar topics of that cluster (non-empty, as produced by grouping)
     */
    private void updatePulsarFetcher(String str, List<InLongTopic> list) {
        List<TopicFetcher> clusterFetchers = this.pulsarFetchers.computeIfAbsent(str, key -> new ArrayList<>());
        if (refreshExisting(clusterFetchers, list)) {
            return;
        }
        InLongTopic firstTopic = list.get(0);
        LOGGER.info("create new pulsar multi topic consumer for bootstrap {}, size is {}",
                firstTopic.getInLongCluster().getBootstraps(), this.consumerSize);
        for (int i = 0; i < this.consumerSize; i++) {
            try {
                PulsarClient pulsarClient = PulsarClient.builder()
                        .serviceUrl(firstTopic.getInLongCluster().getBootstraps())
                        .authentication(AuthenticationFactory.token(firstTopic.getInLongCluster().getToken()))
                        .build();
                TopicFetcher fetcher = TopicFetcherBuilder.newPulsarBuilder()
                        .pulsarClient(pulsarClient)
                        .topic(list)
                        .context(this.context)
                        .subscribe();
                register(clusterFetchers, fetcher);
            } catch (PulsarClientException e) {
                LOGGER.error("failed to create pulsar client for {}\n",
                        firstTopic.getInLongCluster().getBootstraps(), e);
            }
        }
    }

    /**
     * Updates the Tube fetchers of one cluster. On first use, {@code consumerSize} rounds are run;
     * each round builds one session factory / consumer creator from the first topic's bootstraps and
     * subscribes one fetcher per topic. A round whose session factory fails to build is logged and
     * skipped.
     *
     * @param str  cluster id
     * @param list Tube topics of that cluster (non-empty, as produced by grouping)
     */
    private void updateTubeFetcher(String str, List<InLongTopic> list) {
        List<TopicFetcher> clusterFetchers = this.tubeFetchers.computeIfAbsent(str, key -> new ArrayList<>());
        if (refreshExisting(clusterFetchers, list)) {
            return;
        }
        InLongTopic firstTopic = list.get(0);
        LOGGER.info("create new tube multi topic consumer for bootstrap {}, size is {}",
                firstTopic.getInLongCluster().getBootstraps(), this.consumerSize);
        for (int i = 0; i < this.consumerSize; i++) {
            try {
                TubeClientConfig tubeClientConfig = new TubeClientConfig(firstTopic.getInLongCluster().getBootstraps());
                TubeConsumerCreator tubeConsumerCreator =
                        new TubeConsumerCreator(new TubeSingleSessionFactory(tubeClientConfig), tubeClientConfig);
                for (InLongTopic topic : list) {
                    // Register right away so a failure on a later subscribe() cannot orphan this fetcher.
                    register(clusterFetchers, TopicFetcherBuilder.newTubeBuilder()
                            .tubeConsumerCreater(tubeConsumerCreator)
                            .topic(topic)
                            .context(this.context)
                            .subscribe());
                }
            } catch (TubeClientException e) {
                LOGGER.error("failed to create tube client for {}\n",
                        firstTopic.getInLongCluster().getBootstraps(), e);
            }
        }
    }
}
