package org.apache.kylin.engine.spark.scheduler;

import java.lang.reflect.Proxy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kylin.cluster.ResourceInfo;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigBase;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.engine.spark.job.KylinBuildEnv$;
import org.apache.spark.application.MockClusterManager;
import org.apache.spark.application.NoRetryException;
import org.apache.spark.scheduler.KylinJobEventLoop;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.EventLoop;
import org.mockito.Mockito;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.Tag;
import org.scalatest.funsuite.AnyFunSuite;
import scala.Predef$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.VolatileObjectRef;

/* compiled from: ClusterMonitorTest.scala */
@ScalaSignature(bytes = "\u0006\u0001!2A\u0001B\u0003\u0001%!)1\u0004\u0001C\u00019!9q\u0004\u0001b\u0001\n\u0003\u0001\u0003BB\u0014\u0001A\u0003%\u0011E\u0001\nDYV\u001cH/\u001a:N_:LGo\u001c:UKN$(B\u0001\u0004\b\u0003%\u00198\r[3ek2,'O\u0003\u0002\t\u0013\u0005)1\u000f]1sW*\u0011!bC\u0001\u0007K:<\u0017N\\3\u000b\u00051i\u0011!B6zY&t'B\u0001\b\u0010\u0003\u0019\t\u0007/Y2iK*\t\u0001#A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001'A\u0011A#G\u0007\u0002+)\u0011acF\u0001\tMVt7/^5uK*\u0011\u0001dD\u0001\ng\u000e\fG.\u0019;fgRL!AG\u000b\u0003\u0017\u0005s\u0017PR;o'VLG/Z\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003u\u0001\"A\b\u0001\u000e\u0003\u0015\taaY8oM&<W#A\u0011\u0011\u0005\t*S\"A\u0012\u000b\u0005\u0011Z\u0011AB2p[6|g.\u0003\u0002'G\tY1*\u001f7j]\u000e{gNZ5h\u0003\u001d\u0019wN\u001c4jO\u0002\u0002")
/* loaded from: input_file:org/apache/kylin/engine/spark/scheduler/ClusterMonitorTest.class */
/*
 * ScalaTest suite (decompiled from ClusterMonitorTest.scala) that exercises ClusterMonitor.
 *
 * All test cases are registered from the constructor through test(...), and every case shares
 * the single Mockito-mocked KylinConfig held in the config field. Stubbing applied to that mock
 * inside one test body is never reset here, so it remains in effect for test bodies that run later.
 *
 * Cleanup that touches shared state (ClusterMonitor.shutdown(), KylinBuildEnv.clean()) is placed
 * in finally blocks so that a failing assertion or an exception cannot leave that state behind.
 */
public class ClusterMonitorTest extends AnyFunSuite {
    // Shared mock configuration; stubbed in the constructor and again inside some test bodies.
    private final KylinConfig config = (KylinConfig) Mockito.mock(KylinConfig.class);

    /** Returns the shared mocked configuration used by every test in this suite. */
    public KylinConfig config() {
        return this.config;
    }

