package org.apache.inlong.sdk.sort.manager;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.api.TopicFetcher;
import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
import org.apache.inlong.sdk.sort.api.TopicManager;
import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
import org.apache.inlong.sdk.sort.util.PeriodicTask;
import org.apache.inlong.sdk.sort.util.StringUtil;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.class */
public class InlongSingleTopicManager extends TopicManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongSingleTopicManager.class);
    private final ConcurrentHashMap<String, TopicFetcher> fetchers;
    private final ConcurrentHashMap<String, PulsarClient> pulsarClients;
    private final ConcurrentHashMap<String, TubeConsumerCreator> tubeFactories;
    private final PeriodicTask updateMetaDataWorker;
    private boolean stopAssign;

    /* loaded from: input_file:org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager$UpdateMetaDataThread.class */
    private class UpdateMetaDataThread extends PeriodicTask {
        public UpdateMetaDataThread(long j, TimeUnit timeUnit) {
            super(j, timeUnit, InlongSingleTopicManager.this.context.getConfig());
        }

        @Override // org.apache.inlong.sdk.sort.util.PeriodicTask
        protected void doWork() {
            this.logger.debug("InLongTopicManagerImpl doWork");
            if (InlongSingleTopicManager.this.stopAssign) {
                this.logger.warn("assign is stoped");
                return;
            }
            if (InlongSingleTopicManager.this.queryConsumeConfig == null) {
                this.logger.error("subscribedMetaDataInfo is null");
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            InlongSingleTopicManager.this.context.getDefaultStateCounter().addRequestManagerTimes(1L);
            ConsumeConfig queryCurrentConsumeConfig = InlongSingleTopicManager.this.queryConsumeConfig.queryCurrentConsumeConfig(InlongSingleTopicManager.this.context.getConfig().getSortTaskId());
            InlongSingleTopicManager.this.context.getDefaultStateCounter().addRequestManagerTimeCost(System.currentTimeMillis() - currentTimeMillis);
            if (queryCurrentConsumeConfig != null) {
                InlongSingleTopicManager.this.handleUpdatedConsumeConfig(queryCurrentConsumeConfig.getTopics());
            } else {
                this.logger.warn("subscribedInfo is null");
                InlongSingleTopicManager.this.context.getDefaultStateCounter().addRequestManagerFailTimes(1L);
            }
        }
    }

    public InlongSingleTopicManager(ClientContext clientContext, QueryConsumeConfig queryConsumeConfig) {
        super(clientContext, queryConsumeConfig);
        this.fetchers = new ConcurrentHashMap<>();
        this.pulsarClients = new ConcurrentHashMap<>();
        this.tubeFactories = new ConcurrentHashMap<>();
        this.stopAssign = false;
        this.updateMetaDataWorker = new UpdateMetaDataThread(clientContext.getConfig().getUpdateMetaDataIntervalSec(), TimeUnit.SECONDS);
        this.updateMetaDataWorker.start("sortsdk_single_topic_manager_" + clientContext.getConfig().getSortTaskId() + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
    }

    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher addTopic(InLongTopic inLongTopic) {
        try {
            TopicFetcher topicFetcher = this.fetchers.get(inLongTopic.getTopicKey());
            if (topicFetcher == null) {
                TopicFetcher createInLongTopicFetcher = createInLongTopicFetcher(inLongTopic);
                TopicFetcher putIfAbsent = this.fetchers.putIfAbsent(inLongTopic.getTopicKey(), createInLongTopicFetcher);
                LOGGER.info("addFetcher :{}", inLongTopic.getTopicKey());
                topicFetcher = createInLongTopicFetcher;
                if (putIfAbsent != null) {
                    topicFetcher = putIfAbsent;
                    if (createInLongTopicFetcher != null) {
                        createInLongTopicFetcher.close();
                    }
                    LOGGER.info("addFetcher create same fetcher {}", inLongTopic);
                }
            }
            return topicFetcher;
        } catch (Throwable th) {
            LOGGER.error("got error when add fetcher: {}", th.getMessage(), th);
            return null;
        }
    }

    private TopicFetcher createInLongTopicFetcher(InLongTopic inLongTopic) {
        if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
            LOGGER.info("the topic is pulsar {}", inLongTopic);
            return TopicFetcherBuilder.newPulsarBuilder().pulsarClient(this.pulsarClients.get(inLongTopic.getInLongCluster().getClusterId())).topic(inLongTopic).context(this.context).subscribe();
        }
        if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
            LOGGER.info("the topic is kafka {}", inLongTopic);
            return TopicFetcherBuilder.newKafkaBuilder().bootstrapServers(inLongTopic.getInLongCluster().getBootstraps()).topic(inLongTopic).context(this.context).subscribe();
        }
        if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
            LOGGER.info("the topic is tube {}", inLongTopic);
            return TopicFetcherBuilder.newTubeBuilder().tubeConsumerCreater(this.tubeFactories.get(inLongTopic.getInLongCluster().getClusterId())).topic(inLongTopic).context(this.context).subscribe();
        }
        LOGGER.error("topic type not support " + inLongTopic.getTopicType());
        return null;
    }

    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher removeTopic(InLongTopic inLongTopic, boolean z) {
        TopicFetcher remove = this.fetchers.remove(inLongTopic.getTopicKey());
        if (remove != null && z) {
            remove.close();
        }
        return remove;
    }

    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher getFetcher(String str) {
        return this.fetchers.get(str);
    }

    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public Set<String> getManagedInLongTopics() {
        return new HashSet(this.fetchers.keySet());
    }

    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public Collection<TopicFetcher> getAllFetchers() {
        return this.fetchers.values();
    }

    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public void offlineAllTopicsAndPartitions() {
        String sortTaskId = this.context.getConfig().getSortTaskId();
        try {
            LOGGER.info("start offline {}", sortTaskId);
            this.stopAssign = true;
            closeAllFetcher();
            LOGGER.info("close finished {}", sortTaskId);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public void close() {
        if (this.updateMetaDataWorker != null) {
            this.updateMetaDataWorker.stop();
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.Cleanable
    public boolean clean() {
        String sortTaskId = this.context.getConfig().getSortTaskId();
        try {
            LOGGER.info("start close {}", sortTaskId);
            if (this.updateMetaDataWorker != null) {
                this.updateMetaDataWorker.stop();
            }
            closeFetcher();
            closePulsarClient();
            closeTubeSessionFactory();
            LOGGER.info("close finished {}", sortTaskId);
            return true;
        } catch (Throwable th) {
            LOGGER.error("close error " + sortTaskId, th);
            return false;
        }
    }

    private void closeAllFetcher() {
        closeFetcher();
    }

    private void closeFetcher() {
        for (Map.Entry<String, TopicFetcher> entry : this.fetchers.entrySet()) {
            String key = entry.getKey();
            TopicFetcher value = entry.getValue();
            boolean z = false;
            if (value != null) {
                try {
                    z = value.close();
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            LOGGER.info("close fetcher{} {}", key, Boolean.valueOf(z));
        }
    }

    private void closePulsarClient() {
        for (Map.Entry<String, PulsarClient> entry : this.pulsarClients.entrySet()) {
            PulsarClient value = entry.getValue();
            String key = entry.getKey();
            if (value != null) {
                try {
                    value.close();
                } catch (Exception e) {
                    LOGGER.error("close PulsarClient" + key + " error.", e);
                }
            }
        }
        this.pulsarClients.clear();
    }

    private void closeTubeSessionFactory() {
        for (Map.Entry<String, TubeConsumerCreator> entry : this.tubeFactories.entrySet()) {
            MessageSessionFactory messageSessionFactory = entry.getValue().getMessageSessionFactory();
            String key = entry.getKey();
            if (messageSessionFactory != null) {
                try {
                    messageSessionFactory.shutdown();
                } catch (Exception e) {
                    LOGGER.error("close MessageSessionFactory" + key + " error.", e);
                }
            }
        }
        this.tubeFactories.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleUpdatedConsumeConfig(List<InLongTopic> list) {
        if (null == list) {
            LOGGER.warn("assignedTopics is null, do nothing");
            return;
        }
        List<String> list2 = (List) list.stream().map((v0) -> {
            return v0.getTopicKey();
        }).collect(Collectors.toList());
        LOGGER.debug("assignedTopics name: {}", Arrays.toString(list2.toArray()));
        List<String> arrayList = new ArrayList<>(this.fetchers.keySet());
        LOGGER.debug("oldTopics :{}", Arrays.toString(arrayList.toArray()));
        arrayList.removeAll(list2);
        LOGGER.debug("removed oldTopics: {}", Arrays.toString(arrayList.toArray()));
        list2.removeAll(new ArrayList<>(this.fetchers.keySet()));
        LOGGER.debug("really new topics :{}", Arrays.toString(list2.toArray()));
        offlineRemovedTopic(arrayList);
        onlineNewTopic(list, list2);
        updateRemainTopics(list);
    }

    private void updateRemainTopics(List<InLongTopic> list) {
        list.forEach(inLongTopic -> {
            TopicFetcher topicFetcher = this.fetchers.get(inLongTopic.getTopicKey());
            if (Objects.isNull(topicFetcher)) {
                return;
            }
            topicFetcher.updateTopics(Collections.singletonList(inLongTopic));
        });
    }

    private void offlineRemovedTopic(List<String> list) {
        for (String str : list) {
            LOGGER.info("offlineRemovedTopic {}", str);
            InLongTopic inLongTopic = this.fetchers.get(str).getTopics().get(0);
            TopicFetcher topicFetcher = this.fetchers.get(str);
            if (topicFetcher != null) {
                topicFetcher.close();
            }
            this.fetchers.remove(str);
            if (this.context == null || this.context.getStatManager() == null || inLongTopic == null) {
                LOGGER.error("context == null or context.getStatManager() == null or inLongTopic == null :{}", inLongTopic);
            } else {
                this.context.getStateCounterByTopic(inLongTopic).addTopicOfflineTimes(1L);
            }
        }
    }

    private void onlineNewTopic(List<InLongTopic> list, List<String> list2) {
        for (InLongTopic inLongTopic : list) {
            if (list2.contains(inLongTopic.getTopicKey())) {
                onlineTopic(inLongTopic);
            } else {
                LOGGER.debug("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
            }
        }
    }

    private void onlineTopic(InLongTopic inLongTopic) {
        if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
            LOGGER.info("the topic is pulsar:{}", inLongTopic);
            onlinePulsarTopic(inLongTopic);
        } else if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
            LOGGER.info("the topic is kafka:{}", inLongTopic);
            onlineKafkaTopic(inLongTopic);
        } else if (!InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(inLongTopic.getTopicType())) {
            LOGGER.error("topic type:{} not support", inLongTopic.getTopicType());
        } else {
            LOGGER.info("the topic is tube:{}", inLongTopic);
            onlineTubeTopic(inLongTopic);
        }
    }

    private void onlinePulsarTopic(InLongTopic inLongTopic) {
        if (checkAndCreateNewPulsarClient(inLongTopic)) {
            createNewFetcher(inLongTopic);
        } else {
            LOGGER.error("checkAndCreateNewPulsarClient error:{}", inLongTopic);
        }
    }

    private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) {
        if (!this.pulsarClients.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
            if (inLongTopic.getInLongCluster().getBootstraps() == null) {
                LOGGER.error("bootstrap is null {}", inLongTopic.getInLongCluster());
                return false;
            }
            try {
                this.pulsarClients.put(inLongTopic.getInLongCluster().getClusterId(), PulsarClient.builder().serviceUrl(inLongTopic.getInLongCluster().getBootstraps()).authentication(AuthenticationFactory.token(inLongTopic.getInLongCluster().getToken())).build());
                LOGGER.debug("create pulsar client succ {}", new String[]{inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getInLongCluster().getBootstraps(), inLongTopic.getInLongCluster().getToken()});
            } catch (Exception e) {
                LOGGER.error("create pulsar client error {}", inLongTopic);
                LOGGER.error(e.getMessage(), e);
                return false;
            }
        }
        LOGGER.info("create pulsar client true {}", inLongTopic);
        return true;
    }

    private boolean checkAndCreateNewTubeSessionFactory(InLongTopic inLongTopic) {
        if (!this.tubeFactories.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
            if (inLongTopic.getInLongCluster().getBootstraps() == null) {
                LOGGER.info("bootstrap is null {}", inLongTopic.getInLongCluster());
                return false;
            }
            try {
                TubeClientConfig tubeClientConfig = new TubeClientConfig(inLongTopic.getInLongCluster().getBootstraps());
                this.tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(), new TubeConsumerCreator(new TubeSingleSessionFactory(tubeClientConfig), tubeClientConfig));
                LOGGER.debug("create tube client succ {} {} {}", new String[]{inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getInLongCluster().getBootstraps(), inLongTopic.getInLongCluster().getToken()});
            } catch (Exception e) {
                LOGGER.error("create tube client error {}", inLongTopic);
                LOGGER.error(e.getMessage(), e);
                return false;
            }
        }
        LOGGER.info("create pulsar client true {}", inLongTopic);
        return true;
    }

    private void onlineKafkaTopic(InLongTopic inLongTopic) {
        createNewFetcher(inLongTopic);
    }

    private void onlineTubeTopic(InLongTopic inLongTopic) {
        if (checkAndCreateNewTubeSessionFactory(inLongTopic)) {
            createNewFetcher(inLongTopic);
        } else {
            LOGGER.error("checkAndCreateNewPulsarClient error:{}", inLongTopic);
        }
    }

    private void createNewFetcher(InLongTopic inLongTopic) {
        if (this.fetchers.containsKey(inLongTopic.getTopicKey())) {
            return;
        }
        LOGGER.info("begin add Fetcher:{}", inLongTopic.getTopicKey());
        if (this.context == null || this.context.getStatManager() == null) {
            LOGGER.error("context == null or context.getStatManager() == null");
            return;
        }
        this.context.getStateCounterByTopic(inLongTopic).addTopicOnlineTimes(1L);
        if (addTopic(inLongTopic) == null) {
            this.fetchers.remove(inLongTopic.getTopicKey());
            LOGGER.error("add fetcher error:{}", inLongTopic.getTopicKey());
        }
    }
}
