package org.apache.griffin.measure.process;

import org.apache.griffin.measure.data.source.DataSource;
import org.apache.griffin.measure.data.source.DataSourceFactory$;
import org.apache.griffin.measure.persist.MultiPersists;
import org.apache.griffin.measure.persist.PersistFactory;
import org.apache.griffin.measure.process.engine.DqEngineFactory$;
import org.apache.griffin.measure.process.engine.DqEngines;
import org.apache.griffin.measure.utils.TimeUtil$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.StreamingContext$;
import scala.Option;
import scala.Serializable;
import scala.Some;
import scala.collection.Seq;
import scala.runtime.AbstractFunction0$mcV$sp;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: StreamingDqProcess.scala */
/* loaded from: input_file:org/apache/griffin/measure/process/StreamingDqProcess$$anonfun$run$1.class */
/**
 * Zero-argument, unit-returning closure captured from {@link StreamingDqProcess}
 * (decompiled from scalac output; judging by the class name it is the body passed
 * around inside {@code StreamingDqProcess.run} — confirm against the Scala source).
 *
 * <p>When applied it obtains a {@link StreamingContext}, starts the persisters,
 * builds the DQ engines and data sources, schedules a {@link StreamingDqThread}
 * through a {@link TimingProcess}, then starts the streaming context and blocks
 * until it terminates.
 */
public final class StreamingDqProcess$$anonfun$run$1 extends AbstractFunction0$mcV$sp implements Serializable {
    public static final long serialVersionUID = 0;

    /** Enclosing process instance whose parameters and contexts this closure reads. */
    private final /* synthetic */ StreamingDqProcess $outer;

    /**
     * Captures the enclosing process.
     *
     * @param streamingDqProcess the enclosing instance; a {@code null} value makes
     *                           the constructor fail immediately via {@code throw null}
     */
    public StreamingDqProcess$$anonfun$run$1(StreamingDqProcess streamingDqProcess) {
        if (streamingDqProcess == null) {
            throw null;
        }
        this.$outer = streamingDqProcess;
    }

    /** Generic entry point; simply forwards to the specialised {@link #apply$mcV$sp()}. */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // scala.Function0$mcV$sp
    public final void apply() {
        apply$mcV$sp();
    }

    /**
     * Runs the streaming data-quality job end to end.
     *
     * <p>Throws an {@code Exception} with message {@code "invalid batch interval"} when
     * the configured process interval cannot be resolved to a millisecond value; at that
     * point the persisters have already been started and the data sources created.
     */
    @Override // scala.runtime.AbstractFunction0, scala.Function0
    public void apply$mcV$sp() {
        final StreamingDqProcess process = this.$outer;

        // Get (or create through the supplied factory closure) the streaming context
        // associated with the configured checkpoint directory.
        String checkpointDir = process.sparkParam().cpDir();
        StreamingDqProcess$$anonfun$run$1$$anonfun$3 contextCreator = new StreamingDqProcess$$anonfun$run$1$$anonfun$3(this);
        StreamingContext streamingContext = StreamingContext$.MODULE$.getOrCreate(
                checkpointDir,
                contextCreator,
                StreamingContext$.MODULE$.getOrCreate$default$3(),
                StreamingContext$.MODULE$.getOrCreate$default$4());

        // Persisters are keyed by the application time and started with the Spark application id.
        long applicationTime = process.getAppTime();
        PersistFactory persistFactory = new PersistFactory(process.envParam().persistParams(), process.metricName());
        MultiPersists persists = persistFactory.getPersists(applicationTime);
        String applicationId = process.sparkSession().sparkContext().applicationId();
        persists.start(applicationId);

        // Build engines and data sources, then run the per-source closure on each data source.
        DqEngines engines = DqEngineFactory$.MODULE$.genDqEngines(process.sqlContext());
        Seq<DataSource> dataSources = DataSourceFactory$.MODULE$.genDataSources(
                process.sqlContext(), streamingContext, engines, process.userParam().dataSources());
        dataSources.foreach(new StreamingDqProcess$$anonfun$run$1$$anonfun$apply$mcV$sp$1(this));

        StreamingDqThread dqThread = new StreamingDqThread(
                process.sqlContext(),
                engines,
                dataSources,
                process.userParam().evaluateRuleParam(),
                persistFactory,
                persists);

        // The interval is only usable when the conversion yields a Some; anything else is rejected.
        Option<Object> intervalOpt = TimeUtil$.MODULE$.milliseconds(process.sparkParam().processInterval());
        final long intervalMillis;
        if (intervalOpt instanceof Some) {
            intervalMillis = BoxesRunTime.unboxToLong(((Some<?>) intervalOpt).x());
        } else {
            throw new Exception("invalid batch interval");
        }

        TimingProcess timer = new TimingProcess(intervalMillis, dqThread);
        timer.startup();

        // Start streaming and block until it terminates, then stop the context and finish the persisters.
        streamingContext.start();
        streamingContext.awaitTermination();
        streamingContext.stop(true, true);
        persists.finish();
    }

    /** Synthetic accessor exposing the captured enclosing instance to nested closures. */
    public /* synthetic */ StreamingDqProcess org$apache$griffin$measure$process$StreamingDqProcess$$anonfun$$$outer() {
        return this.$outer;
    }

    /** Boxed bridge for {@code Function0.apply}: runs the closure and returns the unit value. */
    @Override // scala.Function0
    /* renamed from: apply */
    public final /* bridge */ /* synthetic */ BoxedUnit mo2apply() {
        apply();
        return BoxedUnit.UNIT;
    }
}
