package com.twitter.summingbird.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.CompleteTopologyParam;
import backtype.storm.testing.MockedSources;
import com.twitter.algebird.Semigroup;
import com.twitter.summingbird.Options$;
import com.twitter.summingbird.Producer;
import com.twitter.summingbird.TailProducer;
import com.twitter.summingbird.TimeExtractor;
import com.twitter.summingbird.TimeExtractor$;
import com.twitter.summingbird.online.executor.InflightTuples$;
import com.twitter.summingbird.online.option.FlushFrequency;
import com.twitter.summingbird.option.CacheSize;
import com.twitter.summingbird.option.CacheSize$;
import com.twitter.summingbird.storm.spout.TraversableSpout$;
import com.twitter.util.Duration$;
import scala.Function2;
import scala.MatchError;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.runtime.BoxedUnit;

/* compiled from: StormTestRun.scala */
/* loaded from: input_file:com/twitter/summingbird/storm/StormTestRun$.class */
public final class StormTestRun$ implements ScalaObject {
    public static final StormTestRun$ MODULE$ = null;

    static {
        new StormTestRun$();
    }

    private CompleteTopologyParam completeTopologyParam(Config config) {
        CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
        completeTopologyParam.setMockedSources(new MockedSources());
        completeTopologyParam.setStormConf(config);
        completeTopologyParam.setCleanupState(Predef$.MODULE$.boolean2Boolean(false));
        return completeTopologyParam;
    }

    private void tryRun(PlannedTopology plannedTopology) {
        SecurityManager securityManager = System.getSecurityManager();
        System.setSecurityManager(new MySecurityManager());
        InflightTuples$.MODULE$.reset();
        try {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test topology", plannedTopology.config(), plannedTopology.topology());
            Thread.sleep(4500L);
            localCluster.killTopology("test topology");
            Thread.sleep(1500L);
            localCluster.shutdown();
            System.setSecurityManager(securityManager);
            Predef$.MODULE$.require(InflightTuples$.MODULE$.get() == 0, new StormTestRun$$anonfun$tryRun$1());
        } catch (Throwable th) {
            System.setSecurityManager(securityManager);
            throw th;
        }
    }

    public void apply(TailProducer<Storm, Object> tailProducer, Storm storm) {
        apply(storm.plan(tailProducer));
    }

    public <T, K, V> TestStore<K, V> simpleRun(List<T> list, Function2<Producer<Storm, T>, StormStore<K, V>, TailProducer<Storm, Object>> function2, Semigroup<V> semigroup) {
        Tuple2<String, MergeableStoreSupplier<K, V>> createStore = TestStore$.MODULE$.createStore(TestStore$.MODULE$.createStore$default$1(), semigroup);
        if (createStore == null) {
            throw new MatchError(createStore);
        }
        Tuple2 tuple2 = new Tuple2(createStore._1(), createStore._2());
        String str = (String) tuple2._1();
        apply((TailProducer) function2.apply(Storm$.MODULE$.source(TraversableSpout$.MODULE$.apply(list, TraversableSpout$.MODULE$.apply$default$2()), Storm$.MODULE$.source$default$2(), extractor$1()), (MergeableStoreSupplier) tuple2._2()), Storm$.MODULE$.local(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc("DEFAULT").$minus$greater(Options$.MODULE$.apply(Options$.MODULE$.apply$default$1()).set(new CacheSize(4, CacheSize$.MODULE$.apply$default$2())).set(new FlushFrequency(Duration$.MODULE$.fromMilliseconds(1L))))}))));
        return (TestStore) TestStore$.MODULE$.apply(str, semigroup).getOrElse(new StormTestRun$$anonfun$simpleRun$1());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v4 */
    public void apply(PlannedTopology plannedTopology) {
        ?? r0 = this;
        synchronized (r0) {
            liftedTree1$1(plannedTopology);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
        }
    }

    private final TimeExtractor extractor$1() {
        return TimeExtractor$.MODULE$.apply(new StormTestRun$$anonfun$extractor$1$1());
    }

    private final void liftedTree1$1(PlannedTopology plannedTopology) {
        try {
            tryRun(plannedTopology);
        } catch (Throwable unused) {
            Thread.sleep(3000L);
            tryRun(plannedTopology);
        }
    }

    private StormTestRun$() {
        MODULE$ = this;
    }
}
