package org.apache.kylin.sample;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.kafka.util.KafkaUtils;
import org.apache.kylin.metadata.streaming.KafkaConfig;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kylin/sample/KafkaSourceHandlerTest.class */
/**
 * Tests for {@link KafkaSourceHandler}, driven through a Mockito spy and a Kafka
 * {@link MockConsumer} that is injected into the static {@code mockup} field of
 * {@link KafkaUtils} by reflection, so no real broker is contacted by the test itself.
 *
 * <p>The handler and the {@link KafkaConfig} are shared static fixtures; {@link #init()}
 * re-applies the baseline configuration before every test, which undoes per-test changes
 * such as the bootstrap-server override in {@link #testBrokenBrokers()}.
 */
public class KafkaSourceHandlerTest extends NLocalFileMetadataTestCase {
    private static final String BROKER_SERVER = "localhost:19093";
    private static final String TOPIC = "ssb-topic1";
    private static final String CLUSTER_KEY = "kafka-cluster-1";
    private static final String MOCKUP_FIELD = "mockup";
    /** Fixed capacity of each mock record payload; "msg-N" must fit or ByteBuffer.put throws. */
    private static final int PAYLOAD_CAPACITY = 10;

    private static final StreamingSourceHandler sourceHandler = (StreamingSourceHandler) Mockito.spy(new KafkaSourceHandler());
    private static final KafkaConfig kafkaConfig = new KafkaConfig();

