package org.apache.kafka.connect.runtime.tracing;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorConfigTest;
import org.apache.kafka.connect.runtime.WorkerConnectorTest;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Cast;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/connect/runtime/tracing/TraceRecordBuilderImplTest.class */
public class TraceRecordBuilderImplTest {
    private static final String TOPIC_NAME = "myTopic";
    private TracerConfig config;
    private TracingContext tracingContext;
    public static final Map<String, String> TEST_CONNECTOR_CONFIGS = new HashMap();
    private static final Map<String, ?> SOURCE_PARTITION = Collections.singletonMap("src", "abc");
    private static final Map<String, ?> SOURCE_OFFSET = Collections.singletonMap("offset", "1");
    private static final Integer PARTITION_NUMBER = 0;
    private static final Long KAFKA_TIMESTAMP = 0L;
    private static final TimestampType TS_TYPE = TimestampType.CREATE_TIME;
    private static final Schema SAMPLE_RECORDS_VALUE_SCHEMA = SchemaBuilder.struct().name("sample").field("field1", Schema.STRING_SCHEMA).field("field2", Schema.INT32_SCHEMA).build();
    private static final Struct SAMPLE_RECORD_VALUE = new Struct(SAMPLE_RECORDS_VALUE_SCHEMA).put("field1", "abc").put("field2", 123);

    @Before
    public void setUp() {
        this.config = prepareTraceConfig();
        this.tracingContext = prepareTraceContext();
    }

    @Test
    public void appendRecord() {
        List build = new TraceRecordBuilderImpl(this.config, this.tracingContext).appendRecord(sampleSourceRecord()).build();
        Assert.assertEquals(1L, build.size());
        TraceRecord traceRecord = (TraceRecord) build.get(0);
        Schema valueSchema = traceRecord.valueSchema();
        Object value = traceRecord.value();
        assertErrorAbsentAndRecordPresent(valueSchema);
        Assert.assertTrue(value instanceof Struct);
        assertRecordFields((Struct) value);
        Object obj = ((Struct) value).get("metadata");
        Assert.assertTrue(obj instanceof Struct);
        Assert.assertEquals(Collections.singletonMap("src", "abc"), ((Struct) obj).get("source_partition"));
        Assert.assertEquals(Collections.singletonMap("offset", "1"), ((Struct) obj).get("source_offset"));
        Assert.assertEquals(TOPIC_NAME, ((Struct) obj).get(MonitorableSourceConnector.TOPIC_CONFIG));
        Assert.assertEquals(0, ((Struct) obj).get("partition"));
        Assert.assertEquals("SOURCE", ((Struct) obj).get("type"));
    }

    @Test
    public void appendTransformedRecord() {
        TraceRecordBuilderImpl traceRecordBuilderImpl = new TraceRecordBuilderImpl(this.config, this.tracingContext);
        SourceRecord sampleSourceRecord = sampleSourceRecord();
        List build = traceRecordBuilderImpl.appendTransformedRecord("testTransform", Cast.class, sampleSourceRecord, sampleSourceRecord).build();
        Assert.assertEquals(1L, build.size());
        TraceRecord traceRecord = (TraceRecord) build.get(0);
        Schema valueSchema = traceRecord.valueSchema();
        Object value = traceRecord.value();
        assertErrorAbsentAndRecordPresent(valueSchema);
        Assert.assertTrue(value instanceof Struct);
        assertRecordFields((Struct) value);
        Object obj = ((Struct) value).get("metadata");
        Assert.assertTrue(obj instanceof Struct);
        assertMetadataTransformations((Struct) obj);
        Assert.assertEquals("testTransform", ((Struct) obj).get("transformation_name"));
        Assert.assertEquals("org.apache.kafka.connect.transforms.Cast", ((Struct) obj).get("transformation_type"));
    }

    @Test
    public void appendTransformationError() {
        List build = new TraceRecordBuilderImpl(this.config, this.tracingContext).appendTransformationError("testTransform", Cast.class, new Exception("test exception")).build();
        Assert.assertEquals(1L, build.size());
        TraceRecord traceRecord = (TraceRecord) build.get(0);
        Schema valueSchema = traceRecord.valueSchema();
        Object value = traceRecord.value();
        assertRecordAbsentAndErrorPresent(valueSchema);
        Assert.assertTrue(value instanceof Struct);
        Object obj = ((Struct) value).get("metadata");
        Assert.assertTrue(obj instanceof Struct);
        Assert.assertEquals("testTransform", ((Struct) obj).get("transformation_name"));
        Assert.assertEquals("org.apache.kafka.connect.transforms.Cast", ((Struct) obj).get("transformation_type"));
        assertErrorFields((Struct) value);
    }

