/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs;

import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.streaming.jobs.StreamingJobUtils;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamingJobUtilsTest
extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";

    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    @Test
    public void testGetStreamingKylinConfig() {
        KylinConfig config = StreamingJobUtilsTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        String jobId = "e78a89dd-847f-4574-8afa-8768b4228b73_build";
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b73_build");
        Map params = jobMeta.getParams();
        params.put("kylin.streaming.spark-conf.spark.executor.memoryOverhead", "1g");
        params.put("kylin.streaming.kafka-conf.maxOffsetsPerTrigger", "300");
        params.put("kylin.streaming.table-refresh-interval", "1h");
        KylinConfig kylinConfig = StreamingJobUtils.getStreamingKylinConfig((KylinConfig)config, (Map)params, (String)jobMeta.getModelId(), (String)PROJECT);
        Assert.assertFalse((boolean)kylinConfig.getStreamingSparkConfigOverride().isEmpty());
        Assert.assertFalse((boolean)kylinConfig.getStreamingKafkaConfigOverride().isEmpty());
        Assert.assertEquals((Object)"1h", (Object)kylinConfig.getStreamingTableRefreshInterval());
    }

    @Test
    public void testGetStreamingKylinConfigOfProject() {
        KylinConfig config = StreamingJobUtilsTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        String jobId = "e78a89dd-847f-4574-8afa-8768b4228b73_build";
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b73_build");
        Map params = jobMeta.getParams();
        config.setProperty("kylin.streaming.spark-conf.spark.executor.memoryOverhead", "1g");
        config.setProperty("kylin.streaming.kafka-conf.maxOffsetsPerTrigger", "300");
        config.setProperty("kylin.streaming.table-refresh-interval", "30m");
        KylinConfig kylinConfig = StreamingJobUtils.getStreamingKylinConfig((KylinConfig)config, (Map)params, (String)"", (String)PROJECT);
        Assert.assertFalse((boolean)kylinConfig.getStreamingSparkConfigOverride().isEmpty());
        Assert.assertFalse((boolean)kylinConfig.getStreamingKafkaConfigOverride().isEmpty());
        Assert.assertEquals((Object)"30m", (Object)kylinConfig.getStreamingTableRefreshInterval());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testExtractKafkaSaslJaasConf() throws Exception {
        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        Assert.assertNull((Object)StreamingJobUtils.extractKafkaJaasConf((boolean)true));
        StreamingJobUtilsTest.getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
        FileUtils.write((File)new File(kapConfig.getKafkaJaasConfPath()), (CharSequence)"KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}", (Charset)StandardCharsets.UTF_8);
        String text = StreamingJobUtils.extractKafkaJaasConf((boolean)true);
        Assert.assertNotNull((Object)text);
        StreamingJobUtilsTest.getTestConfig().setProperty("kylin.kafka-jaas-conf", "kafka_err_jaas.conf");
        File file = new File(kapConfig.getKafkaJaasConfPath());
        FileUtils.write((File)file, (CharSequence)"}4{", (Charset)StandardCharsets.UTF_8);
        try {
            StreamingJobUtils.extractKafkaJaasConf((boolean)true);
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof KylinException));
            Assert.assertEquals((Object)"KE-010035217", (Object)((KylinException)e).getErrorCode().getCodeString());
        }
        finally {
            FileUtils.deleteQuietly((File)new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
        }
    }

    @Test
    public void testCheckKeyTabFileUnderJaas() throws Exception {
        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        Assert.assertNull((Object)StreamingJobUtils.extractKafkaJaasConf((boolean)true));
        Assert.assertNull((Object)StreamingJobUtils.getJaasKeyTabAbsPath());
        KylinConfig kylinConfig = StreamingJobUtilsTest.getTestConfig();
        kylinConfig.setProperty("kylin.kafka-jaas.enabled", "true");
        File testKeyTab = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
        Assert.assertThrows((String)ErrorCodeServer.READ_KAFKA_JAAS_FILE_ERROR.getMsg(new Object[0]), KylinException.class, () -> StreamingJobUtils.extractKafkaJaasConf((boolean)true));
        FileUtils.write((File)new File(kapConfig.getKafkaJaasConfPath()), (CharSequence)"KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true principal=\"kylin@DEV.COM\" serviceName=\"kafka\"; };", (Charset)StandardCharsets.UTF_8);
        Assert.assertNull((Object)StreamingJobUtils.getJaasKeyTabAbsPath());
        FileUtils.write((File)new File(kapConfig.getKafkaJaasConfPath()), (CharSequence)("KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"" + testKeyTab + "\" principal=\"kylin@DEV.COM\" serviceName=\"kafka\"; };"), (Charset)StandardCharsets.UTF_8);
        Assert.assertThrows((String)ErrorCodeServer.KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS.getMsg(new Object[0]), KylinException.class, () -> StreamingJobUtils.extractKafkaJaasConf((boolean)true));
        FileUtils.write((File)testKeyTab, (CharSequence)"test", (Charset)StandardCharsets.UTF_8);
        String text = StreamingJobUtils.extractKafkaJaasConf((boolean)true);
        Assert.assertNotNull((Object)text);
        String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
        Assert.assertEquals((Object)testKeyTab.getAbsolutePath(), (Object)keyTabAbsPath);
        String executorJaasName = StreamingJobUtils.getExecutorJaasName();
        Assert.assertEquals((Object)kapConfig.getKafkaJaasConf(), (Object)executorJaasName);
        String executorJaasPath = StreamingJobUtils.getExecutorJaasPath();
        Assert.assertEquals((Object)(HadoopUtil.getHadoopConfDir() + File.separator + executorJaasName), (Object)executorJaasPath);
        kylinConfig.setProperty("kylin.kafka-jaas-conf", "kafka_err_jaas.conf");
    }

    @Test
    public void testCreateExecutorJaas() throws Exception {
        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        String executorJaasPath = HadoopUtil.getHadoopConfDir() + File.separator + kapConfig.getKafkaJaasConf();
        File executorJaasFile = new File(executorJaasPath);
        executorJaasFile.deleteOnExit();
        StreamingJobUtils.createExecutorJaas();
        Assert.assertFalse((boolean)executorJaasFile.exists());
        StreamingJobUtilsTest.getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
        String jaasContext = "KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required; };";
        FileUtils.write((File)new File(kapConfig.getKafkaJaasConfPath()), (CharSequence)jaasContext, (Charset)StandardCharsets.UTF_8);
        StreamingJobUtils.createExecutorJaas();
        Assert.assertTrue((boolean)executorJaasFile.exists());
        Assert.assertEquals((Object)jaasContext, (Object)FileUtils.readFileToString((File)executorJaasFile, (Charset)StandardCharsets.UTF_8));
        File testKeyTab = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
        FileUtils.write((File)testKeyTab, (CharSequence)"test", (Charset)StandardCharsets.UTF_8);
        String jaasContext2 = "KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"" + testKeyTab.getAbsolutePath() + "\" principal=\"kylin@DEV.COM\" serviceName=\"kafka\"; };";
        FileUtils.write((File)new File(kapConfig.getKafkaJaasConfPath()), (CharSequence)jaasContext2, (Charset)StandardCharsets.UTF_8);
        StreamingJobUtils.createExecutorJaas();
        Assert.assertTrue((boolean)executorJaasFile.exists());
        Assert.assertEquals((Object)jaasContext2.replace(testKeyTab.getAbsolutePath(), testKeyTab.getName()), (Object)FileUtils.readFileToString((File)executorJaasFile, (Charset)StandardCharsets.UTF_8));
        executorJaasFile.deleteOnExit();
    }
}

