package org.apache.kafka.connect.runtime.errors;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.EasyMock;
import org.easymock.Mock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/errors/ErrorReporterTest.class */
public class ErrorReporterTest {
    private static final String TOPIC = "test-topic";
    private static final String DLQ_TOPIC = "test-topic-errors";
    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);

    @Mock
    KafkaProducer<byte[], byte[]> producer;

    @Mock
    Future<RecordMetadata> metadata;

    @Mock
    Plugins plugins;
    private ErrorHandlingMetrics errorHandlingMetrics;
    private MockConnectMetrics metrics;

    @Before
    public void setup() {
        this.metrics = new MockConnectMetrics();
        this.errorHandlingMetrics = new ErrorHandlingMetrics(new ConnectorTaskId("connector-", 1), this.metrics);
    }

    @After
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.stop();
        }
    }

    @Test(expected = NullPointerException.class)
    public void initializeDLQWithNullMetrics() {
        new DeadLetterQueueReporter(this.producer, config(Collections.emptyMap()), TASK_ID, (ErrorHandlingMetrics) null);
    }

    @Test
    public void testDLQConfigWithEmptyTopicName() {
        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(this.producer, config(Collections.emptyMap()), TASK_ID, this.errorHandlingMetrics);
        ProcessingContext processingContext = processingContext();
        EasyMock.expect(this.producer.send((ProducerRecord) EasyMock.anyObject(), (Callback) EasyMock.anyObject())).andThrow(new RuntimeException());
        EasyMock.replay(new Object[]{this.producer});
        deadLetterQueueReporter.report(processingContext);
    }

    @Test
    public void testDLQConfigWithValidTopicName() {
        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(this.producer, config(Collections.singletonMap("errors.deadletterqueue.topic.name", DLQ_TOPIC)), TASK_ID, this.errorHandlingMetrics);
        ProcessingContext processingContext = processingContext();
        EasyMock.expect(this.producer.send((ProducerRecord) EasyMock.anyObject(), (Callback) EasyMock.anyObject())).andReturn(this.metadata);
        EasyMock.replay(new Object[]{this.producer});
        deadLetterQueueReporter.report(processingContext);
        PowerMock.verifyAll();
    }

    @Test
    public void testReportDLQTwice() {
        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(this.producer, config(Collections.singletonMap("errors.deadletterqueue.topic.name", DLQ_TOPIC)), TASK_ID, this.errorHandlingMetrics);
        ProcessingContext processingContext = processingContext();
        EasyMock.expect(this.producer.send((ProducerRecord) EasyMock.anyObject(), (Callback) EasyMock.anyObject())).andReturn(this.metadata).times(2);
        EasyMock.replay(new Object[]{this.producer});
        deadLetterQueueReporter.report(processingContext);
        deadLetterQueueReporter.report(processingContext);
        PowerMock.verifyAll();
    }

    @Test
    public void testCloseDLQ() {
        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(this.producer, config(Collections.singletonMap("errors.deadletterqueue.topic.name", DLQ_TOPIC)), TASK_ID, this.errorHandlingMetrics);
        this.producer.close();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.producer});
        deadLetterQueueReporter.close();
        PowerMock.verifyAll();
    }

    @Test
    public void testLogOnDisabledLogReporter() {
        LogReporter logReporter = new LogReporter(TASK_ID, config(Collections.emptyMap()), this.errorHandlingMetrics);
        ProcessingContext processingContext = processingContext();
        processingContext.error(new RuntimeException());
        logReporter.report(processingContext);
        assertErrorHandlingMetricValue("total-errors-logged", 0.0d);
    }

    @Test
    public void testLogOnEnabledLogReporter() {
        LogReporter logReporter = new LogReporter(TASK_ID, config(Collections.singletonMap("errors.log.enable", "true")), this.errorHandlingMetrics);
        ProcessingContext processingContext = processingContext();
        processingContext.error(new RuntimeException());
        logReporter.report(processingContext);
        assertErrorHandlingMetricValue("total-errors-logged", 1.0d);
    }

    @Test
    public void testLogMessageWithNoRecords() {
        Assert.assertEquals("Error encountered in task job-0. Executing stage 'KEY_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter'.", new LogReporter(TASK_ID, config(Collections.singletonMap("errors.log.enable", "true")), this.errorHandlingMetrics).message(processingContext()));
    }

    @Test
    public void testLogMessageWithSinkRecords() {
        HashMap hashMap = new HashMap();
        hashMap.put("errors.log.enable", "true");
        hashMap.put("errors.log.include.messages", "true");
        Assert.assertEquals("Error encountered in task job-0. Executing stage 'KEY_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test-topic', partition=5, offset=100}.", new LogReporter(TASK_ID, config(hashMap), this.errorHandlingMetrics).message(processingContext()));
    }

    @Test
    public void testSetDLQConfigs() {
        Assert.assertEquals(config(Collections.singletonMap("errors.deadletterqueue.topic.name", DLQ_TOPIC)).dlqTopicName(), DLQ_TOPIC);
        Assert.assertEquals(config(Collections.singletonMap("errors.deadletterqueue.topic.replication.factor", "7")).dlqTopicReplicationFactor(), 7L);
    }

    @Test
    public void testDlqHeaderConsumerRecord() {
        HashMap hashMap = new HashMap();
        hashMap.put("errors.deadletterqueue.topic.name", DLQ_TOPIC);
        hashMap.put("errors.deadletterqueue.context.headers.enable", "true");
        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(this.producer, config(hashMap), TASK_ID, this.errorHandlingMetrics);
        ProcessingContext processingContext = new ProcessingContext();
        processingContext.consumerRecord(new ConsumerRecord("source-topic", 7, 10L, "source-key".getBytes(), "source-value".getBytes()));
        processingContext.currentContext(Stage.TRANSFORMATION, Transformation.class);
        processingContext.error(new ConnectException("Test Exception"));
        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
        deadLetterQueueReporter.populateContextHeaders(producerRecord, processingContext);
        Assert.assertEquals("source-topic", headerValue(producerRecord, "__connect.errors.topic"));
        Assert.assertEquals("7", headerValue(producerRecord, "__connect.errors.partition"));
        Assert.assertEquals("10", headerValue(producerRecord, "__connect.errors.offset"));
        Assert.assertEquals(TASK_ID.connector(), headerValue(producerRecord, "__connect.errors.connector.name"));
        Assert.assertEquals(String.valueOf(TASK_ID.task()), headerValue(producerRecord, "__connect.errors.task.id"));
        Assert.assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, "__connect.errors.stage"));
        Assert.assertEquals(Transformation.class.getName(), headerValue(producerRecord, "__connect.errors.class.name"));
        Assert.assertEquals(ConnectException.class.getName(), headerValue(producerRecord, "__connect.errors.exception.class.name"));
        Assert.assertEquals("Test Exception", headerValue(producerRecord, "__connect.errors.exception.message"));
        Assert.assertTrue(headerValue(producerRecord, "__connect.errors.exception.stacktrace").length() > 0);
        Assert.assertTrue(headerValue(producerRecord, "__connect.errors.exception.stacktrace").startsWith("org.apache.kafka.connect.errors.ConnectException: Test Exception"));
    }

    @Test
    public void testDlqHeaderOnNullExceptionMessage() {
        HashMap hashMap = new HashMap();
        hashMap.put("errors.deadletterqueue.topic.name", DLQ_TOPIC);
        hashMap.put("errors.deadletterqueue.context.headers.enable", "true");
        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(this.producer, config(hashMap), TASK_ID, this.errorHandlingMetrics);
        ProcessingContext processingContext = new ProcessingContext();
        processingContext.consumerRecord(new ConsumerRecord("source-topic", 7, 10L, "source-key".getBytes(), "source-value".getBytes()));
        processingContext.currentContext(Stage.TRANSFORMATION, Transformation.class);
        processingContext.error(new NullPointerException());
        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
        deadLetterQueueReporter.populateContextHeaders(producerRecord, processingContext);
        Assert.assertEquals("source-topic", headerValue(producerRecord, "__connect.errors.topic"));
        Assert.assertEquals("7", headerValue(producerRecord, "__connect.errors.partition"));
        Assert.assertEquals("10", headerValue(producerRecord, "__connect.errors.offset"));
        Assert.assertEquals(TASK_ID.connector(), headerValue(producerRecord, "__connect.errors.connector.name"));
        Assert.assertEquals(String.valueOf(TASK_ID.task()), headerValue(producerRecord, "__connect.errors.task.id"));
        Assert.assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, "__connect.errors.stage"));
        Assert.assertEquals(Transformation.class.getName(), headerValue(producerRecord, "__connect.errors.class.name"));
        Assert.assertEquals(NullPointerException.class.getName(), headerValue(producerRecord, "__connect.errors.exception.class.name"));
        Assert.assertNull(producerRecord.headers().lastHeader("__connect.errors.exception.message").value());
        Assert.assertTrue(headerValue(producerRecord, "__connect.errors.exception.stacktrace").length() > 0);
        Assert.assertTrue(headerValue(producerRecord, "__connect.errors.exception.stacktrace").startsWith("java.lang.NullPointerException"));
    }

    @Test
    public void testDlqHeaderIsAppended() {
        HashMap hashMap = new HashMap();
        hashMap.put("errors.deadletterqueue.topic.name", DLQ_TOPIC);
        hashMap.put("errors.deadletterqueue.context.headers.enable", "true");
        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(this.producer, config(hashMap), TASK_ID, this.errorHandlingMetrics);
        ProcessingContext processingContext = new ProcessingContext();
        processingContext.consumerRecord(new ConsumerRecord("source-topic", 7, 10L, "source-key".getBytes(), "source-value".getBytes()));
        processingContext.currentContext(Stage.TRANSFORMATION, Transformation.class);
        processingContext.error(new ConnectException("Test Exception"));
        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
        producerRecord.headers().add("__connect.errors.topic", "dummy".getBytes());
        deadLetterQueueReporter.populateContextHeaders(producerRecord, processingContext);
        int i = 0;
        Iterator it = producerRecord.headers().iterator();
        while (it.hasNext()) {
            if ("__connect.errors.topic".equalsIgnoreCase(((Header) it.next()).key())) {
                i++;
            }
        }
        Assert.assertEquals("source-topic", headerValue(producerRecord, "__connect.errors.topic"));
        Assert.assertEquals(2L, i);
    }

    private String headerValue(ProducerRecord<byte[], byte[]> producerRecord, String str) {
        return new String(producerRecord.headers().lastHeader(str).value());
    }

    private ProcessingContext processingContext() {
        ProcessingContext processingContext = new ProcessingContext();
        processingContext.consumerRecord(new ConsumerRecord(TOPIC, 5, 100L, new byte[]{97, 98}, new byte[]{120}));
        processingContext.currentContext(Stage.KEY_CONVERTER, JsonConverter.class);
        return processingContext;
    }

    private SinkConnectorConfig config(Map<String, String> map) {
        HashMap hashMap = new HashMap();
        hashMap.put("name", "test");
        hashMap.put("connector.class", SinkTask.class.getName());
        hashMap.putAll(map);
        return new SinkConnectorConfig(this.plugins, hashMap);
    }

    private void assertErrorHandlingMetricValue(String str, double d) {
        Assert.assertEquals(d, this.metrics.currentMetricValueAsDouble(this.errorHandlingMetrics.metricGroup(), str), 0.001d);
    }
}