    @Test
    public void appendSourceConversionError() {
        List build = new TraceRecordBuilderImpl(this.config, this.tracingContext).appendSourceConversionError(Stage.KEY_CONVERTER, sampleSourceRecord(), new Exception("test exception")).build();
        Assert.assertEquals(1L, build.size());
        TraceRecord traceRecord = (TraceRecord) build.get(0);
        Schema valueSchema = traceRecord.valueSchema();
        Object value = traceRecord.value();
        assertRecordAbsentAndErrorPresent(valueSchema);
        Assert.assertTrue(value instanceof Struct);
        Object obj = ((Struct) value).get("metadata");
        Assert.assertTrue(obj instanceof Struct);
        assertMetadataTransformations((Struct) obj);
        assertErrorFields((Struct) value);
    }

    @Test
    public void appendSinkConversionError() {
        List build = new TraceRecordBuilderImpl(this.config, this.tracingContext).appendSinkConversionError(Stage.KEY_CONVERTER, sampleConsumerRecord(), new Exception("test exception")).build();
        Assert.assertEquals(1L, build.size());
        TraceRecord traceRecord = (TraceRecord) build.get(0);
        Schema valueSchema = traceRecord.valueSchema();
        Object value = traceRecord.value();
        assertRecordAbsentAndErrorPresent(valueSchema);
        Assert.assertTrue(value instanceof Struct);
        Object obj = ((Struct) value).get("metadata");
        Assert.assertTrue(obj instanceof Struct);
        Assert.assertEquals(TOPIC_NAME, ((Struct) obj).get(MonitorableSourceConnector.TOPIC_CONFIG));
        Assert.assertEquals(1234L, ((Struct) obj).get("offset"));
        Assert.assertEquals("SINK", ((Struct) obj).get("type"));
        assertErrorFields((Struct) value);
    }

    @Test
    public void createSinkRecordError() {
        TraceRecord createSinkRecordError = new TraceRecordBuilderImpl(this.config, this.tracingContext).createSinkRecordError(sampleSinkRecord(), new Exception("test exception"));
        System.out.println(createSinkRecordError.toString());
        Object value = createSinkRecordError.value();
        Assert.assertTrue(value instanceof Struct);
        Assert.assertNotNull(createSinkRecordError.valueSchema().field("record"));
        assertNullCorrelation(createSinkRecordError);
        Object obj = ((Struct) value).get("metadata");
        Assert.assertTrue(obj instanceof Struct);
        Assert.assertEquals(TOPIC_NAME, ((Struct) obj).get(MonitorableSourceConnector.TOPIC_CONFIG));
        Assert.assertEquals(1234L, ((Struct) obj).get("offset"));
        Assert.assertEquals("SINK", ((Struct) obj).get("type"));
        assertErrorFields((Struct) value);
    }

    @Test
    public void appendError() {
        List build = new TraceRecordBuilderImpl(this.config, this.tracingContext).appendError(Stage.TRANSFORMATION.toString(), Cast.class, new Exception("test exception")).build();
        Assert.assertEquals(1L, build.size());
        TraceRecord traceRecord = (TraceRecord) build.get(0);
        Schema valueSchema = traceRecord.valueSchema();
        Object value = traceRecord.value();
        assertRecordAbsentAndErrorPresent(valueSchema);
        Assert.assertTrue(value instanceof Struct);
        Assert.assertTrue(((Struct) value).get("metadata") instanceof Struct);
        assertErrorFields((Struct) value);
    }

    @Test
    public void testCorrelation() {
        TraceRecordBuilderImpl traceRecordBuilderImpl = new TraceRecordBuilderImpl(this.config, this.tracingContext);
        SourceRecord sampleSourceRecord = sampleSourceRecord();
        traceRecordBuilderImpl.appendRecord(sampleSourceRecord);
        traceRecordBuilderImpl.appendTransformedRecord("t1", Cast.class, sampleSourceRecord, sampleSourceRecord);
        this.tracingContext.incrementProcessedRecordCount();
        traceRecordBuilderImpl.appendRecord(sampleSourceRecord);
        traceRecordBuilderImpl.appendTransformedRecord("t1", Cast.class, sampleSourceRecord, sampleSourceRecord);
        traceRecordBuilderImpl.appendTransformationError("t2", Cast.class, new Exception("test exception"));
        List build = traceRecordBuilderImpl.build();
        Assert.assertEquals(5L, build.size());
        validateCorrelation(build.subList(0, 2), this.tracingContext, 0);
        validateCorrelation(build.subList(2, 5), this.tracingContext, 1);
    }

