package org.apache.kylin.rest.service;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.msg.Message;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
import org.apache.kylin.metadata.jar.JarInfoManager;
import org.apache.kylin.metadata.jar.JarTypeEnum;
import org.apache.kylin.metadata.streaming.DataParserInfo;
import org.apache.kylin.metadata.streaming.DataParserManager;
import org.apache.kylin.metadata.streaming.KafkaConfig;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.AclUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.test.util.ReflectionTestUtils;

/* loaded from: input_file:org/apache/kylin/rest/service/KafkaServiceTest.class */
public class KafkaServiceTest extends NLocalFileMetadataTestCase {
    private static final String brokerServer = "localhost:19093";
    private static final String PROJECT = "streaming_test";
    private static final String JAR_NAME = "custom_parser.jar";
    private static final String PARSER_NAME1 = "org.apache.kylin.parser.JsonDataParser1";
    private static final String PARSER_NAME2 = "org.apache.kylin.parser.CsvDataParser2";
    private static final String PARSER_NAME3 = "org.apache.kylin.parser.CustomDataParser2";
    private static String JAR_ABS_PATH;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Mock
    private final KafkaService kafkaService = (KafkaService) Mockito.spy(KafkaService.class);

    @Mock
    private final CustomFileService customFileService = (CustomFileService) Mockito.spy(CustomFileService.class);

    @Mock
    private AclEvaluate aclEvaluate = (AclEvaluate) Mockito.spy(AclEvaluate.class);