    /** Creates the local test metadata and resets the shared Kafka configuration. */
    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
        init();
    }

    /**
     * Removes the local test metadata.
     *
     * <p>NOTE(review): the mock consumer installed by {@code setupMockConsumer} stays in
     * {@code KafkaUtils} after the test finishes. Consider clearing it here once it is
     * confirmed how {@code KafkaUtils} behaves when the field is {@code null}.
     */
    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /** Applies the baseline streaming-table configuration to the shared {@link KafkaConfig}. */
    public void init() {
        kafkaConfig.setDatabase("SSB");
        kafkaConfig.setName("P_LINEORDER");
        kafkaConfig.setProject("streaming_test");
        kafkaConfig.setKafkaBootstrapServers(BROKER_SERVER);
        kafkaConfig.setSubscribe(TOPIC);
        kafkaConfig.setStartingOffsets("latest");
        kafkaConfig.setBatchTable("");
        kafkaConfig.setParserName("org.apache.kylin.parser.TimedJsonStreamParser");
    }

    /**
     * Parses one well-formed JSON message, then checks that a column name containing
     * brackets and a message with an unquoted key are both rejected with a
     * {@link KylinException}.
     *
     * <p>The first argument of {@code Assert.assertThrows(String, Class, ThrowingRunnable)}
     * is the message reported when the assertion itself fails, so only the exception type
     * is verified below; the text carried by the thrown exception is not compared.
     */
    @Test
    public void testParseMessage() {
        Map<?, ?> parsed = sourceHandler.parserMessage(kafkaConfig, "{\"timestamp\": \"2000-01-01 05:06:12\"}");
        Assert.assertEquals(1, parsed.size());
        Assert.assertTrue(parsed.containsKey("timestamp"));

        String badColumnFailure = ErrorCodeServer.CUSTOM_PARSER_CHECK_COLUMN_NAME_FAILED.getCodeMsg(new Object[0]);
        Assert.assertThrows(badColumnFailure, KylinException.class,
                () -> sourceHandler.parserMessage(kafkaConfig, "{\"times[tamp]\": \"2000-01-01 05:06:12\"}"));

        String malformedFailure = ErrorCodeServer.STREAMING_PARSE_MESSAGE_ERROR
                .getMsg(new Object[] { kafkaConfig.getParserName(), kafkaConfig.getSubscribe() });
        Assert.assertThrows(malformedFailure, KylinException.class,
                () -> sourceHandler.parserMessage(kafkaConfig, "{timestamp: \"2000-01-01 05:06:12\"}"));
    }

    /**
     * Checks the number of messages returned for 7 records, 11 records and an empty topic.
     */
    @Test
    public void testGetMessage() {
        setupMockConsumer(TOPIC, 0, true, 7);
        Assert.assertEquals(7, sourceHandler.getMessages(kafkaConfig).size());

        // 11 records are queued but 10 are expected back; presumably getMessages caps
        // the sample size at 10 -- confirm against KafkaSourceHandler.
        setupMockConsumer(TOPIC, 0, true, 11);
        Assert.assertEquals(10, sourceHandler.getMessages(kafkaConfig).size());

        setupMockConsumer(TOPIC, 0, false, 0);
        Assert.assertEquals(0, sourceHandler.getMessages(kafkaConfig).size());
    }

    /**
     * Expects every configured bootstrap address to be reported as broken, first for
     * three addresses and then for the single default one.
     */
    @Test
    public void testBrokenBrokers() {
        setupMockConsumer(TOPIC, 0, false, 0);

        kafkaConfig.setKafkaBootstrapServers("1.1.1.1:9092,2.2.2.2:9092,3.3.3.3:9092");
        Assert.assertEquals(3, sourceHandler.getBrokenBrokers(kafkaConfig).size());

        kafkaConfig.setKafkaBootstrapServers(BROKER_SERVER);
        Assert.assertEquals(1, sourceHandler.getBrokenBrokers(kafkaConfig).size());
    }

    /**
     * Looks up topics with an exact name and with an empty filter; both calls are
     * expected to list one topic under the {@code kafka-cluster-1} key.
     */
    @Test
    public void testGetTopics() {
        setupMockConsumer(TOPIC, 0, false, 0);
        List<?> byName = (List<?>) sourceHandler.getTopics(kafkaConfig, TOPIC).get(CLUSTER_KEY);
        Assert.assertEquals(1, byName.size());

        setupMockConsumer(TOPIC, 0, false, 0);
        List<?> unfiltered = (List<?>) sourceHandler.getTopics(kafkaConfig, "").get(CLUSTER_KEY);
        Assert.assertEquals(1, unfiltered.size());
    }

    /**
     * Installs a fresh {@link MockConsumer} into {@code KafkaUtils} and prepares a single
     * partition on it with a beginning offset of 0.
     *
     * @param topic       topic name the partition belongs to
     * @param partition   partition number to assign and describe
     * @param withRecords when {@code true}, {@code recordCount} records are queued and the
     *                    end offset is {@code recordCount}; when {@code false}, no record
     *                    is queued and the end offset is 0
     * @param recordCount number of records to queue; ignored when {@code withRecords} is
     *                    {@code false}
     */
    private void setupMockConsumer(String topic, int partition, boolean withRecords, int recordCount) {
        ReflectionUtils.setField((Class<?>) KafkaUtils.class, MOCKUP_FIELD,
                (Object) new MockConsumer<Object, Object>(OffsetResetStrategy.LATEST));
        // Read the consumer back from the field so the instance configured below is the
        // one that was actually installed. The cast is unchecked only because the field
        // is fetched reflectively; the instance was created just above.
        @SuppressWarnings("unchecked")
        MockConsumer<Object, Object> consumer = (MockConsumer<Object, Object>) ReflectionUtils
                .getField((Class<?>) KafkaUtils.class, MOCKUP_FIELD);

        TopicPartition topicPartition = new TopicPartition(topic, partition);
        Node noLeader = null;
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.updatePartitions(topic,
                Collections.singletonList(new PartitionInfo(topic, partition, noLeader, new Node[0], new Node[0])));
        consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));

        long endOffset = 0L;
        if (withRecords) {
            for (int offset = 0; offset < recordCount; offset++) {
                // Explicit charset so the payload bytes do not depend on the platform default.
                byte[] payload = ("msg-" + offset).getBytes(StandardCharsets.UTF_8);
                ByteBuffer value = ByteBuffer.allocate(PAYLOAD_CAPACITY);
                value.put(payload);
                value.flip();
                consumer.addRecord(new ConsumerRecord<Object, Object>(topic, partition, offset, null, value));
            }
            endOffset = recordCount;
        }
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, endOffset));
    }
}
