package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;

import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.network.management.v3.InstancePingPkg;
import org.apache.skywalking.apm.network.management.v3.InstanceProperties;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.class */
public class ServiceManagementHandler implements KafkaHandler {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ServiceManagementHandler.class);
    private final SourceReceiver sourceReceiver;
    private final NamingControl namingLengthControl;
    private final KafkaFetcherConfig config;

    public ServiceManagementHandler(ModuleManager moduleManager, KafkaFetcherConfig kafkaFetcherConfig) {
        this.sourceReceiver = moduleManager.find("core").provider().getService(SourceReceiver.class);
        this.namingLengthControl = moduleManager.find("core").provider().getService(NamingControl.class);
        this.config = kafkaFetcherConfig;
    }

    @Override // org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler
    public void handle(ConsumerRecord<String, Bytes> consumerRecord) {
        try {
            if (((String) consumerRecord.key()).startsWith("register-")) {
                serviceReportProperties(InstanceProperties.parseFrom(((Bytes) consumerRecord.value()).get()));
            } else {
                keepAlive(InstancePingPkg.parseFrom(((Bytes) consumerRecord.value()).get()));
            }
        } catch (Exception e) {
            log.error("handle record failed", e);
        }
    }

    private final void serviceReportProperties(InstanceProperties instanceProperties) {
        ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
        String formatServiceName = this.namingLengthControl.formatServiceName(instanceProperties.getService());
        String formatInstanceName = this.namingLengthControl.formatInstanceName(instanceProperties.getServiceInstance());
        serviceInstanceUpdate.setServiceId(IDManager.ServiceID.buildId(formatServiceName, NodeType.Normal));
        serviceInstanceUpdate.setName(formatInstanceName);
        if (log.isDebugEnabled()) {
            log.debug("Service[{}] instance[{}] registered.", formatServiceName, formatInstanceName);
        }
        JsonObject jsonObject = new JsonObject();
        ArrayList arrayList = new ArrayList();
        instanceProperties.getPropertiesList().forEach(keyStringValuePair -> {
            if ("ipv4".equals(keyStringValuePair.getKey())) {
                arrayList.add(keyStringValuePair.getValue());
            } else {
                jsonObject.addProperty(keyStringValuePair.getKey(), keyStringValuePair.getValue());
            }
        });
        jsonObject.addProperty("ipv4s", (String) arrayList.stream().collect(Collectors.joining(",")));
        serviceInstanceUpdate.setProperties(jsonObject);
        serviceInstanceUpdate.setTimeBucket(TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute));
        this.sourceReceiver.receive(serviceInstanceUpdate);
    }

    private final void keepAlive(InstancePingPkg instancePingPkg) {
        long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
        String formatServiceName = this.namingLengthControl.formatServiceName(instancePingPkg.getService());
        String formatInstanceName = this.namingLengthControl.formatInstanceName(instancePingPkg.getServiceInstance());
        if (log.isDebugEnabled()) {
            log.debug("A ping of Service[{}] instance[{}].", formatServiceName, formatInstanceName);
        }
        ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
        serviceInstanceUpdate.setServiceId(IDManager.ServiceID.buildId(formatServiceName, NodeType.Normal));
        serviceInstanceUpdate.setName(formatInstanceName);
        serviceInstanceUpdate.setTimeBucket(timeBucket);
        this.sourceReceiver.receive(serviceInstanceUpdate);
    }

    @Override // org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler
    public String getTopic() {
        return this.config.getMm2SourceAlias() + this.config.getMm2SourceSeparator() + this.config.getTopicNameOfManagements();
    }

    @Override // org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler
    public String getConsumePartitions() {
        return this.config.getConsumePartitions();
    }
}
