package org.apache.kafka.connect.runtime.tracing;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorConfigTest;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.TopicAdmin;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/kafka/connect/runtime/tracing/ConnectTracerTest.class */
public class ConnectTracerTest {
    private static final String TRACE_TOPIC_NAME = "Topic";
    public static final TraceRecord STRING_KEY_VALUE_TRACE_RECORD = new TraceRecord(TRACE_TOPIC_NAME, 0, Schema.STRING_SCHEMA, "Key", Schema.STRING_SCHEMA, "Value", (Long) null, (Iterable) null);

    /* loaded from: input_file:org/apache/kafka/connect/runtime/tracing/ConnectTracerTest$TestRetriableException.class */
    private static class TestRetriableException extends RetriableException {
        private TestRetriableException() {
        }
    }

    private Map<String, String> createDefaultConfigs() {
        HashMap hashMap = new HashMap(TracerConfigTest.TEST_CONNECTOR_CONFIGS);
        hashMap.put("trace.records.enable", "true");
        hashMap.put("trace.records.topic", TRACE_TOPIC_NAME);
        return hashMap;
    }

    private Producer<byte[], byte[]> getMockProducerWithSendAnswer(Answer<?> answer) {
        Producer<byte[], byte[]> producer = (Producer) Mockito.mock(Producer.class);
        Mockito.when(producer.send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any())).then(answer);
        return producer;
    }

    private TopicAdmin getMockTopicAdminWithCreateAnswer(Answer<?> answer) {
        TopicAdmin topicAdmin = (TopicAdmin) Mockito.mock(TopicAdmin.class);
        Mockito.when(Boolean.valueOf(topicAdmin.createOrFindTopic((NewTopic) ArgumentMatchers.any()))).then(answer);
        return topicAdmin;
    }

    private TracerConfig getMockTracerConfig() {
        return new TracerConfig(ConnectorConfigTest.MOCK_PLUGINS, new ConnectorConfig(ConnectorConfigTest.MOCK_PLUGINS, createDefaultConfigs()));
    }

    private void checkProducerRecord(Converter converter, Converter converter2, ProducerRecord<byte[], byte[]> producerRecord) {
        Assert.assertEquals(TRACE_TOPIC_NAME, producerRecord.topic());
        Assert.assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "Key").toString(), converter.toConnectData(TRACE_TOPIC_NAME, (byte[]) producerRecord.key()).toString());
        Assert.assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "Value").toString(), converter2.toConnectData(TRACE_TOPIC_NAME, (byte[]) producerRecord.value()).toString());
    }

    @Test
    public void testWriteTraceRecordWithTopicCreation() {
        TracerConfig tracerConfig = new TracerConfig(ConnectorConfigTest.MOCK_PLUGINS, new ConnectorConfig(ConnectorConfigTest.MOCK_PLUGINS, createDefaultConfigs()));
        TopicAdmin mockTopicAdminWithCreateAnswer = getMockTopicAdminWithCreateAnswer(invocationOnMock -> {
            Assert.assertEquals(TRACE_TOPIC_NAME, ((NewTopic) invocationOnMock.getArgument(0)).name());
            Assert.assertEquals(TracerConfig.TOPIC_PARTITION_DEFAULT.intValue(), r0.numPartitions());
            Assert.assertEquals(TracerConfig.TOPIC_REPLICATION_FACTOR_DEFAULT.shortValue(), r0.replicationFactor());
            Assert.assertEquals(0L, r0.configs().size());
            return null;
        });
        new ConnectTracer(new ConnectorTaskId("name", 0), tracerConfig, getMockProducerWithSendAnswer(invocationOnMock2 -> {
            checkProducerRecord(tracerConfig.keyConverter(), tracerConfig.valueConverter(), (ProducerRecord) invocationOnMock2.getArgument(0));
            return null;
        }), mockTopicAdminWithCreateAnswer).writeTraceRecord(STRING_KEY_VALUE_TRACE_RECORD, (Callback) null);
        ((TopicAdmin) Mockito.verify(mockTopicAdminWithCreateAnswer)).createOrFindTopic((NewTopic) ArgumentMatchers.any());
    }

    @Test
    public void testWriteTracingFailureWithSuccessfulError() {
        TracerConfig tracerConfig = (TracerConfig) Mockito.spy(new TracerConfig(ConnectorConfigTest.MOCK_PLUGINS, new ConnectorConfig(ConnectorConfigTest.MOCK_PLUGINS, createDefaultConfigs())));
        TraceRecordBuilder traceRecordBuilder = (TraceRecordBuilder) Mockito.mock(TraceRecordBuilder.class);
        Mockito.when(traceRecordBuilder.createTracingError((TraceRecord) ArgumentMatchers.any(), (Throwable) ArgumentMatchers.any())).thenReturn(STRING_KEY_VALUE_TRACE_RECORD);
        Producer<byte[], byte[]> mockProducerWithSendAnswer = getMockProducerWithSendAnswer(invocationOnMock -> {
            checkProducerRecord(tracerConfig.keyConverter(), tracerConfig.valueConverter(), (ProducerRecord) invocationOnMock.getArgument(0));
            return null;
        });
        Tracer tracer = (Tracer) Mockito.spy(new ConnectTracer(new ConnectorTaskId("name", 0), tracerConfig, mockProducerWithSendAnswer, getMockTopicAdminWithCreateAnswer(invocationOnMock2 -> {
            return null;
        })));
        Mockito.when(tracer.traceRecordBuilder()).thenReturn(traceRecordBuilder);
        tracer.writeTraceRecord(new TraceRecord((String) null, 0, (Schema) null, (Object) null, (Schema) null, (Object) null, (Long) null, (Iterable) null), (Callback) null);
        ((Producer) Mockito.verify(mockProducerWithSendAnswer)).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
    }

    @Test(expected = ConnectorTracingException.class)
    public void testWriteTracingFailureWithErrorFailure() {
        TracerConfig tracerConfig = (TracerConfig) Mockito.spy(new TracerConfig(ConnectorConfigTest.MOCK_PLUGINS, new ConnectorConfig(ConnectorConfigTest.MOCK_PLUGINS, createDefaultConfigs())));
        TraceRecord traceRecord = new TraceRecord((String) null, (Integer) null, (Schema) null, (Object) null, (Schema) null, (Object) null, (Long) null, (Iterable) null);
        TraceRecordBuilder traceRecordBuilder = (TraceRecordBuilder) Mockito.mock(TraceRecordBuilder.class);
        Mockito.when(traceRecordBuilder.createTracingError((TraceRecord) ArgumentMatchers.any(), (Throwable) ArgumentMatchers.any())).thenReturn(traceRecord);
        Tracer tracer = (Tracer) Mockito.spy(new ConnectTracer(new ConnectorTaskId("name", 0), tracerConfig, (Producer) Mockito.mock(Producer.class), getMockTopicAdminWithCreateAnswer(invocationOnMock -> {
            return null;
        })));
        Mockito.when(tracer.traceRecordBuilder()).thenReturn(traceRecordBuilder);
        tracer.writeTraceRecord(traceRecord, (Callback) null);
    }

    @Test
    public void testBuildAndWriteRecords() {
        TracerConfig tracerConfig = (TracerConfig) Mockito.spy(new TracerConfig(ConnectorConfigTest.MOCK_PLUGINS, new ConnectorConfig(ConnectorConfigTest.MOCK_PLUGINS, createDefaultConfigs())));
        Producer<byte[], byte[]> mockProducerWithSendAnswer = getMockProducerWithSendAnswer(invocationOnMock -> {
            checkProducerRecord(tracerConfig.keyConverter(), tracerConfig.valueConverter(), (ProducerRecord) invocationOnMock.getArgument(0));
            return null;
        });
        TopicAdmin mockTopicAdminWithCreateAnswer = getMockTopicAdminWithCreateAnswer(invocationOnMock2 -> {
            return null;
        });
        TraceRecordBuilder traceRecordBuilder = (TraceRecordBuilder) Mockito.mock(TraceRecordBuilder.class);
        Mockito.when(traceRecordBuilder.build()).thenReturn(Arrays.asList(STRING_KEY_VALUE_TRACE_RECORD, STRING_KEY_VALUE_TRACE_RECORD));
        Tracer tracer = (Tracer) Mockito.spy(new ConnectTracer(new ConnectorTaskId("name", 0), tracerConfig, mockProducerWithSendAnswer, mockTopicAdminWithCreateAnswer));
        Mockito.when(tracer.traceRecordBuilder()).thenReturn(traceRecordBuilder);
        tracer.buildAndWriteRecords();
        ((Producer) Mockito.verify(mockProducerWithSendAnswer, Mockito.times(2))).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
    }

    @Test
    public void testTransformationsOnRecords() {
        Map<String, String> createDefaultConfigs = createDefaultConfigs();
        createDefaultConfigs.put("trace.records.transforms", "topicChange");
        createDefaultConfigs.put("trace.records.transforms.topicChange.type", "org.apache.kafka.connect.transforms.RegexRouter");
        createDefaultConfigs.put("trace.records.transforms.topicChange.regex", ".*");
        createDefaultConfigs.put("trace.records.transforms.topicChange.replacement", "replaced-topic");
        TracerConfig tracerConfig = new TracerConfig(ConnectorConfigTest.MOCK_PLUGINS, new ConnectorConfig(ConnectorConfigTest.MOCK_PLUGINS, createDefaultConfigs));
        TopicAdmin mockTopicAdminWithCreateAnswer = getMockTopicAdminWithCreateAnswer(invocationOnMock -> {
            return null;
        });
        Producer<byte[], byte[]> mockProducerWithSendAnswer = getMockProducerWithSendAnswer(invocationOnMock2 -> {
            Assert.assertEquals("replaced-topic", ((ProducerRecord) invocationOnMock2.getArgument(0)).topic());
            return null;
        });
        new ConnectTracer(new ConnectorTaskId("name", 0), tracerConfig, mockProducerWithSendAnswer, mockTopicAdminWithCreateAnswer).writeTraceRecord(STRING_KEY_VALUE_TRACE_RECORD, (Callback) null);
        ((Producer) Mockito.verify(mockProducerWithSendAnswer)).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
    }

    @Test
    public void testExistingOrCreatedTopicIsCached() {
        TracerConfig mockTracerConfig = getMockTracerConfig();
        Producer<byte[], byte[]> mockProducerWithSendAnswer = getMockProducerWithSendAnswer(invocationOnMock -> {
            checkProducerRecord(mockTracerConfig.keyConverter(), mockTracerConfig.valueConverter(), (ProducerRecord) invocationOnMock.getArgument(0));
            return null;
        });
        TopicAdmin mockTopicAdminWithCreateAnswer = getMockTopicAdminWithCreateAnswer(invocationOnMock2 -> {
            return true;
        });
        new ConnectTracer(new ConnectorTaskId("name", 0), mockTracerConfig, mockProducerWithSendAnswer, mockTopicAdminWithCreateAnswer).writeTraceRecords(Arrays.asList(STRING_KEY_VALUE_TRACE_RECORD, STRING_KEY_VALUE_TRACE_RECORD), (Callback) null);
        ((TopicAdmin) Mockito.verify(mockTopicAdminWithCreateAnswer, Mockito.times(1))).createOrFindTopic((NewTopic) ArgumentMatchers.any());
    }

    @Test
    public void testRetriableTopicCreationIsRetried() {
        TracerConfig mockTracerConfig = getMockTracerConfig();
        Producer<byte[], byte[]> mockProducerWithSendAnswer = getMockProducerWithSendAnswer(invocationOnMock -> {
            checkProducerRecord(mockTracerConfig.keyConverter(), mockTracerConfig.valueConverter(), (ProducerRecord) invocationOnMock.getArgument(0));
            return null;
        });
        TopicAdmin topicAdmin = (TopicAdmin) Mockito.mock(TopicAdmin.class);
        Mockito.when(Boolean.valueOf(topicAdmin.createOrFindTopic((NewTopic) ArgumentMatchers.any()))).thenThrow(new Throwable[]{new ConnectException("", new TestRetriableException())});
        new ConnectTracer(new ConnectorTaskId("name", 0), mockTracerConfig, mockProducerWithSendAnswer, topicAdmin).writeTraceRecords(Arrays.asList(STRING_KEY_VALUE_TRACE_RECORD, STRING_KEY_VALUE_TRACE_RECORD), (Callback) null);
        ((TopicAdmin) Mockito.verify(topicAdmin, Mockito.times(2))).createOrFindTopic((NewTopic) ArgumentMatchers.any());
    }

    @Test
    public void testFailedTopicCreationAndCaching() {
        TracerConfig mockTracerConfig = getMockTracerConfig();
        Producer<byte[], byte[]> mockProducerWithSendAnswer = getMockProducerWithSendAnswer(invocationOnMock -> {
            checkProducerRecord(mockTracerConfig.keyConverter(), mockTracerConfig.valueConverter(), (ProducerRecord) invocationOnMock.getArgument(0));
            return null;
        });
        TopicAdmin topicAdmin = (TopicAdmin) Mockito.mock(TopicAdmin.class);
        Mockito.when(Boolean.valueOf(topicAdmin.createOrFindTopic((NewTopic) ArgumentMatchers.any()))).thenThrow(new Throwable[]{new ConnectException("", new RuntimeException())});
        new ConnectTracer(new ConnectorTaskId("name", 0), mockTracerConfig, mockProducerWithSendAnswer, topicAdmin).writeTraceRecords(Arrays.asList(STRING_KEY_VALUE_TRACE_RECORD, STRING_KEY_VALUE_TRACE_RECORD), (Callback) null);
        ((TopicAdmin) Mockito.verify(topicAdmin, Mockito.times(1))).createOrFindTopic((NewTopic) ArgumentMatchers.any());
    }
}
