package org.apache.inlong.sdk.sort.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
import org.apache.inlong.sdk.sort.api.InLongTopicManager;
import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl;
import org.apache.inlong.sdk.sort.impl.tube.InLongTubeFetcherImpl;
import org.apache.inlong.sdk.sort.impl.tube.TubeConsumerCreater;
import org.apache.inlong.sdk.sort.util.PeriodicTask;
import org.apache.inlong.sdk.sort.util.StringUtil;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.class */
public class InLongTopicManagerImpl extends InLongTopicManager {
    private final Logger logger;
    private final ConcurrentHashMap<String, InLongTopicFetcher> fetchers;
    private final ConcurrentHashMap<String, PulsarClient> pulsarClients;
    private final ConcurrentHashMap<String, TubeConsumerCreater> tubeFactories;
    private final PeriodicTask updateMetaDataWorker;
    private volatile List<String> toBeSelectFetchers;
    private boolean stopAssign;

    /* loaded from: input_file:org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl$UpdateMetaDataThread.class */
    private class UpdateMetaDataThread extends PeriodicTask {
        public UpdateMetaDataThread(long j, TimeUnit timeUnit) {
            super(j, timeUnit, InLongTopicManagerImpl.this.context.getConfig());
        }

        @Override // org.apache.inlong.sdk.sort.util.PeriodicTask
        protected void doWork() {
            this.logger.debug("InLongTopicManagerImpl doWork");
            if (InLongTopicManagerImpl.this.stopAssign) {
                this.logger.warn("assign is stoped");
                return;
            }
            if (InLongTopicManagerImpl.this.queryConsumeConfig == null) {
                this.logger.error("subscribedMetaDataInfo is null");
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            InLongTopicManagerImpl.this.context.getStatManager().getStatistics(InLongTopicManagerImpl.this.context.getConfig().getSortTaskId()).addRequestManagerTimes(1L);
            ConsumeConfig queryCurrentConsumeConfig = InLongTopicManagerImpl.this.queryConsumeConfig.queryCurrentConsumeConfig(InLongTopicManagerImpl.this.context.getConfig().getSortTaskId());
            InLongTopicManagerImpl.this.context.getStatManager().getStatistics(InLongTopicManagerImpl.this.context.getConfig().getSortTaskId()).addRequestManagerTimeCost(System.currentTimeMillis() - currentTimeMillis);
            if (queryCurrentConsumeConfig != null) {
                InLongTopicManagerImpl.this.handleCurrentConsumeConfig(queryCurrentConsumeConfig.getTopics());
            } else {
                this.logger.warn("subscribedInfo is null");
                InLongTopicManagerImpl.this.context.getStatManager().getStatistics(InLongTopicManagerImpl.this.context.getConfig().getSortTaskId()).addRequestManagerFailTimes(1L);
            }
        }
    }

