package org.apache.kylin.rest.service;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.engine.spark.ExecutableUtils;
import org.apache.kylin.junit.rule.TransactionExceptedException;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
import org.apache.kylin.metadata.streaming.KafkaConfig;
import org.apache.kylin.metadata.streaming.KafkaConfigManager;
import org.apache.kylin.rest.request.StreamingRequest;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.AclUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.security.authentication.TestingAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.test.util.ReflectionTestUtils;

/* loaded from: input_file:org/apache/kylin/rest/service/StreamingTableServiceTest.class */
public class StreamingTableServiceTest extends NLocalFileMetadataTestCase {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Mock
    private AclUtil aclUtil = (AclUtil) Mockito.spy(AclUtil.class);

    @Mock
    private AclEvaluate aclEvaluate = (AclEvaluate) Mockito.spy(AclEvaluate.class);

    @Mock
    private UserService userService = (UserService) Mockito.spy(UserService.class);

    @Mock
    private UserAclService userAclService = (UserAclService) Mockito.spy(UserAclService.class);

    @InjectMocks
    private AccessService accessService = (AccessService) Mockito.spy(new AccessService());

    @InjectMocks
    private StreamingTableService streamingTableService = (StreamingTableService) Mockito.spy(new StreamingTableService());

    @InjectMocks
    private TableService tableService = (TableService) Mockito.spy(new TableService());

    @Rule
    public TransactionExceptedException thrown = TransactionExceptedException.none();

    @Mock
    protected IUserGroupService userGroupService = (IUserGroupService) Mockito.spy(NUserGroupService.class);
    private static final String PROJECT = "streaming_test";

    @Before
    public void setup() {
        ExecutableUtils.initJobFactory();
        createTestMetadata(new String[0]);
        SecurityContextHolder.getContext().setAuthentication(new TestingAuthenticationToken("ADMIN", "ADMIN", new String[]{"ROLE_ADMIN"}));
        NProjectManager nProjectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
        nProjectManager.forceDropProject("broken_test");
        nProjectManager.forceDropProject("bad_query_test");
        System.setProperty("HADOOP_USER_NAME", "root");
        ReflectionTestUtils.setField(this.aclEvaluate, "aclUtil", this.aclUtil);
        ReflectionTestUtils.setField(this.streamingTableService, "aclEvaluate", this.aclEvaluate);
        ReflectionTestUtils.setField(this.tableService, "aclEvaluate", this.aclEvaluate);
        ReflectionTestUtils.setField(this.tableService, "userGroupService", this.userGroupService);
        ReflectionTestUtils.setField(this.tableService, "accessService", this.accessService);
        ReflectionTestUtils.setField(this.userAclService, "userService", this.userService);
        ReflectionTestUtils.setField(this.accessService, "userAclService", this.userAclService);
        ReflectionTestUtils.setField(this.accessService, "userService", this.userService);
        NProjectManager nProjectManager2 = NProjectManager.getInstance(getTestConfig());
        nProjectManager2.updateProject(nProjectManager2.copyForWrite(nProjectManager2.getProject(PROJECT)));
        Mockito.when(this.userService.listSuperAdminUsers()).thenReturn(Arrays.asList("admin"));
        Mockito.when(Boolean.valueOf(this.userAclService.hasUserAclPermissionInProject(Mockito.anyString(), Mockito.anyString()))).thenReturn(false);
        try {
            new JdbcRawRecStore(getTestConfig());
        } catch (Exception e) {
        }
    }

