package org.apache.kylin.streaming.jobs;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.class */
public class StreamingJobUtilsTest extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    @Test
    public void testGetStreamingKylinConfig() {
        KylinConfig testConfig = getTestConfig();
        StreamingJobMeta streamingJobByUuid = StreamingJobManager.getInstance(testConfig, PROJECT).getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b73_build");
        Map params = streamingJobByUuid.getParams();
        params.put("kylin.streaming.spark-conf.spark.executor.memoryOverhead", "1g");
        params.put("kylin.streaming.kafka-conf.maxOffsetsPerTrigger", "300");
        params.put("kylin.streaming.table-refresh-interval", "1h");
        KylinConfig streamingKylinConfig = StreamingJobUtils.getStreamingKylinConfig(testConfig, params, streamingJobByUuid.getModelId(), PROJECT);
        Assert.assertFalse(streamingKylinConfig.getStreamingSparkConfigOverride().isEmpty());
        Assert.assertFalse(streamingKylinConfig.getStreamingKafkaConfigOverride().isEmpty());
        Assert.assertEquals("1h", streamingKylinConfig.getStreamingTableRefreshInterval());
    }

    @Test
    public void testGetStreamingKylinConfigOfProject() {
        KylinConfig testConfig = getTestConfig();
        Map params = StreamingJobManager.getInstance(testConfig, PROJECT).getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b73_build").getParams();
        testConfig.setProperty("kylin.streaming.spark-conf.spark.executor.memoryOverhead", "1g");
        testConfig.setProperty("kylin.streaming.kafka-conf.maxOffsetsPerTrigger", "300");
        testConfig.setProperty("kylin.streaming.table-refresh-interval", "30m");
        KylinConfig streamingKylinConfig = StreamingJobUtils.getStreamingKylinConfig(testConfig, params, "", PROJECT);
        Assert.assertFalse(streamingKylinConfig.getStreamingSparkConfigOverride().isEmpty());
        Assert.assertFalse(streamingKylinConfig.getStreamingKafkaConfigOverride().isEmpty());
        Assert.assertEquals("30m", streamingKylinConfig.getStreamingTableRefreshInterval());
    }

    @Test
    public void testExtractKafkaSaslJaasConf() throws Exception {
        KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
        Assert.assertNull(StreamingJobUtils.extractKafkaJaasConf(true));
        getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
        FileUtils.write(new File(instanceFromEnv.getKafkaJaasConfPath()), "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}", StandardCharsets.UTF_8);
        Assert.assertNotNull(StreamingJobUtils.extractKafkaJaasConf(true));
        getTestConfig().setProperty("kylin.kafka-jaas-conf", "kafka_err_jaas.conf");
        FileUtils.write(new File(instanceFromEnv.getKafkaJaasConfPath()), "}4{", StandardCharsets.UTF_8);
        try {
            try {
                StreamingJobUtils.extractKafkaJaasConf(true);
                FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
            } catch (Exception e) {
                Assert.assertTrue(e instanceof KylinException);
                Assert.assertEquals("KE-010035217", e.getErrorCode().getCodeString());
                FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
            }
        } catch (Throwable th) {
            FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
            throw th;
        }
    }

    @Test
    public void testCheckKeyTabFileUnderJaas() throws Exception {
        KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
        Assert.assertNull(StreamingJobUtils.extractKafkaJaasConf(true));
        Assert.assertNull(StreamingJobUtils.getJaasKeyTabAbsPath());
        KylinConfig testConfig = getTestConfig();
        testConfig.setProperty("kylin.kafka-jaas.enabled", "true");
        File file = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
        Assert.assertThrows(ErrorCodeServer.READ_KAFKA_JAAS_FILE_ERROR.getMsg(new Object[0]), KylinException.class, () -> {
            StreamingJobUtils.extractKafkaJaasConf(true);
        });
        FileUtils.write(new File(instanceFromEnv.getKafkaJaasConfPath()), "KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true principal=\"kylin@DEV.COM\" serviceName=\"kafka\"; };", StandardCharsets.UTF_8);
        Assert.assertNull(StreamingJobUtils.getJaasKeyTabAbsPath());
        FileUtils.write(new File(instanceFromEnv.getKafkaJaasConfPath()), "KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"" + file + "\" principal=\"kylin@DEV.COM\" serviceName=\"kafka\"; };", StandardCharsets.UTF_8);
        Assert.assertThrows(ErrorCodeServer.KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS.getMsg(new Object[0]), KylinException.class, () -> {
            StreamingJobUtils.extractKafkaJaasConf(true);
        });
        FileUtils.write(file, "test", StandardCharsets.UTF_8);
        Assert.assertNotNull(StreamingJobUtils.extractKafkaJaasConf(true));
        Assert.assertEquals(file.getAbsolutePath(), StreamingJobUtils.getJaasKeyTabAbsPath());
        String executorJaasName = StreamingJobUtils.getExecutorJaasName();
        Assert.assertEquals(instanceFromEnv.getKafkaJaasConf(), executorJaasName);
        Assert.assertEquals(HadoopUtil.getHadoopConfDir() + File.separator + executorJaasName, StreamingJobUtils.getExecutorJaasPath());
        testConfig.setProperty("kylin.kafka-jaas-conf", "kafka_err_jaas.conf");
    }

    @Test
    public void testCreateExecutorJaas() throws Exception {
        KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
        File file = new File(HadoopUtil.getHadoopConfDir() + File.separator + instanceFromEnv.getKafkaJaasConf());
        file.deleteOnExit();
        StreamingJobUtils.createExecutorJaas();
        Assert.assertFalse(file.exists());
        getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
        FileUtils.write(new File(instanceFromEnv.getKafkaJaasConfPath()), "KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required; };", StandardCharsets.UTF_8);
        StreamingJobUtils.createExecutorJaas();
        Assert.assertTrue(file.exists());
        Assert.assertEquals("KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required; };", FileUtils.readFileToString(file, StandardCharsets.UTF_8));
        File file2 = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
        FileUtils.write(file2, "test", StandardCharsets.UTF_8);
        String str = "KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"" + file2.getAbsolutePath() + "\" principal=\"kylin@DEV.COM\" serviceName=\"kafka\"; };";
        FileUtils.write(new File(instanceFromEnv.getKafkaJaasConfPath()), str, StandardCharsets.UTF_8);
        StreamingJobUtils.createExecutorJaas();
        Assert.assertTrue(file.exists());
        Assert.assertEquals(str.replace(file2.getAbsolutePath(), file2.getName()), FileUtils.readFileToString(file, StandardCharsets.UTF_8));
        file.deleteOnExit();
    }
}
