package org.apache.kylin.newten;

import java.io.File;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.KylinTimeoutException;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.query.engine.QueryExec;
import org.apache.kylin.query.pushdown.SparkSqlClient;
import org.apache.kylin.query.runtime.plan.ResultPlan;
import org.apache.kylin.query.util.QueryParams;
import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.query.util.SlowQueryDetector;
import org.apache.kylin.util.ExecAndComp;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparderEnv;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.AnswersWithDelay;
import org.mockito.internal.stubbing.answers.Returns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/newten/SlowQueryDetectorTest.class */
public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest {
    private SlowQueryDetector slowQueryDetector = null;
    private static final Logger logger;
    private static final int TIMEOUT_MS = 5000;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Before
    public void setup() {
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
        createTestMetadata(new String[0]);
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(getProject());
        nDefaultScheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
        if (!nDefaultScheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
        this.slowQueryDetector = new SlowQueryDetector(100, TIMEOUT_MS);
        this.slowQueryDetector.start();
    }

    public String getProject() {
        return "match";
    }

    @After
    public void after() {
        NDefaultScheduler.destroyInstance();
        cleanupTestMetadata();
        this.slowQueryDetector.interrupt();
    }

    @Test
    public void testSetInterrupt() {
        this.slowQueryDetector.queryStart("");
        try {
            Thread.sleep(6000L);
            Assert.fail();
        } catch (InterruptedException e) {
            Assert.assertEquals("sleep interrupted", e.getMessage());
        }
        this.slowQueryDetector.queryEnd();
    }

    @Test
    public void testStopQuery() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();
        Thread thread = new Thread(() -> {
            this.slowQueryDetector.queryStart("stopId");
            try {
                try {
                    semaphore.acquire();
                    Assert.fail();
                    this.slowQueryDetector.queryEnd();
                } catch (InterruptedException e) {
                    atomicReference.set(e);
                    this.slowQueryDetector.queryEnd();
                }
            } catch (Throwable th) {
                this.slowQueryDetector.queryEnd();
                throw th;
            }
        });
        thread.start();
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> {
            SlowQueryDetector slowQueryDetector = this.slowQueryDetector;
            return Boolean.valueOf(SlowQueryDetector.getRunningQueries().size() > 0);
        });
        this.slowQueryDetector.stopQuery("stopId");
        semaphore.release();
        thread.join();
        if (!$assertionsDisabled && atomicReference.get() == null) {
            throw new AssertionError();
        }
    }

    @Test
    public void testStopQueryWrong() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();
        Thread thread = new Thread(() -> {
            this.slowQueryDetector.queryStart("stopId");
            try {
                try {
                    semaphore.acquire();
                    this.slowQueryDetector.queryEnd();
                } catch (InterruptedException e) {
                    atomicReference.set(e);
                    this.slowQueryDetector.queryEnd();
                }
            } catch (Throwable th) {
                this.slowQueryDetector.queryEnd();
                throw th;
            }
        });
        thread.start();
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> {
            SlowQueryDetector slowQueryDetector = this.slowQueryDetector;
            return Boolean.valueOf(SlowQueryDetector.getRunningQueries().size() > 0);
        });
        this.slowQueryDetector.stopQuery("stopId-wrong");
        semaphore.release();
        thread.join();
        Assert.assertEquals((Object) null, atomicReference.get());
    }

    @Test
    public void testStopAsyncQuery() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();
        Thread thread = new Thread(() -> {
            try {
                try {
                    QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
                    QueryContext.current().setQueryId("queryId-async");
                    this.slowQueryDetector.queryStart("");
                    semaphore.acquire();
                    Assert.fail();
                    this.slowQueryDetector.queryEnd();
                } catch (InterruptedException e) {
                    atomicReference.set(e);
                    this.slowQueryDetector.queryEnd();
                }
            } catch (Throwable th) {
                this.slowQueryDetector.queryEnd();
                throw th;
            }
        });
        thread.start();
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> {
            SlowQueryDetector slowQueryDetector = this.slowQueryDetector;
            return Boolean.valueOf(SlowQueryDetector.getRunningQueries().size() > 0);
        });
        this.slowQueryDetector.stopQuery("queryId-async");
        semaphore.release();
        thread.join();
        if (!$assertionsDisabled && atomicReference.get() == null) {
            throw new AssertionError();
        }
    }

    @Test
    public void testStopAsyncQueryWrong() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();
        Thread thread = new Thread(() -> {
            try {
                QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
                QueryContext.current().setQueryId("queryId");
                this.slowQueryDetector.queryStart("");
                semaphore.acquire();
            } catch (InterruptedException e) {
                atomicReference.set(e);
            }
            this.slowQueryDetector.queryEnd();
        });
        thread.start();
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> {
            SlowQueryDetector slowQueryDetector = this.slowQueryDetector;
            return Boolean.valueOf(SlowQueryDetector.getRunningQueries().size() > 0);
        });
        this.slowQueryDetector.stopQuery("queryId-error");
        semaphore.release();
        thread.join();
        Assert.assertEquals((Object) null, atomicReference.get());
    }

    @Test
    public void testStopAsyncQueryJob() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();
        Thread thread = new Thread(() -> {
            try {
                QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
                QueryContext.current().setQueryId("queryId");
                this.slowQueryDetector.addJobIdForAsyncQueryJob("jobId2");
                this.slowQueryDetector.queryStart("");
                this.slowQueryDetector.addJobIdForAsyncQueryJob("jobId");
                semaphore.acquire();
            } catch (InterruptedException e) {
                atomicReference.set(e);
            }
            this.slowQueryDetector.queryEnd();
        });
        thread.start();
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> {
            SlowQueryDetector slowQueryDetector = this.slowQueryDetector;
            return Boolean.valueOf(SlowQueryDetector.getRunningQueries().size() > 0);
        });
        this.slowQueryDetector.stopQuery("queryId");
        semaphore.release();
        thread.join();
        Assert.assertEquals((Object) null, atomicReference.get());
    }

    @Test
    public void testSparderTimeoutCancelJob() throws Exception {
        Dataset dataset = (Dataset) Mockito.spy(SparderEnv.getSparkSession().emptyDataFrame());
        ((Dataset) Mockito.doAnswer(new AnswersWithDelay(15000L, new Returns((Object) null))).when(dataset)).toIterator();
        this.slowQueryDetector.queryStart("");
        try {
            SparderEnv.cleanCompute();
            long currentTimeMillis = System.currentTimeMillis();
            ResultPlan.getResult(dataset, (RelDataType) null);
            ExecAndComp.queryModel(getProject(), "select sum(price) from TEST_KYLIN_FACT group by LSTG_FORMAT_NAME");
            String str = "TestSparderTimeoutCancelJob fail, query cost:" + (System.currentTimeMillis() - currentTimeMillis) + " ms, need compute:" + SparderEnv.needCompute();
            logger.error(str);
            Assert.fail(str);
        } catch (Exception e) {
            Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
            Assert.assertTrue(e instanceof KylinTimeoutException);
            Assert.assertEquals("The query exceeds the set time limit of 300s. Current step: Collecting dataset for sparder.", e.getMessage());
            Thread.interrupted();
        }
        this.slowQueryDetector.queryEnd();
    }

    @Test
    public void testPushdownTimeoutCancelJob() {
        Dataset dataset = (Dataset) Mockito.spy(SparderEnv.getSparkSession().emptyDataFrame());
        ((Dataset) Mockito.doAnswer(new AnswersWithDelay(15000L, new Returns((Object) null))).when(dataset)).toIterator();
        this.slowQueryDetector.queryStart("");
        try {
            SparkSqlClient.dfToList(ss, "", dataset);
            SparkSqlClient.executeSql(ss, "select sum(price) from TEST_KYLIN_FACT group by LSTG_FORMAT_NAME", RandomUtil.randomUUID(), getProject());
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
            Assert.assertTrue(e instanceof KylinTimeoutException);
            Assert.assertEquals("The query exceeds the set time limit of 300s. Current step: Collecting dataset of push-down.", e.getMessage());
            Thread.interrupted();
        }
        this.slowQueryDetector.queryEnd();
    }

    @Test
    @Ignore("not timeout, need another sql")
    public void testSQLMassageTimeoutCancelJob() throws Exception {
        this.slowQueryDetector.queryStart("");
        try {
            SparderEnv.cleanCompute();
            long currentTimeMillis = System.currentTimeMillis();
            QueryUtil.massageSql(new QueryParams(NProjectManager.getProjectConfig(getProject()), FileUtils.readFileToString(new File("src/test/resources/query/sql_timeout/query03.sql"), "UTF-8").trim(), getProject(), 0, 0, "DEFAULT", true));
            String str = "TestSQLMassageTimeoutCancelJob fail, query cost:" + (System.currentTimeMillis() - currentTimeMillis) + " ms, need compute:" + SparderEnv.needCompute();
            logger.error(str);
            Assert.fail(str);
        } catch (Exception e) {
            Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
            Assert.assertTrue(e instanceof KylinTimeoutException);
            Assert.assertTrue(ExceptionUtils.getStackTrace(e).contains("QueryUtil"));
            Thread.interrupted();
        }
        this.slowQueryDetector.queryEnd();
    }

    @Ignore("TODO: remove or adapt")
    public void testRealizationChooserTimeout() {
        this.slowQueryDetector.queryStart("");
        try {
            long currentTimeMillis = System.currentTimeMillis();
            Awaitility.await().pollDelay(4990L, TimeUnit.MILLISECONDS).until(() -> {
                return true;
            });
            new QueryExec("default", getTestConfig()).executeQuery("select cal_dt,sum(price) from test_kylin_fact group by cal_dt union all select cal_dt,sum(price) from test_kylin_fact group by cal_dt");
            Assert.fail("testRealizationChooserTimeout fail, query cost:" + (System.currentTimeMillis() - currentTimeMillis) + " ms");
        } catch (Exception e) {
            Assert.assertTrue(QueryContext.current().getQueryTagInfo().isTimeout());
            Assert.assertTrue(e.getCause() instanceof KylinTimeoutException);
            Assert.assertEquals("KE-000000002", e.getCause().getErrorCode().getCodeString());
            Assert.assertEquals("The query exceeds the set time limit of 300s. Current step: Realization chooser. ", e.getCause().getMessage());
            Thread.interrupted();
        }
        this.slowQueryDetector.queryEnd();
    }

    static {
        $assertionsDisabled = !SlowQueryDetectorTest.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(SlowQueryDetectorTest.class);
    }
}