    @Test
    public void testNullValuedRecords() {
        List build = new TraceRecordBuilderImpl(this.config, this.tracingContext).appendRecord(new SourceRecord(SOURCE_PARTITION, SOURCE_OFFSET, TOPIC_NAME, PARTITION_NUMBER, (Schema) null, (Object) null, (Schema) null, (Object) null, (Long) null, (Iterable) null)).build();
        Assert.assertEquals(1L, build.size());
        TraceRecord traceRecord = (TraceRecord) build.get(0);
        Schema valueSchema = traceRecord.valueSchema();
        Object value = traceRecord.value();
        assertErrorAbsentAndRecordPresent(valueSchema);
        Assert.assertTrue(value instanceof Struct);
        Object obj = ((Struct) value).get("metadata");
        Assert.assertTrue(obj instanceof Struct);
        Assert.assertEquals(Collections.singletonMap("src", "abc"), ((Struct) obj).get("source_partition"));
        Assert.assertEquals(Collections.singletonMap("offset", "1"), ((Struct) obj).get("source_offset"));
        Assert.assertEquals(TOPIC_NAME, ((Struct) obj).get(MonitorableSourceConnector.TOPIC_CONFIG));
        Assert.assertEquals(0, ((Struct) obj).get("partition"));
        Assert.assertEquals("SOURCE", ((Struct) obj).get("type"));
    }

    @Test
    public void testAppendNullTransformation() {
        TraceRecordBuilderImpl traceRecordBuilderImpl = new TraceRecordBuilderImpl(this.config, this.tracingContext);
        traceRecordBuilderImpl.appendRecord(sampleSourceRecord());
        traceRecordBuilderImpl.appendTransformedRecord("t1", Transformation.class, (ConnectRecord) null, sampleSinkRecord());
        Assert.assertEquals(2L, traceRecordBuilderImpl.build().size());
        TraceRecord traceRecord = (TraceRecord) traceRecordBuilderImpl.build().get(1);
        System.out.println(traceRecord);
        Object obj = ((Struct) traceRecord.value()).get("metadata");
        Assert.assertEquals("t1", ((Struct) obj).get("transformation_name"));
        Assert.assertEquals(Transformation.class.getName(), ((Struct) obj).get("transformation_type"));
    }

    @Test
    public void testDataException() {
        TraceRecordBuilderImpl traceRecordBuilderImpl = new TraceRecordBuilderImpl(this.config, this.tracingContext);
        SourceRecord sourceRecord = new SourceRecord(SOURCE_PARTITION, SOURCE_OFFSET, TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, (Object) null, SAMPLE_RECORDS_VALUE_SCHEMA, SAMPLE_RECORD_VALUE, (Long) null, (Iterable) null);
        Assert.assertThrows(Exception.class, () -> {
            traceRecordBuilderImpl.appendRecord(sourceRecord);
        });
        List build = traceRecordBuilderImpl.build();
        Assert.assertEquals(1L, build.size());
        TraceRecord traceRecord = (TraceRecord) build.get(0);
        assertRecordAbsentAndErrorPresent(traceRecord.valueSchema());
        Schema valueSchema = traceRecord.valueSchema();
        Struct struct = (Struct) ((Struct) traceRecord.value()).get("error");
        Assert.assertNull(valueSchema.field("error_trace"));
        Assert.assertEquals("Invalid Record", struct.getMap("error_info").get("Info"));
    }

    private void validateCorrelation(List<TraceRecord> list, TracingContext tracingContext, Integer num) {
        String str = tracingContext.traceID().toString() + "-" + tracingContext.connectorTaskId().toString();
        int i = 0;
        for (TraceRecord traceRecord : list) {
            Assert.assertEquals(tracingContext.connectorTaskId().connector(), metadataOf(traceRecord).get(WorkerConnectorTest.CONNECTOR));
            Assert.assertEquals(str + "-" + num, metadataOf(traceRecord).get("correlation_id"));
            Assert.assertEquals(i + 1, metadataOf(traceRecord).getInt32("current_step").intValue());
            Assert.assertEquals(list.size(), metadataOf(traceRecord).getInt32("total_step").intValue());
            i++;
        }
    }

