package org.apache.inlong.sdk.sort.manager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.api.TopicFetcher;
import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
import org.apache.inlong.sdk.sort.api.TopicManager;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/sort/manager/InlongTopicManager.class */
/**
 * {@link TopicManager} that periodically queries the consume config of the sort task and keeps
 * the set of running {@link TopicFetcher}s, Pulsar clients and Tube session factories in line
 * with the topics assigned to this client.
 *
 * <p>Fetchers are indexed by {@link InLongTopic#getTopicKey()}; Pulsar clients and Tube creators
 * are indexed by the cluster id of the topic's {@link CacheZoneCluster}. The periodic refresh runs
 * on a single-threaded scheduler, while the online/offline work is handed to {@link #pool}.
 */
public class InlongTopicManager extends TopicManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(InlongTopicManager.class);
    private final ScheduledExecutorService executor;
    private final Map<String, TopicFetcher> fetchers;
    private final Map<String, PulsarClient> pulsarClients;
    private final Map<String, TubeConsumerCreator> tubeFactories;
    protected final ForkJoinPool pool;
    private volatile boolean stopAssign;
    private Collection<InLongTopic> assignedTopics;

    /**
     * Creates the manager and starts the periodic meta data refresh.
     *
     * @param clientContext context providing the client config and metric hooks
     * @param queryConsumeConfig source of the current consume config
     */
    public InlongTopicManager(ClientContext clientContext, QueryConsumeConfig queryConsumeConfig) {
        super(clientContext, queryConsumeConfig);
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.fetchers = new ConcurrentHashMap<>();
        this.pulsarClients = new ConcurrentHashMap<>();
        this.tubeFactories = new ConcurrentHashMap<>();
        this.stopAssign = false;
        // The pool must exist before the first refresh is scheduled: the refresh starts with zero
        // delay on another thread and submits work to the pool.
        this.pool = new ForkJoinPool(clientContext.getConfig().getThreadPoolSize());
        this.executor.scheduleWithFixedDelay(this::updateMetaData, 0L,
                clientContext.getConfig().getUpdateMetaDataIntervalSec(), TimeUnit.SECONDS);
    }

    /**
     * Closes every fetcher, Pulsar client and Tube factory and empties the internal maps.
     * Assignment is paused while cleaning and resumed afterwards.
     *
     * @return {@code true} if cleaning finished without an exception, {@code false} otherwise
     */
    @Override // org.apache.inlong.sdk.sort.api.Cleanable
    public boolean clean() {
        String sortTaskId = this.context.getConfig().getSortTaskId();
        try {
            LOGGER.info("start to clean topic manager, sortTaskId={}", sortTaskId);
            this.stopAssign = true;
            // Wait for each step: the maps are cleared in the finally block, so a close task that
            // is still pending at that point would find nothing left to close.
            runAndWait(this::doCloseAllFetchers);
            runAndWait(this::doCloseAllPulsarClients);
            runAndWait(this::doCloseAllTubeFactories);
            LOGGER.info("success to clean topic manager, sortTaskId={}", sortTaskId);
            return true;
        } catch (Exception e) {
            LOGGER.error("failed to clean topic manager, sortTaskId={}", sortTaskId, e);
            return false;
        } finally {
            this.fetchers.clear();
            this.pulsarClients.clear();
            this.tubeFactories.clear();
            this.stopAssign = false;
        }
    }

    /**
     * Removes and closes the fetcher registered under the given key.
     *
     * @param str fetcher key (the topic key)
     * @return the removed fetcher, or {@code null} if none was registered
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher removeTopic(String str) {
        LOGGER.info("start to close fetcher key={} ", str);
        TopicFetcher removed = this.fetchers.remove(str);
        if (removed != null) {
            try {
                removed.close();
                this.context.addTopicOfflineCount(1);
            } catch (Exception e) {
                LOGGER.error("close fetcher failed, key={}", str, e);
            }
        }
        return removed;
    }

    /** Submits {@code task} to the pool and blocks until it has finished. */
    private void runAndWait(Runnable task) {
        this.pool.submit(task).join();
    }

    /** Asynchronously closes all registered fetchers. */
    private void closeAllFetchers() {
        this.pool.submit(() -> {
            doCloseAllFetchers();
        });
    }

    /** Closes all registered fetchers in parallel on the calling pool thread. */
    private void doCloseAllFetchers() {
        this.fetchers.keySet().stream().parallel().forEach(this::removeTopic);
    }

    /** Closes all cached Pulsar clients in parallel. */
    private void doCloseAllPulsarClients() {
        this.pulsarClients.keySet().stream().parallel().forEach(this::closePulsarClient);
    }

    /** Shuts down all cached Tube session factories in parallel. */
    private void doCloseAllTubeFactories() {
        this.tubeFactories.keySet().stream().parallel().forEach(this::closeTubeFactory);
    }

    /** Removes the Tube creator of the given cluster id and shuts its session factory down. */
    private TubeConsumerCreator closeTubeFactory(String str) {
        LOGGER.info("start to close tube creator id = {}", str);
        TubeConsumerCreator removed = this.tubeFactories.remove(str);
        if (removed != null) {
            try {
                removed.getMessageSessionFactory().shutdown();
            } catch (Exception e) {
                LOGGER.error("close tube factory failed, client id = {}", str, e);
            }
        }
        return removed;
    }

    /** Removes the Pulsar client of the given cluster id and closes it. */
    private PulsarClient closePulsarClient(String str) {
        LOGGER.info("start to close pulsar client id = {}", str);
        PulsarClient removed = this.pulsarClients.remove(str);
        if (removed != null) {
            try {
                removed.close();
            } catch (Exception e) {
                LOGGER.error("close pulsar client failed, client id = {}", str, e);
            }
        }
        return removed;
    }

    /**
     * Makes sure the cluster client for the topic exists, subscribes the topic and registers the
     * resulting fetcher under the topic key.
     *
     * @param inLongTopic topic to bring online
     * @return the fetcher serving the topic (an already registered one is reused), or
     *         {@code null} if the topic could not be subscribed
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher addTopic(InLongTopic inLongTopic) {
        checkAndOnlineCluster(inLongTopic);
        String topicKey = inLongTopic.getTopicKey();
        TopicFetcher existing = this.fetchers.get(topicKey);
        if (existing != null) {
            return existing;
        }
        TopicFetcher created = onlineNewTopic(inLongTopic);
        if (created == null) {
            return null;
        }
        // Register the fetcher; without this the manager could never find, update or close it.
        TopicFetcher raced = this.fetchers.putIfAbsent(topicKey, created);
        if (raced == null) {
            return created;
        }
        // Another thread registered a fetcher for the same key first: keep that one.
        LOGGER.warn("close duplicated fetcher for topicKey={}", topicKey);
        try {
            created.close();
        } catch (Exception e) {
            LOGGER.error("close duplicated fetcher failed, topicKey={}", topicKey, e);
        }
        return raced;
    }

    /**
     * Unregisters the fetcher of the given topic.
     *
     * @param inLongTopic topic whose fetcher is removed
     * @param z whether the removed fetcher should also be closed
     * @return the removed fetcher, or {@code null} if none was registered
     */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher removeTopic(InLongTopic inLongTopic, boolean z) {
        LOGGER.info("start to remove topicKey={}", inLongTopic.getTopicKey());
        TopicFetcher removed = this.fetchers.remove(inLongTopic.getTopicKey());
        if (removed != null && z) {
            removed.close();
        }
        return removed;
    }

    /** Returns the fetcher registered under the given key, or {@code null}. */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public TopicFetcher getFetcher(String str) {
        return this.fetchers.get(str);
    }

    /** Returns a snapshot copy of all registered fetchers. */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public Collection<TopicFetcher> getAllFetchers() {
        return new ArrayList<>(this.fetchers.values());
    }

    /** Returns a snapshot copy of the keys of all registered fetchers. */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public Set<String> getManagedInLongTopics() {
        return new HashSet<>(this.fetchers.keySet());
    }

    /** Pauses assignment and asynchronously closes all fetchers. */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public void offlineAllTopicsAndPartitions() {
        this.stopAssign = true;
        closeAllFetchers();
    }

    /** Stops the periodic refresh and releases all fetchers and clients. */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public void close() {
        if (!this.executor.isShutdown()) {
            this.executor.shutdown();
        }
        clean();
    }

    /** Resumes handling of consume config updates. */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public void restartAssigned() {
        this.stopAssign = false;
    }

    /** Pauses handling of consume config updates. */
    @Override // org.apache.inlong.sdk.sort.api.TopicManager
    public void stopAssigned() {
        this.stopAssign = true;
    }

    /**
     * Periodic task: queries the current consume config and applies it.
     *
     * <p>Exceptions are caught and logged here because a task scheduled with
     * {@code scheduleWithFixedDelay} is never run again once one execution throws.
     */
    private void updateMetaData() {
        try {
            LOGGER.debug("InLongTopicManager doWork");
            if (this.stopAssign) {
                LOGGER.warn("assign is stopped");
                return;
            }
            if (this.queryConsumeConfig == null) {
                LOGGER.error("subscribedMetaDataInfo is null");
                return;
            }
            long startMillis = System.currentTimeMillis();
            this.context.addRequestManager();
            ConsumeConfig consumeConfig =
                    this.queryConsumeConfig.queryCurrentConsumeConfig(this.context.getConfig().getSortTaskId());
            if (consumeConfig == null) {
                LOGGER.warn("subscribedInfo is null");
                this.context.addRequestManagerFail(System.currentTimeMillis() - startMillis);
                return;
            }
            this.assignedTopics = new HashSet<>(this.context.getConfig().getConsumerSubset(consumeConfig.getTopics()));
            handleUpdatedConsumeConfig();
        } catch (Exception e) {
            LOGGER.error("failed to update meta data", e);
        }
    }

    /**
     * Applies the current {@link #assignedTopics}: brings new clusters and topics online, offlines
     * topics and clusters that are no longer assigned, and refreshes the remaining topics.
     * An empty assignment is ignored. Each step is submitted to the pool and not awaited.
     */
    private void handleUpdatedConsumeConfig() {
        LOGGER.info("start to handle updated consume config");
        if (CollectionUtils.isEmpty(this.assignedTopics)) {
            LOGGER.warn("assignedTopics is null or empty, do nothing");
            return;
        }
        onlinePulsarClients();
        onlineTubeFactories();
        offlineRemovedTopics();
        onlineNewTopics();
        updateCurrentTopics();
        offlinePulsarClients();
        offlineTubeFactories();
        LOGGER.info("end to handle updated consume config");
    }

    /** Asynchronously shuts down Tube factories whose cluster is no longer assigned. */
    private void offlineTubeFactories() {
        // Keep every assigned cluster id, whether or not its factory exists yet; otherwise a
        // factory created concurrently by the online step could be shut down right away.
        Set<String> assignedClusterIds = getCacheZoneClusters(InlongTopicTypeEnum.TUBE).stream()
                .map(CacheZoneCluster::getClusterId)
                .collect(Collectors.toSet());
        this.pool.submit(() -> {
            new HashSet<>(this.tubeFactories.keySet()).stream().parallel()
                    .filter(clusterId -> !assignedClusterIds.contains(clusterId))
                    .forEach(this::offlineTubeFactory);
        });
    }

    /** Removes the Tube creator of the given cluster id and shuts its session factory down. */
    private void offlineTubeFactory(String str) {
        TubeConsumerCreator removed = this.tubeFactories.remove(str);
        if (removed == null) {
            LOGGER.warn("when close tube client, find no client id={}", str);
            return;
        }
        LOGGER.info("start to close tube clientId={}", str);
        try {
            removed.getMessageSessionFactory().shutdown();
            LOGGER.info("success to close tube clientId={}", str);
        } catch (Exception e) {
            LOGGER.warn("failed to close tube clientId={}", str, e);
        }
    }

    /** Asynchronously closes Pulsar clients whose cluster is no longer assigned. */
    private void offlinePulsarClients() {
        // Keep every assigned cluster id, whether or not its client exists yet; otherwise a
        // client created concurrently by the online step could be closed right away.
        Set<String> assignedClusterIds = getCacheZoneClusters(InlongTopicTypeEnum.PULSAR).stream()
                .map(CacheZoneCluster::getClusterId)
                .collect(Collectors.toSet());
        this.pool.submit(() -> {
            new HashSet<>(this.pulsarClients.keySet()).stream().parallel()
                    .filter(clusterId -> !assignedClusterIds.contains(clusterId))
                    .forEach(this::offlinePulsarClient);
        });
    }

    /** Removes the Pulsar client of the given cluster id and closes it. */
    private void offlinePulsarClient(String str) {
        PulsarClient removed = this.pulsarClients.remove(str);
        if (removed == null) {
            LOGGER.warn("when close pulsar client, find no client id={}", str);
            return;
        }
        LOGGER.info("start to close pulsar clientId={}", str);
        try {
            removed.close();
            LOGGER.info("success to close pulsar clientId={}", str);
        } catch (Exception e) {
            LOGGER.warn("failed to close pulsar clientId={}", str, e);
        }
    }

    /** Asynchronously creates Tube creators for assigned Tube clusters that have none yet. */
    private void onlineTubeFactories() {
        List<CacheZoneCluster> missing = getCacheZoneClusters(InlongTopicTypeEnum.TUBE).stream()
                .filter(cluster -> !this.tubeFactories.containsKey(cluster.getClusterId()))
                .collect(Collectors.toList());
        this.pool.submit(() -> {
            missing.stream().parallel().forEach(this::createTubeConsumerCreator);
        });
    }

    /**
     * Creates and caches a Tube consumer creator for the cluster. If another thread cached one
     * first, the newly created session factory is shut down again.
     */
    private void createTubeConsumerCreator(CacheZoneCluster cacheZoneCluster) {
        LOGGER.info("start to init tube creator for cluster={}", cacheZoneCluster);
        if (cacheZoneCluster.getBootstraps() == null) {
            LOGGER.error("bootstrap is null for cluster={}", cacheZoneCluster);
            return;
        }
        try {
            TubeClientConfig tubeClientConfig = new TubeClientConfig(cacheZoneCluster.getBootstraps());
            TubeConsumerCreator created =
                    new TubeConsumerCreator(new TubeSingleSessionFactory(tubeClientConfig), tubeClientConfig);
            if (this.tubeFactories.putIfAbsent(cacheZoneCluster.getClusterId(), created) != null) {
                LOGGER.warn("close new tube creator for cluster={}", cacheZoneCluster);
                created.getMessageSessionFactory().shutdown();
            }
            LOGGER.info("success to init tube creatorfor cluster={}", cacheZoneCluster);
        } catch (Exception e) {
            LOGGER.error("create tube creator error for cluster={}", cacheZoneCluster, e);
        }
    }

    /** Asynchronously creates Pulsar clients for assigned Pulsar clusters that have none yet. */
    private void onlinePulsarClients() {
        List<CacheZoneCluster> missing = getCacheZoneClusters(InlongTopicTypeEnum.PULSAR).stream()
                .filter(cluster -> !this.pulsarClients.containsKey(cluster.getClusterId()))
                .collect(Collectors.toList());
        this.pool.submit(() -> {
            missing.stream().parallel().forEach(this::createPulsarClient);
        });
    }

    /**
     * Creates and caches a Pulsar client for the cluster. A cached client that is still open
     * wins and the new one is closed; a cached client that is already closed is replaced.
     */
    private void createPulsarClient(CacheZoneCluster cacheZoneCluster) {
        LOGGER.info("start to init pulsar client for cluster={}", cacheZoneCluster);
        if (cacheZoneCluster.getBootstraps() == null) {
            LOGGER.error("bootstrap is null for cluster={}", cacheZoneCluster);
            return;
        }
        try {
            PulsarClient created = PulsarClient.builder()
                    .serviceUrl(cacheZoneCluster.getBootstraps())
                    .authentication(AuthenticationFactory.token(cacheZoneCluster.getToken()))
                    .build();
            String clusterId = cacheZoneCluster.getClusterId();
            PulsarClient existing = this.pulsarClients.putIfAbsent(clusterId, created);
            if (existing != null) {
                if (existing.isClosed() && this.pulsarClients.replace(clusterId, existing, created)) {
                    // The cached client was dead; the new one takes its place instead of leaking.
                    LOGGER.warn("replace closed pulsar client for cluster={}", cacheZoneCluster);
                } else {
                    LOGGER.warn("close new pulsar client for cluster={}", cacheZoneCluster);
                    created.close();
                }
            }
            LOGGER.info("success to init pulsar client for cluster={}", cacheZoneCluster);
        } catch (Exception e) {
            LOGGER.error("create pulsar client error for cluster={}", cacheZoneCluster, e);
        }
    }

    /** Returns the distinct clusters of all assigned topics of the given type. */
    private List<CacheZoneCluster> getCacheZoneClusters(InlongTopicTypeEnum inlongTopicTypeEnum) {
        return this.assignedTopics.stream()
                .filter(topic -> inlongTopicTypeEnum.getName().equalsIgnoreCase(topic.getTopicType()))
                .map(InLongTopic::getInLongCluster)
                .distinct()
                .collect(Collectors.toList());
    }

    /**
     * Creates the shared cluster client needed by the topic if it does not exist yet.
     * Kafka topics use no shared client here, so nothing is created for them.
     */
    private void checkAndOnlineCluster(InLongTopic inLongTopic) {
        String topicType = inLongTopic.getTopicType();
        if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(topicType)) {
            if (!this.pulsarClients.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
                createPulsarClient(inLongTopic.getInLongCluster());
            }
        } else if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(topicType)) {
            if (!this.tubeFactories.containsKey(inLongTopic.getInLongCluster().getClusterId())) {
                createTubeConsumerCreator(inLongTopic.getInLongCluster());
            }
        } else if (!InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topicType)) {
            LOGGER.error("do not support type={}", topicType);
        }
    }

    /**
     * Subscribes the topic with the builder matching its type.
     *
     * @return the new fetcher, or {@code null} if the type is unsupported or subscribing failed
     */
    private TopicFetcher onlineNewTopic(InLongTopic inLongTopic) {
        try {
            String topicType = inLongTopic.getTopicType();
            if (InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(topicType)) {
                LOGGER.info("the topic is pulsar {}", inLongTopic);
                return TopicFetcherBuilder.newPulsarBuilder()
                        .pulsarClient(this.pulsarClients.get(inLongTopic.getInLongCluster().getClusterId()))
                        .topic(inLongTopic)
                        .context(this.context)
                        .subscribe();
            }
            if (InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topicType)) {
                LOGGER.info("the topic is kafka {}", inLongTopic);
                return TopicFetcherBuilder.newKafkaBuilder()
                        .bootstrapServers(inLongTopic.getInLongCluster().getBootstraps())
                        .topic(inLongTopic)
                        .context(this.context)
                        .subscribe();
            }
            if (InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(topicType)) {
                LOGGER.info("the topic is tube {}", inLongTopic);
                return TopicFetcherBuilder.newTubeBuilder()
                        .tubeConsumerCreater(this.tubeFactories.get(inLongTopic.getInLongCluster().getClusterId()))
                        .topic(inLongTopic)
                        .context(this.context)
                        .subscribe();
            }
            LOGGER.error("topic type not support {}", topicType);
            return null;
        } catch (Exception e) {
            LOGGER.error("failed to subscribe new topic={}", inLongTopic, e);
            return null;
        }
    }

    /** Asynchronously brings online every assigned topic that has no fetcher yet. */
    private void onlineNewTopics() {
        this.pool.submit(() -> {
            getOnlineTopics().stream().parallel().forEach(this::addTopic);
        });
    }

    /** Asynchronously pushes the latest topic definition to every already running fetcher. */
    private void updateCurrentTopics() {
        this.pool.submit(() -> {
            getUpdateTopics().stream().parallel().forEach(this::updateTopic);
        });
    }

    /** Hands the topic to its registered fetcher, if there is one. */
    private void updateTopic(InLongTopic inLongTopic) {
        TopicFetcher fetcher = this.fetchers.get(inLongTopic.getTopicKey());
        if (fetcher == null) {
            LOGGER.warn("when update topic, find no topic={}", inLongTopic);
            return;
        }
        fetcher.updateTopics(Collections.singletonList(inLongTopic));
    }

    /** Returns the assigned topics that have no registered fetcher. */
    private List<InLongTopic> getOnlineTopics() {
        return this.assignedTopics.stream()
                .filter(topic -> !this.fetchers.containsKey(topic.getTopicKey()))
                .distinct()
                .collect(Collectors.toList());
    }

    /** Asynchronously closes every fetcher whose topic is no longer assigned. */
    private void offlineRemovedTopics() {
        this.pool.submit(() -> {
            getOfflineTopicKeys().stream().parallel().forEach(this::removeTopic);
        });
    }

    /** Returns the keys of registered fetchers whose topic is not in the current assignment. */
    private Set<String> getOfflineTopicKeys() {
        Set<String> assignedKeys = this.assignedTopics.stream()
                .map(InLongTopic::getTopicKey)
                .collect(Collectors.toSet());
        return this.fetchers.keySet().stream()
                .filter(topicKey -> !assignedKeys.contains(topicKey))
                .collect(Collectors.toSet());
    }

    /** Returns the assigned topics that already have a registered fetcher. */
    private List<InLongTopic> getUpdateTopics() {
        return this.assignedTopics.stream()
                .filter(topic -> this.fetchers.containsKey(topic.getTopicKey()))
                .distinct()
                .collect(Collectors.toList());
    }
}
