/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.sample;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.kafka.util.KafkaUtils;
import org.apache.kylin.metadata.streaming.KafkaConfig;
import org.apache.kylin.sample.KafkaSourceHandler;
import org.apache.kylin.sample.StreamingSourceHandler;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link KafkaSourceHandler}, exercised through a Mockito spy.
 *
 * <p>Kafka access is stubbed by installing a {@link MockConsumer} into the {@code mockup}
 * field of {@link KafkaUtils} via reflection (see {@link #setupMockConsumer}).
 */
public class KafkaSourceHandlerTest extends NLocalFileMetadataTestCase {
    private static final StreamingSourceHandler sourceHandler = Mockito.spy(new KafkaSourceHandler());
    private static final String BROKER_SERVER = "localhost:19093";
    private static final String TOPIC = "ssb-topic1";
    private static final int PARTITION = 0;
    // Shared across tests; init() re-applies every setting before each test method.
    private static final KafkaConfig kafkaConfig = new KafkaConfig();

    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
        this.init();
    }

    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    /** Resets the shared {@link KafkaConfig} to the baseline used by every test. */
    public void init() {
        kafkaConfig.setDatabase("SSB");
        kafkaConfig.setName("P_LINEORDER");
        kafkaConfig.setProject("streaming_test");
        kafkaConfig.setKafkaBootstrapServers(BROKER_SERVER);
        kafkaConfig.setSubscribe("ssb-topic1");
        kafkaConfig.setStartingOffsets("latest");
        kafkaConfig.setBatchTable("");
        kafkaConfig.setParserName("org.apache.kylin.parser.TimedJsonStreamParser");
    }

    /**
     * A well-formed JSON message yields one column; an invalid column name and malformed
     * JSON are both expected to raise {@link KylinException}.
     */
    @Test
    public void testParseMessage() {
        // Each payload gets its own local: lambdas may only capture effectively final variables.
        String validMsg = "{\"timestamp\": \"2000-01-01 05:06:12\"}";
        Map<?, ?> parsed = sourceHandler.parserMessage(kafkaConfig, validMsg);
        Assert.assertEquals(1, parsed.size());
        Assert.assertTrue(parsed.containsKey("timestamp"));

        String badColumnMsg = "{\"times[tamp]\": \"2000-01-01 05:06:12\"}";
        Assert.assertThrows(
                ErrorCodeServer.CUSTOM_PARSER_CHECK_COLUMN_NAME_FAILED.getCodeMsg(new Object[0]),
                KylinException.class,
                () -> sourceHandler.parserMessage(kafkaConfig, badColumnMsg));

        String malformedMsg = "{timestamp: \"2000-01-01 05:06:12\"}";
        Assert.assertThrows(
                ErrorCodeServer.STREAMING_PARSE_MESSAGE_ERROR.getMsg(
                        new Object[]{kafkaConfig.getParserName(), kafkaConfig.getSubscribe()}),
                KylinException.class,
                () -> sourceHandler.parserMessage(kafkaConfig, malformedMsg));
    }

    /**
     * Checks the number of messages returned for 7, 11 and 0 available records; with 11
     * records queued the test expects only 10 to come back.
     */
    @Test
    public void testGetMessage() {
        this.setupMockConsumer(TOPIC, PARTITION, true, 7);
        List<?> messages = sourceHandler.getMessages(kafkaConfig);
        Assert.assertEquals(7, messages.size());

        this.setupMockConsumer(TOPIC, PARTITION, true, 11);
        messages = sourceHandler.getMessages(kafkaConfig);
        Assert.assertEquals(10, messages.size());

        this.setupMockConsumer(TOPIC, PARTITION, false, 0);
        messages = sourceHandler.getMessages(kafkaConfig);
        Assert.assertEquals(0, messages.size());
    }

    /** Every configured bootstrap address is expected to be reported as broken. */
    @Test
    public void testBrokenBrokers() {
        this.setupMockConsumer(TOPIC, PARTITION, false, 0);

        kafkaConfig.setKafkaBootstrapServers("1.1.1.1:9092,2.2.2.2:9092,3.3.3.3:9092");
        List<?> brokenBrokers = sourceHandler.getBrokenBrokers(kafkaConfig);
        Assert.assertEquals(3, brokenBrokers.size());

        kafkaConfig.setKafkaBootstrapServers(BROKER_SERVER);
        brokenBrokers = sourceHandler.getBrokenBrokers(kafkaConfig);
        Assert.assertEquals(1, brokenBrokers.size());
    }

    /** Topic lookup returns the single mocked topic both with an exact and an empty filter. */
    @Test
    public void testGetTopics() {
        this.setupMockConsumer(TOPIC, PARTITION, false, 0);
        Map<?, ?> topics = sourceHandler.getTopics(kafkaConfig, "ssb-topic1");
        Assert.assertEquals(1, ((List<?>) topics.get("kafka-cluster-1")).size());

        this.setupMockConsumer(TOPIC, PARTITION, false, 0);
        topics = sourceHandler.getTopics(kafkaConfig, "");
        Assert.assertEquals(1, ((List<?>) topics.get("kafka-cluster-1")).size());
    }

    /**
     * Installs a fresh {@link MockConsumer} into {@link KafkaUtils} and primes it with one
     * partition of {@code topic}.
     *
     * @param topic     topic name to register on the mock
     * @param partition partition number to assign
     * @param addMsg    whether to enqueue records; when {@code false} the end offset is 0
     * @param msgCnt    number of records ({@code "msg-0"}, {@code "msg-1"}, ...) to enqueue
     *                  when {@code addMsg} is {@code true}
     */
    private void setupMockConsumer(String topic, int partition, boolean addMsg, int msgCnt) {
        MockConsumer<String, ByteBuffer> mockup = new MockConsumer<>(OffsetResetStrategy.LATEST);
        ReflectionUtils.setField(KafkaUtils.class, "mockup", mockup);

        TopicPartition topicPartition = new TopicPartition(topic, partition);
        mockup.assign(Collections.singletonList(topicPartition));
        mockup.updatePartitions(topic,
                Collections.singletonList(new PartitionInfo(topic, partition, null, new Node[0], new Node[0])));
        mockup.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));

        if (addMsg) {
            for (int i = 0; i < msgCnt; ++i) {
                ByteBuffer value = ByteBuffer.allocate(10);
                value.put(("msg-" + i).getBytes(StandardCharsets.UTF_8));
                value.flip();
                mockup.addRecord(new ConsumerRecord<String, ByteBuffer>(topic, partition, (long) i, null, value));
            }
        }
        // End offset mirrors the number of records actually queued.
        long endOffset = addMsg ? (long) msgCnt : 0L;
        mockup.updateEndOffsets(Collections.singletonMap(topicPartition, endOffset));
    }
}

