package org.apache.kylin.query.pushdown;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.query.exception.UserStopQueryException;
import org.apache.kylin.query.util.SlowQueryDetector;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kylin/query/pushdown/PushdownJobCancelTest.class */
public class PushdownJobCancelTest extends NLocalFileMetadataTestCase {
    SparkSession ss;
    private SlowQueryDetector slowQueryDetector = null;

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
        this.ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
        SparderEnv.setSparkSession(this.ss);
        this.ss.read().schema(new StructType().add("TRANS_ID", DataTypes.LongType, false).add("ORDER_ID", DataTypes.LongType, false).add("CAL_DT", DataTypes.DateType, false).add("LSTG_FORMAT_NAME", DataTypes.StringType, false).add("LEAF_CATEG_ID", DataTypes.LongType, false).add("LSTG_SITE_ID", DataTypes.IntegerType, false).add("SLR_SEGMENT_CD", DataTypes.FloatType, false).add("SELLER_ID", DataTypes.LongType, false).add("PRICE", DataTypes.createDecimalType(19, 4), false).add("ITEM_COUNT", DataTypes.DoubleType, false).add("TEST_COUNT_DISTINCT_BITMAP", DataTypes.StringType, false)).csv("../../examples/test_case_data/localmeta/data/DEFAULT.TEST_KYLIN_FACT.csv").createOrReplaceTempView("TEST_KYLIN_FACT");
        this.slowQueryDetector = new SlowQueryDetector(100000, 5000);
        this.slowQueryDetector.start();
    }

    @After
    public void after() throws Exception {
        this.slowQueryDetector.interrupt();
        this.ss.stop();
        cleanupTestMetadata();
        SparderEnv.clean();
    }

    @Test
    public void testCancelPushdownJob() throws InterruptedException {
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.ss.sparkContext().addSparkListener(new SparkListener() { // from class: org.apache.kylin.query.pushdown.PushdownJobCancelTest.1
            public void onTaskStart(SparkListenerTaskStart sparkListenerTaskStart) {
                Iterator it = SlowQueryDetector.getRunningQueries().values().iterator();
                if (!it.hasNext()) {
                    Assert.fail("no running query is found");
                    return;
                }
                SlowQueryDetector.QueryEntry queryEntry = (SlowQueryDetector.QueryEntry) it.next();
                queryEntry.setStopByUser(true);
                queryEntry.getThread().interrupt();
            }

            public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
                atomicReference.set(sparkListenerJobEnd);
                countDownLatch.countDown();
            }
        });
        Thread thread = new Thread(() -> {
            try {
                this.slowQueryDetector.queryStart("foo");
                SparkSqlClient.executeSql(this.ss, "select * from TEST_KYLIN_FACT", RandomUtil.randomUUID(), "tpch");
            } catch (Exception e) {
                Assert.assertTrue(e instanceof UserStopQueryException);
            } finally {
                this.slowQueryDetector.queryEnd();
            }
        });
        thread.start();
        thread.join();
        countDownLatch.await(10L, TimeUnit.SECONDS);
        Assert.assertTrue(((SparkListenerJobEnd) atomicReference.get()).jobResult() instanceof JobFailed);
        Assert.assertTrue(((SparkListenerJobEnd) atomicReference.get()).jobResult().exception().getMessage().contains("cancelled part of cancelled job group"));
    }
}