    private void assertNullCorrelation(TraceRecord traceRecord) {
        Struct struct = (Struct) ((Struct) traceRecord.value()).get("metadata");
        Assert.assertNull(struct.get("correlation_id"));
        Assert.assertNull(struct.get("total_step"));
        Assert.assertNull(struct.get("current_step"));
    }

    private Struct metadataOf(TraceRecord traceRecord) {
        return (Struct) ((Struct) traceRecord.value()).get("metadata");
    }

    private void assertErrorFields(Struct struct) {
        Object obj = struct.get("error");
        Assert.assertTrue(obj instanceof Struct);
        Assert.assertEquals("test exception", ((Struct) obj).get("error_message"));
    }

    private void assertRecordAbsentAndErrorPresent(Schema schema) {
        Assert.assertNull(schema.field("record"));
        Assert.assertNotNull(schema.field("metadata"));
        Assert.assertNotNull(schema.field("error"));
    }

    private void assertErrorAbsentAndRecordPresent(Schema schema) {
        Assert.assertNotNull(schema.field("record"));
        Assert.assertNotNull(schema.field("metadata"));
        Assert.assertNull(schema.field("error"));
    }

    private void assertRecordFields(Struct struct) {
        Object obj = struct.get("record");
        Assert.assertTrue(obj instanceof Struct);
        Object obj2 = ((Struct) obj).get("key");
        Assert.assertTrue(obj2 instanceof String);
        Assert.assertEquals("key", obj2);
        Object obj3 = ((Struct) obj).get("value");
        Assert.assertTrue(obj3 instanceof Struct);
        Object obj4 = ((Struct) obj).get("headers");
        Assert.assertTrue(obj4 instanceof Map);
        Assert.assertEquals(Collections.singletonMap("h1", "hv1"), obj4);
        assertExpectedSampleRecord(obj3);
    }

    private void assertMetadataTransformations(Struct struct) {
        Assert.assertEquals(Collections.singletonMap("src", "abc"), struct.get("source_partition"));
        Assert.assertEquals(Collections.singletonMap("offset", "1"), struct.get("source_offset"));
        Assert.assertEquals(TOPIC_NAME, struct.get(MonitorableSourceConnector.TOPIC_CONFIG));
        Assert.assertEquals(0, struct.get("partition"));
        Assert.assertEquals("SOURCE", struct.get("type"));
    }

    private void assertExpectedSampleRecord(Object obj) {
        ConnectSchema.validateValue(SAMPLE_RECORDS_VALUE_SCHEMA, obj);
    }

    private TracingContext prepareTraceContext() {
        return new TracingContext(new ConnectorTaskId("test", 1), this.config);
    }

    private TracerConfig prepareTraceConfig() {
        return new TracerConfig(ConnectorConfigTest.MOCK_PLUGINS, new ConnectorConfig(ConnectorConfigTest.MOCK_PLUGINS, TEST_CONNECTOR_CONFIGS));
    }

    private SourceRecord sampleSourceRecord() {
        return new SourceRecord(SOURCE_PARTITION, SOURCE_OFFSET, TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", SAMPLE_RECORDS_VALUE_SCHEMA, SAMPLE_RECORD_VALUE, KAFKA_TIMESTAMP, new ConnectHeaders().addString("h1", "hv1"));
    }

    private ConsumerRecord<String, String> sampleConsumerRecord() {
        return new ConsumerRecord<>(TOPIC_NAME, PARTITION_NUMBER.intValue(), 1234L, 1234L, TimestampType.CREATE_TIME, 0, 0, "key", "value", new RecordHeaders(), Optional.empty());
    }

    private SinkRecord sampleSinkRecord() {
        return new SinkRecord(TOPIC_NAME, PARTITION_NUMBER.intValue(), Schema.STRING_SCHEMA, "key", SAMPLE_RECORDS_VALUE_SCHEMA, SAMPLE_RECORD_VALUE, 1234L, KAFKA_TIMESTAMP, TS_TYPE, new ConnectHeaders().addString("h1", "hv1"));
    }

    static {
        TEST_CONNECTOR_CONFIGS.put("name", "test");
        TEST_CONNECTOR_CONFIGS.put("connector.class", ConnectorConfigTest.TestConnector.class.getName());
        TEST_CONNECTOR_CONFIGS.put("trace.records.enable", "true");
    }
}
