package kafka.catalog;

import io.confluent.telemetry.api.events.EventEmitter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.catalog.event.CollectorStartupEvent;
import kafka.catalog.event.CollectorStopEvent;
import kafka.catalog.event.MetadataCollectorEvent;
import kafka.catalog.event.TopicConfigChangeEvent;
import kafka.catalog.event.TopicCreationEvent;
import kafka.catalog.event.TopicDeletionEvent;
import kafka.catalog.event.TopicPartitionChangeEvent;
import kafka.common.TenantHelpers;
import kafka.log.LogConfig;
import kafka.server.KafkaConfig;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

/* loaded from: input_file:kafka/catalog/ZKTopicMetadataCollector.class */
public class ZKTopicMetadataCollector {
    private final Logger log;
    private final ZKTopicMetadataCollectorConfig config;
    private final KafkaZkClient zkClient;
    private final KafkaConfig kafkaConfig;
    private final Metrics metrics;
    private final MetadataCollectorEventQueue eventQueue;
    private final Time time;
    private volatile Optional<ZKTopicMetadataCollectorContext> context;
    private final MetricName activeCollectorMetricName;

    public ZKTopicMetadataCollector(KafkaConfig kafkaConfig, KafkaZkClient kafkaZkClient, Metrics metrics, Time time) {
        this(kafkaConfig, new ZKTopicMetadataCollectorConfig(kafkaConfig), kafkaZkClient, metrics, new LogContext("[ZKTopicMetadataCollector id=" + kafkaConfig.nodeId() + "]").logger(ZKTopicMetadataCollector.class.getClass()), new MetadataCollectorEventQueue(time), Optional.empty(), time);
    }

    ZKTopicMetadataCollector(KafkaConfig kafkaConfig, ZKTopicMetadataCollectorConfig zKTopicMetadataCollectorConfig, KafkaZkClient kafkaZkClient, Metrics metrics, Logger logger, MetadataCollectorEventQueue metadataCollectorEventQueue, Optional<ZKTopicMetadataCollectorContext> optional, Time time) {
        this.kafkaConfig = kafkaConfig;
        this.config = zKTopicMetadataCollectorConfig;
        this.zkClient = kafkaZkClient;
        this.metrics = metrics;
        this.log = logger;
        this.eventQueue = metadataCollectorEventQueue;
        this.context = optional;
        this.time = time;
        this.activeCollectorMetricName = this.metrics.metricName(CatalogMetrics.ACTIVE_COLLECTOR, CatalogMetrics.GROUP_NAME, CatalogMetrics.ACTIVE_COLLECTOR_DOC);
        registerMetric();
        logger.info("Constructed, snapshot init delay {}s, interval {}s", Integer.valueOf(this.config.snapshotInitDelaySec), Integer.valueOf(this.config.snapshotIntervalSec));
    }

    public void start(Map<String, TopicInfo> map, int i) {
        tryExecute(() -> {
            this.eventQueue.append(new CollectorStartupEvent(this, this.config, map, this.eventQueue, this.metrics, this.zkClient, this.kafkaConfig, i, this.time));
        }, exc -> {
            this.log.error("Failed to start due to", exc);
            stop();
        });
    }

    public void stop() {
        tryExecute(() -> {
            appendToQueue(new CollectorStopEvent(this, this.time));
        }, exc -> {
            this.log.error("Failed to stop due to", exc);
        });
    }

    public void shutdown() {
        try {
            stop();
            this.eventQueue.close();
            this.log.info("Finished shutdown.");
        } catch (Exception e) {
            this.log.error("Failed to shutdown due to", e);
        } finally {
            removeMetric();
        }
    }

    public boolean isActive() {
        return this.context.isPresent();
    }

    public Optional<ZKTopicMetadataCollectorContext> collectorContext() {
        return this.context;
    }

    public void setCollectorContext(Optional<ZKTopicMetadataCollectorContext> optional) {
        this.context = optional;
    }

    EventEmitter eventEmitter() {
        return this.metrics.eventEmitter();
    }

    public void onTopicCreate(Map<String, TopicInfo> map) {
        tryExecute(() -> {
            Map map2 = (Map) map.entrySet().stream().filter(entry -> {
                return !Objects.equals(((TopicInfo) entry.getValue()).logicalClusterId(), "");
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
            if (map2.size() > 0) {
                appendToQueue(new TopicCreationEvent(this, new HashMap(map2), this.time));
            }
        }, exc -> {
            this.log.error("Failed to process topic creation due to", exc);
        });
    }

    public void onTopicDelete(Set<String> set) {
        tryExecute(() -> {
            Set set2 = (Set) set.stream().filter(TenantHelpers::isTenantPrefixed).collect(Collectors.toSet());
            if (set2.size() > 0) {
                appendToQueue(new TopicDeletionEvent(this, new HashSet(set2), this.time));
            }
        }, exc -> {
            this.log.error("Failed to process topic deletion due to", exc);
        });
    }

    public void onTopicPartitionChange(String str, int i) {
        tryExecute(() -> {
            String extractTenantPrefix = TenantHelpers.extractTenantPrefix(str, false);
            if (extractTenantPrefix == null) {
                return;
            }
            appendToQueue(new TopicPartitionChangeEvent(this, extractTenantPrefix, str, i, this.time));
        }, exc -> {
            this.log.error("Failed to process topic partition change due to", exc);
        });
    }

    public void onTopicConfigChange(String str, LogConfig logConfig) {
        tryExecute(() -> {
            String extractTenantPrefix = TenantHelpers.extractTenantPrefix(str, false);
            if (extractTenantPrefix == null) {
                return;
            }
            appendToQueue(new TopicConfigChangeEvent(this, extractTenantPrefix, str, logConfig, this.time));
        }, exc -> {
            this.log.error("Failed to process topic config change due to", exc);
        });
    }

    private void appendToQueue(MetadataCollectorEvent metadataCollectorEvent) {
        try {
            this.eventQueue.append(metadataCollectorEvent);
        } catch (IllegalStateException e) {
            this.log.warn("Event {} will be ignore because the EventQueue is closing due to {}.", metadataCollectorEvent, e);
        }
    }

    private void registerMetric() {
        this.metrics.addMetric(this.activeCollectorMetricName, (metricConfig, j) -> {
            return isActive() ? 1.0d : 0.0d;
        });
    }

    private void removeMetric() {
        this.metrics.removeMetric(this.activeCollectorMetricName);
    }

    private void tryExecute(Runnable runnable, Consumer<Exception> consumer) {
        try {
            runnable.run();
        } catch (Exception e) {
            consumer.accept(e);
        }
    }
}