    @After
    public void tearDown() {
        getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "false");
        EventBusFactory.getInstance().restart();
        cleanupTestMetadata();
    }

    @Test
    public void testInnerReloadTable() {
        getTestConfig();
        try {
            List list = (List) this.tableService.getTableDesc(PROJECT, true, "P_LINEORDER_STR", "SSB", false, Collections.emptyList(), 10).getFirst();
            Assert.assertEquals(1L, list.size());
            TableDesc tableDesc = (TableDesc) list.get(0);
            Assert.assertEquals(0L, this.streamingTableService.innerReloadTable(PROJECT, tableDesc, this.tableService.getOrCreateTableExt(PROJECT, tableDesc)).size());
        } catch (Exception e) {
            Assert.fail();
        }
    }

    @Test
    public void testReloadTable() {
        try {
            List list = (List) this.tableService.getTableDesc(PROJECT, true, "", "DEFAULT", true, Collections.emptyList(), 10).getFirst();
            Assert.assertEquals(2L, list.size());
            TableDesc tableDesc = (TableDesc) list.get(0);
            this.streamingTableService.reloadTable(PROJECT, tableDesc, this.tableService.getOrCreateTableExt(PROJECT, tableDesc));
        } catch (Exception e) {
            Assert.fail();
        }
    }

    @Test
    public void testCreateKafkaConfig() {
        KafkaConfig kafkaConfig = new KafkaConfig();
        kafkaConfig.setDatabase("DEFAULT");
        kafkaConfig.setName("TPCH_TOPIC");
        kafkaConfig.setKafkaBootstrapServers("10.1.2.210:9092");
        kafkaConfig.setSubscribe("tpch_topic");
        kafkaConfig.setStartingOffsets("latest");
        kafkaConfig.setParserName("org.apache.kylin.parser.TimedJsonStreamParser");
        this.streamingTableService.createKafkaConfig(PROJECT, kafkaConfig);
        KafkaConfig kafkaConfig2 = KafkaConfigManager.getInstance(getTestConfig(), PROJECT).getKafkaConfig("DEFAULT.TPCH_TOPIC");
        Assert.assertEquals("DEFAULT", kafkaConfig2.getDatabase());
        Assert.assertEquals("TPCH_TOPIC", kafkaConfig2.getName());
        Assert.assertEquals("10.1.2.210:9092", kafkaConfig2.getKafkaBootstrapServers());
        Assert.assertEquals("tpch_topic", kafkaConfig2.getSubscribe());
        Assert.assertEquals("latest", kafkaConfig2.getStartingOffsets());
    }

    @Test
    public void testUpdateKafkaConfig() {
        KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT).getKafkaConfig("DEFAULT.SSB_TOPIC");
        kafkaConfig.setKafkaBootstrapServers("10.1.2.210:9093");
        this.streamingTableService.updateKafkaConfig(PROJECT, kafkaConfig);
        Assert.assertEquals("10.1.2.210:9093", KafkaConfigManager.getInstance(getTestConfig(), PROJECT).getKafkaConfig("DEFAULT.SSB_TOPIC").getKafkaBootstrapServers());
    }

    @Test
    public void testDecimalConvertToDouble() {
        StreamingRequest streamingRequest = new StreamingRequest();
        TableDesc tableDesc = new TableDesc();
        tableDesc.setColumns(new ColumnDesc[]{new ColumnDesc("1", "name1", "DECIMAL", "", "", "", ""), new ColumnDesc("2", "name2", "double", "", "", "", ""), new ColumnDesc("3", "name3", "int", "", "", "", "")});
        streamingRequest.setTableDesc(tableDesc);
        this.streamingTableService.decimalConvertToDouble(PROJECT, streamingRequest);
        Assert.assertEquals(2L, Arrays.stream(streamingRequest.getTableDesc().getColumns()).filter(columnDesc -> {
            return StringUtils.equalsIgnoreCase(columnDesc.getDatatype(), "double");
        }).count());
    }

    @Test
    public void testCheckColumnsNotMatch() {
        StreamingRequest streamingRequest = new StreamingRequest();
        streamingRequest.setProject(PROJECT);
        TableDesc tableDesc = new TableDesc();
        tableDesc.setColumns(new ColumnDesc[]{new ColumnDesc("1", "name1", "DECIMAL", "", "", "", ""), new ColumnDesc("2", "name2", "double", "", "", "", ""), new ColumnDesc("3", "name3", "int", "", "", "", "")});
        streamingRequest.setTableDesc(tableDesc);
        KafkaConfig kafkaConfig = new KafkaConfig();
        kafkaConfig.setDatabase("SSB");
        kafkaConfig.setBatchTable("SSB.P_LINEORDER");
        kafkaConfig.setName("TPCH_TOPIC");
        kafkaConfig.setKafkaBootstrapServers("10.1.2.210:9092");
        kafkaConfig.setSubscribe("tpch_topic");
        kafkaConfig.setStartingOffsets("latest");
        streamingRequest.setKafkaConfig(kafkaConfig);
        this.thrown.expect(KylinException.class);
        this.thrown.expectMessage(String.format(Locale.ROOT, MsgPicker.getMsg().getBatchStreamTableNotMatch(), "SSB.P_LINEORDER"));
        this.streamingTableService.checkColumns(streamingRequest);
    }

    @Test
    public void testCheckColumnsNoTimestampPartition() {
        StreamingRequest streamingRequest = new StreamingRequest();
        streamingRequest.setProject(PROJECT);
        streamingRequest.setTableDesc(NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT).getTableDesc("SSB.LINEORDER"));
        KafkaConfig kafkaConfig = new KafkaConfig();
        kafkaConfig.setDatabase("SSB");
        kafkaConfig.setBatchTable("SSB.LINEORDER");
        kafkaConfig.setName("TPCH_TOPIC");
        kafkaConfig.setKafkaBootstrapServers("10.1.2.210:9092");
        kafkaConfig.setSubscribe("tpch_topic");
        kafkaConfig.setStartingOffsets("latest");
        streamingRequest.setKafkaConfig(kafkaConfig);
        this.thrown.expect(KylinException.class);
        this.thrown.expectMessage(MsgPicker.getMsg().getTimestampColumnNotExist());
        this.streamingTableService.checkColumns(streamingRequest);
    }

    @Test
    public void testCheckColumnsNoTimestampPartition1() {
        StreamingRequest streamingRequest = new StreamingRequest();
        streamingRequest.setProject(PROJECT);
        streamingRequest.setTableDesc(NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT).getTableDesc("SSB.LINEORDER"));
        KafkaConfig kafkaConfig = new KafkaConfig();
        kafkaConfig.setDatabase("SSB");
        kafkaConfig.setName("TPCH_TOPIC");
        kafkaConfig.setKafkaBootstrapServers("10.1.2.210:9092");
        kafkaConfig.setSubscribe("tpch_topic");
        kafkaConfig.setStartingOffsets("latest");
        streamingRequest.setKafkaConfig(kafkaConfig);
        this.thrown.expect(KylinException.class);
        this.thrown.expectMessage(MsgPicker.getMsg().getTimestampColumnNotExist());
        this.streamingTableService.checkColumns(streamingRequest);
    }
}