    @Mock
    private AclUtil aclUtil = (AclUtil) Mockito.spy(AclUtil.class);
    KafkaConfig kafkaConfig = new KafkaConfig();

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
        ReflectionTestUtils.setField(this.aclEvaluate, "aclUtil", this.aclUtil);
        ReflectionTestUtils.setField(this.kafkaService, "aclEvaluate", this.aclEvaluate);
        ReflectionTestUtils.setField(this.customFileService, "aclEvaluate", this.aclEvaluate);
        init();
    }

    public void init() {
        this.kafkaConfig.setDatabase("SSB");
        this.kafkaConfig.setName("P_LINEORDER");
        this.kafkaConfig.setProject(PROJECT);
        this.kafkaConfig.setKafkaBootstrapServers(brokerServer);
        this.kafkaConfig.setSubscribe("ssb-topic1");
        this.kafkaConfig.setStartingOffsets("latest");
        this.kafkaConfig.setBatchTable("");
        this.kafkaConfig.setParserName("org.apache.kylin.parser.TimedJsonStreamParser");
    }

    public void initJar() {
        JAR_ABS_PATH = new File(new Path(String.format("%s/%s/%s", new Path(KylinConfig.getInstanceFromEnv().getMetadataUrl().toString()).getParent().toString(), "jars", JAR_NAME)).toString()).toString();
    }

    @Test
    public void testCheckBrokerStatus() {
        try {
            this.kafkaService.checkBrokerStatus(this.kafkaConfig);
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof KylinException);
            Map map = (Map) e.getData();
            Assert.assertNotNull(map);
            Assert.assertEquals(1L, map.size());
            Assert.assertEquals(Collections.singletonList(brokerServer), map.get("failed_servers"));
        }
    }

    @Test
    public void testGetTopics() {
        Message msg = MsgPicker.getMsg();
        Assert.assertThrows(msg.getBrokerTimeoutMessage(), KylinException.class, () -> {
            this.kafkaService.getTopics(this.kafkaConfig, PROJECT, "test");
        });
        this.kafkaConfig.setKafkaBootstrapServers("");
        Assert.assertThrows(msg.getInvalidBrokerDefinition(), KylinException.class, () -> {
            this.kafkaService.getTopics(this.kafkaConfig, PROJECT, "test");
        });
    }

    @Test
    public void testGetMessage() {
        Assert.assertThrows(MsgPicker.getMsg().getStreamingTimeoutMessage(), KylinException.class, () -> {
            this.kafkaService.getMessages(this.kafkaConfig, PROJECT);
        });
        this.kafkaConfig.setKafkaBootstrapServers("");
        Assert.assertThrows(MsgPicker.getMsg().getInvalidBrokerDefinition(), KylinException.class, () -> {
            this.kafkaService.getMessages(this.kafkaConfig, PROJECT);
        });
    }

    @Test
    public void testGetMessageTypeAndDecodedMessages() {
        ByteBuffer allocate = ByteBuffer.allocate(10);
        allocate.put("msg-1".getBytes());
        allocate.flip();
        Assert.assertEquals(1L, ((List) this.kafkaService.decodeMessage(Collections.singletonList(allocate)).get("message")).size());
    }

    @Test
    public void testConvertSampleMessageToFlatMap() {
        Assert.assertEquals(3L, this.kafkaService.parserMessage(PROJECT, this.kafkaConfig, "{\"a\": 2, \"b\": 2, \"timestamp\": \"2000-01-01 05:06:12\"}").size());
        Assert.assertThrows(ErrorCodeServer.CUSTOM_PARSER_CHECK_COLUMN_NAME_FAILED.getMsg(new Object[0]), KylinException.class, () -> {
            this.kafkaService.parserMessage(PROJECT, this.kafkaConfig, "{\"_a\": 2, \"b\": 2, \"timestamp\": \"2000-01-01 05:06:12\"}");
        });
        Assert.assertThrows(MsgPicker.getMsg().getEmptyStreamingMessage(), KylinException.class, () -> {
            this.kafkaService.parserMessage(PROJECT, this.kafkaConfig, "");
        });
    }

    @Test
    public void testGetParsers() {
        Assert.assertFalse(this.kafkaService.getParsers(PROJECT).isEmpty());
    }

    @Test
    public void testRemoveParser() throws IOException {
        DataParserManager dataParserManager = DataParserManager.getInstance(getTestConfig(), PROJECT);
        JarInfoManager jarInfoManager = JarInfoManager.getInstance(getTestConfig(), PROJECT);
        initJar();
        this.customFileService.loadParserJar(JAR_NAME, this.customFileService.uploadCustomJar(new MockMultipartFile(JAR_NAME, JAR_NAME, "multipart/form-data", Files.newInputStream(Paths.get(JAR_ABS_PATH, new String[0]), new OpenOption[0])), PROJECT, "STREAMING_CUSTOM_PARSER"), PROJECT);
        Assert.assertNotNull(jarInfoManager.getJarInfo(JarTypeEnum.valueOf("STREAMING_CUSTOM_PARSER"), JAR_NAME));
        Assert.assertNotNull(dataParserManager.getDataParserInfo(PARSER_NAME1));
        this.kafkaService.removeParser(PROJECT, PARSER_NAME1);
        Assert.assertNull(dataParserManager.getDataParserInfo(PARSER_NAME1));
        Assert.assertNotNull(dataParserManager.getDataParserInfo(PARSER_NAME2));
        this.kafkaService.removeParser(PROJECT, PARSER_NAME2);
        Assert.assertNull(dataParserManager.getDataParserInfo(PARSER_NAME2));
        Assert.assertNotNull(dataParserManager.getDataParserInfo(PARSER_NAME3));
        this.kafkaService.removeParser(PROJECT, PARSER_NAME3);
        Assert.assertNull(dataParserManager.getDataParserInfo(PARSER_NAME3));
        Assert.assertNull(jarInfoManager.getJarInfo(JarTypeEnum.valueOf("STREAMING_CUSTOM_PARSER"), JAR_NAME));
    }

    @Test
    public void testInitDefaultParser() {
        DataParserManager dataParserManager = DataParserManager.getInstance(getTestConfig(), PROJECT);
        DataParserInfo dataParserInfo = dataParserManager.getDataParserInfo("org.apache.kylin.parser.TimedJsonStreamParser");
        CachedCrudAssist cachedCrudAssist = (CachedCrudAssist) ReflectionTestUtils.getField(dataParserManager, "crud");
        Assert.assertNotNull(cachedCrudAssist);
        cachedCrudAssist.delete(dataParserInfo);
        Assert.assertNull(dataParserManager.getDataParserInfo("org.apache.kylin.parser.TimedJsonStreamParser"));
        this.kafkaService.initDefaultParser(PROJECT);
        Assert.assertNotNull(dataParserManager.getDataParserInfo("org.apache.kylin.parser.TimedJsonStreamParser"));
    }
}
