package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest.class */
public class ProcessorNodeTest {

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$ClassCastProcessor.class */
    private static class ClassCastProcessor extends ExceptionalProcessor {
        private ClassCastProcessor() {
            super();
        }

        @Override // org.apache.kafka.streams.processor.internals.ProcessorNodeTest.ExceptionalProcessor
        public void init(ProcessorContext processorContext) {
        }

        @Override // org.apache.kafka.streams.processor.internals.ProcessorNodeTest.ExceptionalProcessor
        public void process(Object obj, Object obj2) {
            throw new ClassCastException("Incompatible types simulation exception.");
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$ExceptionalProcessor.class */
    private static class ExceptionalProcessor implements Processor<Object, Object> {
        private ExceptionalProcessor() {
        }

        public void init(ProcessorContext processorContext) {
            throw new RuntimeException();
        }

        public void process(Object obj, Object obj2) {
            throw new RuntimeException();
        }

        public void close() {
            throw new RuntimeException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorNodeTest$NoOpProcessor.class */
    public static class NoOpProcessor implements Processor<Object, Object> {
        private NoOpProcessor() {
        }

        public void init(ProcessorContext processorContext) {
        }

        public void process(Object obj, Object obj2) {
        }

        public void close() {
        }
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
        new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet()).init((InternalProcessorContext) null);
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
        ProcessorNode processorNode = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
        processorNode.init((InternalProcessorContext) null);
        processorNode.close();
    }

    @Test
    public void testMetricsWithBuiltInMetricsVersionLatest() {
        testMetrics("latest");
    }

    @Test
    public void testMetricsWithBuiltInMetricsVersion0100To24() {
        testMetrics("0.10.0-2.4");
    }

    private void testMetrics(String str) {
        Metrics metrics = new Metrics();
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext(new StreamsMetricsImpl(metrics, "test-client", str, new MockTime()));
        ProcessorNode processorNode = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
        processorNode.init(internalMockProcessorContext);
        String name = Thread.currentThread().getName();
        String[] strArr = {"process", "punctuate", "create", "destroy"};
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        String str2 = "0.10.0-2.4".equals(str) ? "client-id" : "thread-id";
        linkedHashMap.put("processor-node-id", processorNode.name());
        linkedHashMap.put("task-id", internalMockProcessorContext.taskId().toString());
        linkedHashMap.put(str2, name);
        if ("0.10.0-2.4".equals(str)) {
            for (String str3 : strArr) {
                Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, str3 + "-latency-avg", "stream-processor-node-metrics", linkedHashMap));
                Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, str3 + "-latency-max", "stream-processor-node-metrics", linkedHashMap));
                Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, str3 + "-rate", "stream-processor-node-metrics", linkedHashMap));
                Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, str3 + "-total", "stream-processor-node-metrics", linkedHashMap));
            }
            linkedHashMap.put("processor-node-id", "all");
            for (String str4 : strArr) {
                Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, str4 + "-latency-avg", "stream-processor-node-metrics", linkedHashMap));
                Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, str4 + "-latency-max", "stream-processor-node-metrics", linkedHashMap));
                Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, str4 + "-rate", "stream-processor-node-metrics", linkedHashMap));
                Assert.assertTrue(StreamsTestUtils.containsMetric(metrics, str4 + "-total", "stream-processor-node-metrics", linkedHashMap));
            }
            return;
        }
        for (String str5 : strArr) {
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str5 + "-latency-avg", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str5 + "-latency-max", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str5 + "-rate", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str5 + "-total", "stream-processor-node-metrics", linkedHashMap));
        }
        linkedHashMap.put("processor-node-id", "all");
        for (String str6 : strArr) {
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str6 + "-latency-avg", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str6 + "-latency-max", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str6 + "-rate", "stream-processor-node-metrics", linkedHashMap));
            Assert.assertFalse(StreamsTestUtils.containsMetric(metrics, str6 + "-total", "stream-processor-node-metrics", linkedHashMap));
        }
    }

    @Test
    public void testTopologyLevelClassCastException() {
        Properties properties = new Properties();
        properties.put("application.id", "test");
        properties.put("bootstrap.servers", "dummy:1234");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("streams-plaintext-input").flatMapValues(str -> {
            return Collections.singletonList("");
        });
        TestInputTopic createInputTopic = new TopologyTestDriver(streamsBuilder.build(), properties).createInputTopic("streams-plaintext-input", new StringSerializer(), new StringSerializer());
        String message = Assert.assertThrows(StreamsException.class, () -> {
            createInputTopic.pipeInput("a-key", "a value");
        }).getMessage();
        Assert.assertTrue("Error about class cast with serdes", message.contains("ClassCastException"));
        Assert.assertTrue("Error about class cast with serdes", message.contains("Serdes"));
    }

    @Test
    public void testTopologyLevelClassCastExceptionDirect() {
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext(new StreamsMetricsImpl(new Metrics(), "test-client", "latest", new MockTime()));
        ProcessorNode processorNode = new ProcessorNode("name", new ClassCastProcessor(), Collections.emptySet());
        processorNode.init(internalMockProcessorContext);
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            processorNode.process(new Record("aKey", "aValue", 0L));
        });
        MatcherAssert.assertThat(assertThrows.getCause(), CoreMatchers.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), CoreMatchers.containsString("default Serdes"));
        MatcherAssert.assertThat(assertThrows.getMessage(), CoreMatchers.containsString("input types"));
    }
}
