package org.apache.kafka.streams.processor.internals;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.SensorAccessor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/SourceNodeTest.class */
public class SourceNodeTest {

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/SourceNodeTest$TheDeserializer.class */
    public static class TheDeserializer implements Deserializer<String> {
        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public String m107deserialize(String str, Headers headers, byte[] bArr) {
            return str + headers + new String(bArr, StandardCharsets.UTF_8);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public String m108deserialize(String str, byte[] bArr) {
            return m107deserialize(str, (Headers) null, bArr);
        }
    }

    @Test
    public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
        MockSourceNode mockSourceNode = new MockSourceNode(new TheDeserializer(), new TheDeserializer());
        RecordHeaders recordHeaders = new RecordHeaders();
        MatcherAssert.assertThat((String) mockSourceNode.deserializeKey("topic", recordHeaders, "data".getBytes(StandardCharsets.UTF_8)), CoreMatchers.is("topic" + recordHeaders + "data"));
    }

    @Test
    public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
        MockSourceNode mockSourceNode = new MockSourceNode(new TheDeserializer(), new TheDeserializer());
        RecordHeaders recordHeaders = new RecordHeaders();
        MatcherAssert.assertThat((String) mockSourceNode.deserializeValue("topic", recordHeaders, "data".getBytes(StandardCharsets.UTF_8)), CoreMatchers.is("topic" + recordHeaders + "data"));
    }

    @Test
    public void shouldExposeProcessMetrics() {
        Metrics metrics = new Metrics();
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext(new StreamsMetricsImpl(metrics, "test-client", "latest", new MockTime()));
        SourceNode sourceNode = new SourceNode(internalMockProcessorContext.currentNode().name(), new TheDeserializer(), new TheDeserializer());
        sourceNode.init(internalMockProcessorContext);
        String name = Thread.currentThread().getName();
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", name), Utils.mkEntry("task-id", internalMockProcessorContext.taskId().toString()), Utils.mkEntry("processor-node-id", sourceNode.name())});
        Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, "process-rate", "stream-processor-node-metrics", mkMap));
        Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, "process-total", "stream-processor-node-metrics", mkMap));
        mkMap.remove("processor-node-id");
        Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, "process-rate", "stream-task-metrics", mkMap));
        Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, "process-total", "stream-task-metrics", mkMap));
        String str = "internal." + name + ".task." + internalMockProcessorContext.taskId().toString();
        MatcherAssert.assertThat(new SensorAccessor(metrics.getSensor(str + ".node." + internalMockProcessorContext.currentNode().name() + ".s.process")).parents().stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList()), Matchers.contains(new String[]{str + ".s.process"}));
    }
}
