package org.apache.storm.testing;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.storm.ILocalCluster;
import org.apache.storm.Testing;
import org.apache.storm.Thrift;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.utils.RegisteredGlobalState;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/testing/TrackedTopology.class */
public class TrackedTopology {
    private static final Logger LOG = LoggerFactory.getLogger(TrackedTopology.class);
    private final StormTopology topology;
    private final AtomicInteger lastSpoutCommit;
    private final ILocalCluster cluster;

    public TrackedTopology(StormTopology stormTopology, ILocalCluster iLocalCluster) {
        LOG.warn("CLUSTER {} - {}", iLocalCluster, iLocalCluster.getTrackedId());
        this.cluster = iLocalCluster;
        this.lastSpoutCommit = new AtomicInteger(0);
        String trackedId = iLocalCluster.getTrackedId();
        this.topology = stormTopology.deepCopy();
        for (Bolt bolt : this.topology.get_bolts().values()) {
            bolt.set_bolt_object(Thrift.serializeComponentObject(new BoltTracker((IRichBolt) Thrift.deserializeComponentObject(bolt.get_bolt_object()), trackedId)));
        }
        for (SpoutSpec spoutSpec : this.topology.get_spouts().values()) {
            spoutSpec.set_spout_object(Thrift.serializeComponentObject(new SpoutTracker((IRichSpout) Thrift.deserializeComponentObject(spoutSpec.get_spout_object()), trackedId)));
        }
    }

    private static int globalAmt(String str, String str2) {
        LOG.warn("Reading tracked metrics for ID {}", str);
        return ((AtomicInteger) ((ConcurrentHashMap) RegisteredGlobalState.getState(str)).get(str2)).get();
    }

    public StormTopology getTopology() {
        return this.topology;
    }

    public ILocalCluster getCluster() {
        return this.cluster;
    }

    public void trackedWait() {
        trackedWait(1, Testing.TEST_TIMEOUT_MS);
    }

    public void trackedWait(int i) {
        trackedWait(i, Testing.TEST_TIMEOUT_MS);
    }

    public void trackedWait(int i, int i2) {
        int i3 = i + this.lastSpoutCommit.get();
        String trackedId = this.cluster.getTrackedId();
        ThreadLocalRandom current = ThreadLocalRandom.current();
        Testing.whileTimeout(i2, () -> {
            int globalAmt = globalAmt(trackedId, "spout-emitted");
            int globalAmt2 = globalAmt(trackedId, "transferred");
            int globalAmt3 = globalAmt(trackedId, "processed");
            LOG.info("emitted {} target {} transferred {} processed {}", new Object[]{Integer.valueOf(globalAmt), Integer.valueOf(i3), Integer.valueOf(globalAmt2), Integer.valueOf(globalAmt3)});
            return (i3 == globalAmt && globalAmt2 == globalAmt3) ? false : true;
        }, () -> {
            Time.advanceTimeSecs(1L);
            try {
                Thread.sleep(current.nextInt(200));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.lastSpoutCommit.set(i3);
    }

    public int globalAmt(String str) {
        return globalAmt(this.cluster.getTrackedId(), str);
    }
}