    /** Stubs the shared mock configuration and registers the suite's test cases. */
    public ClusterMonitorTest() {
        // Baseline stubbing of the mocked configuration.
        Mockito.when(config().getMaxAllocationResourceProportion()).thenReturn(Predef$.MODULE$.double2Double(1.0d));
        Mockito.when(BoxesRunTime.boxToDouble(config().getSparkEngineRetryMemoryGradient())).thenReturn(BoxesRunTime.boxToDouble(1.5d));
        Mockito.when(BoxesRunTime.boxToDouble(config().getSparkEngineRetryOverheadMemoryGradient())).thenReturn(BoxesRunTime.boxToDouble(0.2d));
        Mockito.when(config().getClusterManagerClassName()).thenReturn("org.apache.spark.application.MockClusterManager");
        Mockito.when(BoxesRunTime.boxToLong(config().getClusterManagerTimeoutThreshold())).thenReturn(BoxesRunTime.boxToLong(10000L));
        Mockito.when(BoxesRunTime.boxToInteger(config().getSparkEngineMaxRetryTime())).thenReturn(BoxesRunTime.boxToInteger(1));

        // Case 1: a task scheduled on the monitor runs on the expected thread and the expected number of times.
        test("test scheduleWithFixedDelay", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // This latch is never counted down; await(...) below only serves as a bounded 10 second wait.
            CountDownLatch neverReleased = new CountDownLatch(1);
            AtomicLong tickTotal = new AtomicLong(0L);
            ClusterMonitor clusterMonitor = new ClusterMonitor();
            VolatileObjectRef threadName = VolatileObjectRef.create("");
            try {
                clusterMonitor.scheduleAtFixedRate(() -> {
                    tickTotal.addAndGet(5L);
                    threadName.elem = Thread.currentThread().getName();
                }, 5L);
                neverReleased.await(10L, TimeUnit.SECONDS);
            } finally {
                // Always stop the monitor, even if scheduling or the wait fails.
                clusterMonitor.shutdown();
            }
            long observedTotal = tickTotal.get();
            // Expects exactly two executions (2 * 5) during the wait.
            // NOTE(review): presumably the period argument is in seconds - confirm against ClusterMonitor.
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(observedTotal), "==", BoxesRunTime.boxToInteger(10), observedTotal == ((long) 10), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 54));
            TripleEqualsSupport.Equalizer threadNameEqualizer = this.convertToEqualizer((String) threadName.elem);
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(threadNameEqualizer, "===", "connect-master-guard", threadNameEqualizer.$eq$eq$eq("connect-master-guard", Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 55));
        }, new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 43));

        // Case 2: monitor(...) against the mock cluster manager resets the counter to 0 and leaves the flag false.
        test("test monitor", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            KylinBuildEnv buildEnv = KylinBuildEnv$.MODULE$.getOrCreate(this.config());
            AtomicLong counter = new AtomicLong(10L);
            AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false);
            try {
                // The cluster manager is reached through its proxy's invocation handler to call setMaxAllocation.
                Proxy.getInvocationHandler(buildEnv.clusterManager()).invoke(buildEnv.clusterManager(), MockClusterManager.class.getMethod("setMaxAllocation", ResourceInfo.class), new Object[]{new ResourceInfo(2400, Integer.MAX_VALUE)});
                new ClusterMonitor().monitor(new AtomicReference(buildEnv), new AtomicReference(null), counter, atomicUnreachableSparkMaster);
            } finally {
                // Drop the build env even when the calls above throw.
                KylinBuildEnv$.MODULE$.clean();
            }
            long observedCount = counter.get();
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(observedCount), "==", BoxesRunTime.boxToInteger(0), observedCount == ((long) 0), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 70));
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(atomicUnreachableSparkMaster.get(), "atomicUnreachableSparkMaster.get()", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 71));
        }, new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 58));

        // Case 3: monitor(...) with a failing configuration lookup and a (mocked) Spark session present.
        test("test monitor with error", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            KylinBuildEnv buildEnv = KylinBuildEnv$.MODULE$.getOrCreate(this.config());
            try {
                // From here on the shared mock throws whenever the cluster manager class name is requested.
                ((KylinConfigBase) Mockito.doThrow(new Throwable[]{new RuntimeException("test monitor with error")}).when(this.config())).getClusterManagerClassName();
                AtomicReference buildEnvRef = new AtomicReference(buildEnv);
                SparkSession sparkSession = (SparkSession) Mockito.mock(SparkSession.class);
                AtomicReference sparkSessionRef = new AtomicReference(sparkSession);
                ((SparkSession) Mockito.doNothing().when(sparkSession)).stop();
                // NOTE(review): this event-loop mock is stubbed but never handed to the monitor in this block.
                ((EventLoop) Mockito.doNothing().when((KylinJobEventLoop) Mockito.mock(KylinJobEventLoop.class))).post(new JobFailed("Unable to connect to spark master to reach set timeout maximum time", new NoRetryException("Unable to connect to spark master to reach set timeout maximum time")));
                AtomicLong counter = new AtomicLong(10L);
                AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false);
                ClusterMonitor clusterMonitor = new ClusterMonitor();

                // Round 1: health-check limits are not stubbed yet; the flag is expected to become true.
                clusterMonitor.monitor(buildEnvRef, sparkSessionRef, counter, atomicUnreachableSparkMaster);
                long firstCount = counter.get();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(firstCount), ">=", BoxesRunTime.boxToInteger(10), firstCount >= ((long) 10), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 89));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(atomicUnreachableSparkMaster.get(), "atomicUnreachableSparkMaster.get()", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 90));

                // Round 2: max health-check times = 20, interval = 10; the flag is expected to stay false.
                counter.set(0L);
                atomicUnreachableSparkMaster.set(false);
                Mockito.when(BoxesRunTime.boxToLong(this.config().getClusterManagerHealthCheckMaxTimes())).thenReturn(BoxesRunTime.boxToLong(20L));
                Mockito.when(BoxesRunTime.boxToLong(this.config().getClusterManagerHealCheckIntervalSecond())).thenReturn(BoxesRunTime.boxToLong(10L));
                clusterMonitor.monitor(buildEnvRef, sparkSessionRef, counter, atomicUnreachableSparkMaster);
                long secondCount = counter.get();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(secondCount), ">=", BoxesRunTime.boxToInteger(1), secondCount >= ((long) 1), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 97));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(atomicUnreachableSparkMaster.get(), "atomicUnreachableSparkMaster.get()", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 98));

                // Round 3: max health-check times = 1, interval = 10; the flag is expected to become true.
                counter.set(0L);
                atomicUnreachableSparkMaster.set(false);
                Mockito.when(BoxesRunTime.boxToLong(this.config().getClusterManagerHealthCheckMaxTimes())).thenReturn(BoxesRunTime.boxToLong(1L));
                Mockito.when(BoxesRunTime.boxToLong(this.config().getClusterManagerHealCheckIntervalSecond())).thenReturn(BoxesRunTime.boxToLong(10L));
                clusterMonitor.monitor(buildEnvRef, sparkSessionRef, counter, atomicUnreachableSparkMaster);
                long thirdCount = counter.get();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(thirdCount), ">=", BoxesRunTime.boxToInteger(1), thirdCount >= ((long) 1), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 105));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(atomicUnreachableSparkMaster.get(), "atomicUnreachableSparkMaster.get()", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 106));
            } finally {
                // Drop the build env even when an assertion above fails.
                KylinBuildEnv$.MODULE$.clean();
            }
        }, new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 74));

        // Case 4: same failing configuration lookup, but the Spark session reference holds null.
        test("test monitor with error and spark session is null", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            KylinBuildEnv buildEnv = KylinBuildEnv$.MODULE$.getOrCreate(this.config());
            try {
                ((KylinConfigBase) Mockito.doThrow(new Throwable[]{new RuntimeException("test monitor with error")}).when(this.config())).getClusterManagerClassName();
                AtomicReference buildEnvRef = new AtomicReference(buildEnv);
                AtomicReference sparkSessionRef = new AtomicReference(null);
                // NOTE(review): this event-loop mock is stubbed but never handed to the monitor in this block.
                ((EventLoop) Mockito.doNothing().when((KylinJobEventLoop) Mockito.mock(KylinJobEventLoop.class))).post(new JobFailed("Unable to connect to spark master to reach set timeout maximum time", new NoRetryException("Unable to connect to spark master to reach set timeout maximum time")));
                AtomicLong counter = new AtomicLong(10L);
                AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false);
                ClusterMonitor clusterMonitor = new ClusterMonitor();

                // Round 1: flag starts false and is expected to stay false.
                clusterMonitor.monitor(buildEnvRef, sparkSessionRef, counter, atomicUnreachableSparkMaster);
                long firstCount = counter.get();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(firstCount), ">=", BoxesRunTime.boxToInteger(10), firstCount >= ((long) 10), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 124));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(atomicUnreachableSparkMaster.get(), "atomicUnreachableSparkMaster.get()", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 125));

                // Round 2: flag is preset to true (max times = 20, interval = 10) and is expected to remain true.
                counter.set(0L);
                atomicUnreachableSparkMaster.set(true);
                Mockito.when(BoxesRunTime.boxToLong(this.config().getClusterManagerHealthCheckMaxTimes())).thenReturn(BoxesRunTime.boxToLong(20L));
                Mockito.when(BoxesRunTime.boxToLong(this.config().getClusterManagerHealCheckIntervalSecond())).thenReturn(BoxesRunTime.boxToLong(10L));
                clusterMonitor.monitor(buildEnvRef, sparkSessionRef, counter, atomicUnreachableSparkMaster);
                long secondCount = counter.get();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(secondCount), ">=", BoxesRunTime.boxToInteger(1), secondCount >= ((long) 1), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 132));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(atomicUnreachableSparkMaster.get(), "atomicUnreachableSparkMaster.get()", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 133));

                // Round 3: flag reset to false (max times = 1, interval = 10) and is expected to stay false.
                counter.set(0L);
                atomicUnreachableSparkMaster.set(false);
                Mockito.when(BoxesRunTime.boxToLong(this.config().getClusterManagerHealthCheckMaxTimes())).thenReturn(BoxesRunTime.boxToLong(1L));
                Mockito.when(BoxesRunTime.boxToLong(this.config().getClusterManagerHealCheckIntervalSecond())).thenReturn(BoxesRunTime.boxToLong(10L));
                clusterMonitor.monitor(buildEnvRef, sparkSessionRef, counter, atomicUnreachableSparkMaster);
                long thirdCount = counter.get();
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(BoxesRunTime.boxToLong(thirdCount), ">=", BoxesRunTime.boxToInteger(1), thirdCount >= ((long) 1), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 140));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.notBool(Bool$.MODULE$.simpleMacroBool(atomicUnreachableSparkMaster.get(), "atomicUnreachableSparkMaster.get()", Prettifier$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 141));
            } finally {
                // Drop the build env even when an assertion above fails.
                KylinBuildEnv$.MODULE$.clean();
            }
        }, new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 111));

        // Case 5: extraDestroy() on a SparkApplication whose doExecute() is a no-op; there are no assertions.
        test("test sparkApplication extraDestroy", Predef$.MODULE$.wrapRefArray(new Tag[0]), () -> {
            // NOTE(review): the null "outer instance" argument looks like a decompiler artefact - confirm
            // against SparkApplication's constructors before relying on this form.
            final ClusterMonitorTest clusterMonitorTest = null;
            new SparkApplication(clusterMonitorTest) { // from class: org.apache.kylin.engine.spark.scheduler.ClusterMonitorTest$$anon$1
                public void doExecute() throws Exception {
                }
            }.extraDestroy();
        }, new Position("ClusterMonitorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 146));
    }
}