    public InLongTopicManagerImpl(ClientContext clientContext, QueryConsumeConfig queryConsumeConfig) {
        super(clientContext, queryConsumeConfig);
        this.logger = LoggerFactory.getLogger(InLongTopicManagerImpl.class);
        this.fetchers = new ConcurrentHashMap<>();
        this.pulsarClients = new ConcurrentHashMap<>();
        this.tubeFactories = new ConcurrentHashMap<>();
        this.toBeSelectFetchers = new ArrayList();
        this.stopAssign = false;
        this.updateMetaDataWorker = new UpdateMetaDataThread(clientContext.getConfig().getUpdateMetaDataIntervalSec(), TimeUnit.SECONDS);
        this.updateMetaDataWorker.start("sortsdk_inlongtopic_manager_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
    }

    private void updateToBeSelectFetchers(Collection<String> collection) {
        this.toBeSelectFetchers = new ArrayList(collection);
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicManager
    public InLongTopicFetcher addFetcher(InLongTopic inLongTopic) {
        try {
            InLongTopicFetcher inLongTopicFetcher = this.fetchers.get(inLongTopic.getTopicKey());
            if (inLongTopicFetcher == null) {
                InLongTopicFetcher createInLongTopicFetcher = createInLongTopicFetcher(inLongTopic);
                InLongTopicFetcher putIfAbsent = this.fetchers.putIfAbsent(inLongTopic.getTopicKey(), createInLongTopicFetcher);
                this.logger.info("addFetcher :{}", inLongTopic.getTopicKey());
                if (putIfAbsent != null) {
                    inLongTopicFetcher = putIfAbsent;
                    if (createInLongTopicFetcher != null) {
                        createInLongTopicFetcher.close();
                    }
                    this.logger.info("addFetcher create same fetcher {}", inLongTopic);
                } else {
                    inLongTopicFetcher = createInLongTopicFetcher;
                    if (inLongTopicFetcher != null && !inLongTopicFetcher.init(this.pulsarClients.get(inLongTopic.getInLongCluster().getClusterId()))) {
                        this.logger.info("addFetcher init fail {}", inLongTopic.getTopicKey());
                        inLongTopicFetcher.close();
                        inLongTopicFetcher = null;
                    }
                }
            }
            return inLongTopicFetcher;
        } finally {
            updateToBeSelectFetchers(this.fetchers.keySet());
        }
    }

    private InLongTopicFetcher createInLongTopicFetcher(InLongTopic inLongTopic) {
        if (InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
            this.logger.info("the topic is pulsar {}", inLongTopic);
            return new InLongPulsarFetcherImpl(inLongTopic, this.context);
        }
        if (InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
            this.logger.info("the topic is tube {}", inLongTopic);
            return new InLongTubeFetcherImpl(inLongTopic, this.context);
        }
        this.logger.error("topic type not support " + inLongTopic.getTopicType());
        return null;
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicManager
    public InLongTopicFetcher removeFetcher(InLongTopic inLongTopic, boolean z) {
        InLongTopicFetcher remove = this.fetchers.remove(inLongTopic.getTopicKey());
        if (remove != null && z) {
            remove.close();
        }
        return remove;
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicManager
    public InLongTopicFetcher getFetcher(String str) {
        return this.fetchers.get(str);
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicManager
    public Set<String> getManagedInLongTopics() {
        return new HashSet(this.fetchers.keySet());
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicManager
    public Collection<InLongTopicFetcher> getAllFetchers() {
        return this.fetchers.values();
    }

    @Override // org.apache.inlong.sdk.sort.api.InLongTopicManager
    public void offlineAllTp() {
        String sortTaskId = this.context.getConfig().getSortTaskId();
        try {
            this.logger.info("start offline {}", sortTaskId);
            this.stopAssign = true;
            closeAllFetcher();
            this.logger.info("close finished {}", sortTaskId);
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
        }
    }

    @Override // org.apache.inlong.sdk.sort.api.Cleanable
    public boolean clean() {
        String sortTaskId = this.context.getConfig().getSortTaskId();
        try {
            this.logger.info("start close {}", sortTaskId);
            if (this.updateMetaDataWorker != null) {
                this.updateMetaDataWorker.stop();
            }
            closeFetcher();
            closePulsarClient();
            closeTubeSessionFactory();
            this.logger.info("close finished {}", sortTaskId);
            return true;
        } catch (Throwable th) {
            this.logger.error("close error " + sortTaskId, th);
            return false;
        }
    }

    private void closeAllFetcher() {
        closeFetcher();
    }

    private void closeFetcher() {
        for (Map.Entry<String, InLongTopicFetcher> entry : this.fetchers.entrySet()) {
            String key = entry.getKey();
            InLongTopicFetcher value = entry.getValue();
            boolean z = false;
            if (value != null) {
                try {
                    z = value.close();
                } catch (Exception e) {
                    this.logger.error(e.getMessage(), e);
                }
            }
            this.logger.info(" close {} {}", key, Boolean.valueOf(z));
        }
    }

    private void closePulsarClient() {
        for (Map.Entry<String, PulsarClient> entry : this.pulsarClients.entrySet()) {
            PulsarClient value = entry.getValue();
            String key = entry.getKey();
            if (value != null) {
                try {
                    value.close();
                } catch (Exception e) {
                    this.logger.error("close PulsarClient" + key + " error.", e);
                }
            }
        }
        this.pulsarClients.clear();
    }

    private void closeTubeSessionFactory() {
        for (Map.Entry<String, TubeConsumerCreater> entry : this.tubeFactories.entrySet()) {
            MessageSessionFactory messageSessionFactory = entry.getValue().getMessageSessionFactory();
            String key = entry.getKey();
            if (messageSessionFactory != null) {
                try {
                    messageSessionFactory.shutdown();
                } catch (Exception e) {
                    this.logger.error("close MessageSessionFactory" + key + " error.", e);
                }
            }
        }
        this.tubeFactories.clear();
    }

    private List<String> getNewTopics(List<InLongTopic> list) {
        if (list == null || list.size() <= 0) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        Iterator<InLongTopic> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getTopicKey());
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleCurrentConsumeConfig(List<InLongTopic> list) {
        if (null == list) {
            this.logger.warn("List<InLongTopic> currentConsumeConfig is null");
            return;
        }
        ArrayList arrayList = new ArrayList(list);
        this.logger.info("newConsumeConfig List:{}", Arrays.toString(arrayList.toArray()));
        List<String> newTopics = getNewTopics(arrayList);
        this.logger.info("newTopics :{}", Arrays.toString(newTopics.toArray()));
        ArrayList arrayList2 = new ArrayList(this.fetchers.keySet());
        this.logger.info("oldInLongTopics :{}", Arrays.toString(arrayList2.toArray()));
        arrayList2.removeAll(newTopics);
        this.logger.info("removed oldInLongTopics :{}", Arrays.toString(arrayList2.toArray()));
        newTopics.removeAll(new ArrayList(this.fetchers.keySet()));
        this.logger.info("really new topics :{}", Arrays.toString(newTopics.toArray()));
        offlineRmovedTopic(arrayList2);
        onlineNewTopic(arrayList, newTopics);
    }

    private void offlineRmovedTopic(List<String> list) {
        for (String str : list) {
            this.logger.info("offlineRmovedTopic {}", str);
            InLongTopic inLongTopic = this.fetchers.get(str).getInLongTopic();
            InLongTopicFetcher orDefault = this.fetchers.getOrDefault(str, null);
            if (orDefault != null) {
                orDefault.close();
            }
            this.fetchers.remove(str);
            if (this.context == null || this.context.getStatManager() == null || inLongTopic == null) {
                this.logger.error("context == null or context.getStatManager() == null or inLongTopic == null :{}", inLongTopic);
            } else {
                this.context.getStatManager().getStatistics(this.context.getConfig().getSortTaskId(), inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addTopicOfflineTimes(1L);
            }
        }
    }

    private void onlineNewTopic(List<InLongTopic> list, List<String> list2) {
        for (InLongTopic inLongTopic : list) {
            if (list2.contains(inLongTopic.getTopicKey())) {
                onlineTopic(inLongTopic);
            } else {
                this.logger.info("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
            }
        }
    }

    private void onlineTopic(InLongTopic inLongTopic) {
        if (InlongTopicTypeEnum.PULSAR.getName().equals(inLongTopic.getTopicType())) {
            this.logger.info("the topic is pulsar:{}", inLongTopic);
            onlinePulsarTopic(inLongTopic);
        } else if (InlongTopicTypeEnum.KAFKA.getName().equals(inLongTopic.getTopicType())) {
            this.logger.info("the topic is kafka:{}", inLongTopic);
            onlineKafkaTopic(inLongTopic);
        } else if (!InlongTopicTypeEnum.TUBE.getName().equals(inLongTopic.getTopicType())) {
            this.logger.error("topic type:{} not support", inLongTopic.getTopicType());
        } else {
            this.logger.info("the topic is tube:{}", inLongTopic);
            onlineTubeTopic(inLongTopic);
        }
    }

    private void onlinePulsarTopic(InLongTopic inLongTopic) {
        if (checkAndCreateNewPulsarClient(inLongTopic)) {
            createNewFetcher(inLongTopic);
        } else {
            this.logger.error("checkAndCreateNewPulsarClient error:{}", inLongTopic);
        }
    }

    private boolean checkAndCreateNewPulsarClient(InLongTopic inLongTopic) {
        if (!this.pulsarClients.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
            if (inLongTopic.getInLongCluster().getBootstraps() == null) {
                this.logger.info("bootstrap is null {}", inLongTopic.getInLongCluster());
                return false;
            }
            try {
                this.pulsarClients.put(inLongTopic.getInLongCluster().getClusterId(), PulsarClient.builder().serviceUrl(inLongTopic.getInLongCluster().getBootstraps()).authentication(AuthenticationFactory.token(inLongTopic.getInLongCluster().getToken())).build());
                this.logger.info("create pulsar client succ {} {} {}", new Object[]{inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getInLongCluster().getBootstraps(), inLongTopic.getInLongCluster().getToken()});
            } catch (Exception e) {
                this.logger.error("create pulsar client error {}", inLongTopic);
                this.logger.error(e.getMessage(), e);
                return false;
            }
        }
        this.logger.info("create pulsar client true {}", inLongTopic);
        return true;
    }

    private boolean checkAndCreateNewTubeSessionFactory(InLongTopic inLongTopic) {
        if (!this.tubeFactories.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
            if (inLongTopic.getInLongCluster().getBootstraps() == null) {
                this.logger.info("bootstrap is null {}", inLongTopic.getInLongCluster());
                return false;
            }
            try {
                TubeClientConfig tubeClientConfig = new TubeClientConfig(inLongTopic.getInLongCluster().getBootstraps());
                this.tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(), new TubeConsumerCreater(new TubeSingleSessionFactory(tubeClientConfig), tubeClientConfig));
                this.logger.info("create tube client succ {} {} {}", new Object[]{inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getInLongCluster().getBootstraps(), inLongTopic.getInLongCluster().getToken()});
            } catch (Exception e) {
                this.logger.error("create tube client error {}", inLongTopic);
                this.logger.error(e.getMessage(), e);
                return false;
            }
        }
        this.logger.info("create pulsar client true {}", inLongTopic);
        return true;
    }

    private void onlineKafkaTopic(InLongTopic inLongTopic) {
    }

    private void onlineTubeTopic(InLongTopic inLongTopic) {
        if (checkAndCreateNewTubeSessionFactory(inLongTopic)) {
            createNewFetcher(inLongTopic);
        } else {
            this.logger.error("checkAndCreateNewPulsarClient error:{}", inLongTopic);
        }
    }

    private void createNewFetcher(InLongTopic inLongTopic) {
        if (this.fetchers.containsKey(inLongTopic.getTopicKey())) {
            return;
        }
        this.logger.info("begin add Fetcher:{}", inLongTopic.getTopicKey());
        if (this.context == null || this.context.getStatManager() == null) {
            this.logger.error("context == null or context.getStatManager() == null");
            return;
        }
        this.context.getStatManager().getStatistics(this.context.getConfig().getSortTaskId(), inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic()).addTopicOnlineTimes(1L);
        if (addFetcher(inLongTopic) == null) {
            this.fetchers.remove(inLongTopic.getTopicKey());
            this.logger.error("add fetcher error:{}", inLongTopic.getTopicKey());
        }
    }
}
