package org.apache.skywalking.apm.plugin.kafka;

import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.kafka.define.KafkaContext;

/* loaded from: input_file:org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.class */
public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {
    public static final String OPERATE_NAME_PREFIX = "Kafka/";
    public static final String CONSUMER_OPERATE_NAME = "/Consumer/";

    public void beforeMethod(EnhancedInstance enhancedInstance, Method method, Object[] objArr, Class<?>[] clsArr, MethodInterceptResult methodInterceptResult) throws Throwable {
        ((ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField()).setStartTime(System.currentTimeMillis());
    }

    public Object afterMethod(EnhancedInstance enhancedInstance, Method method, Object[] objArr, Class<?>[] clsArr, Object obj) throws Throwable {
        if (obj == null) {
            return obj;
        }
        Map<TopicPartition, List<ConsumerRecord<?, ?>>> fetchRecords = fetchRecords(obj);
        if (fetchRecords.size() > 0) {
            ConsumerEnhanceRequiredInfo consumerEnhanceRequiredInfo = (ConsumerEnhanceRequiredInfo) enhancedInstance.getSkyWalkingDynamicField();
            KafkaContext kafkaContext = (KafkaContext) ContextManager.getRuntimeContext().get("SW_KAFKA_FLAG");
            if (kafkaContext != null) {
                ContextManager.createEntrySpan(kafkaContext.getOperationName(), (ContextCarrier) null);
                kafkaContext.setNeedStop(true);
            }
            AbstractSpan start = ContextManager.createEntrySpan("Kafka/" + consumerEnhanceRequiredInfo.getTopics() + CONSUMER_OPERATE_NAME + consumerEnhanceRequiredInfo.getGroupId(), (ContextCarrier) null).start(consumerEnhanceRequiredInfo.getStartTime());
            start.setComponent(ComponentsDefine.KAFKA_CONSUMER);
            SpanLayer.asMQ(start);
            Tags.MQ_BROKER.set(start, consumerEnhanceRequiredInfo.getBrokerServers());
            Tags.MQ_TOPIC.set(start, consumerEnhanceRequiredInfo.getTopics());
            start.setPeer(consumerEnhanceRequiredInfo.getBrokerServers());
            Iterator<List<ConsumerRecord<?, ?>>> it = fetchRecords.values().iterator();
            while (it.hasNext()) {
                for (ConsumerRecord<?, ?> consumerRecord : it.next()) {
                    ContextCarrier contextCarrier = new ContextCarrier();
                    CarrierItem items = contextCarrier.items();
                    while (items.hasNext()) {
                        items = items.next();
                        Iterator it2 = consumerRecord.headers().headers(items.getHeadKey()).iterator();
                        if (it2.hasNext()) {
                            items.setHeadValue(new String(((Header) it2.next()).value(), StandardCharsets.UTF_8));
                        }
                    }
                    ContextManager.extract(contextCarrier);
                }
            }
            ContextManager.stopSpan();
        }
        return obj;
    }

    protected Map<TopicPartition, List<ConsumerRecord<?, ?>>> fetchRecords(Object obj) {
        return (Map) obj;
    }

    public void handleMethodException(EnhancedInstance enhancedInstance, Method method, Object[] objArr, Class<?>[] clsArr, Throwable th) {
        if (ContextManager.isActive()) {
            ContextManager.activeSpan().log(th);
        }
    }
}
