package org.apache.heron.simulator.instance;

import java.time.Duration;
import java.util.Map;
import org.apache.heron.api.Config;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.instance.IInstance;
import org.apache.heron.shaded.com.google.protobuf.Message;

/* loaded from: input_file:org/apache/heron/simulator/instance/SpoutInstance.class */
public class SpoutInstance extends org.apache.heron.instance.spout.SpoutInstance implements IInstance {
    private final boolean ackEnabled;
    private final int maxSpoutPending;
    private final Duration instanceEmitBatchTime;
    private final ByteAmount instanceEmitBatchSize;

    public SpoutInstance(PhysicalPlanHelper physicalPlanHelper, Communicator<Message> communicator, Communicator<Message> communicator2, SlaveLooper slaveLooper) {
        super(physicalPlanHelper, communicator, communicator2, slaveLooper);
        Map<String, Object> topologyConfig = physicalPlanHelper.getTopologyContext().getTopologyConfig();
        SystemConfig systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.maxSpoutPending = TypeUtils.getInteger(topologyConfig.get("topology.max.spout.pending")).intValue();
        this.instanceEmitBatchTime = systemConfig.getInstanceEmitBatchTime();
        this.instanceEmitBatchSize = systemConfig.getInstanceEmitBatchSize();
        if (topologyConfig.containsKey(Config.TOPOLOGY_RELIABILITY_MODE)) {
            this.ackEnabled = Config.TopologyReliabilityMode.ATLEAST_ONCE.equals(topologyConfig.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
        } else {
            this.ackEnabled = Boolean.parseBoolean((String) topologyConfig.get(Config.TOPOLOGY_ENABLE_ACKING));
        }
    }

    @Override // org.apache.heron.instance.spout.SpoutInstance
    protected void produceTuple() {
        long totalTuplesEmitted = this.collector.getTotalTuplesEmitted();
        long totalDataEmittedInBytes = this.collector.getTotalDataEmittedInBytes();
        long nanoTime = System.nanoTime();
        do {
            if (this.ackEnabled && this.maxSpoutPending <= this.collector.numInFlight()) {
                return;
            }
            long nanoTime2 = System.nanoTime();
            this.spout.nextTuple();
            this.spoutMetrics.nextTuple(System.nanoTime() - nanoTime2);
            long totalTuplesEmitted2 = this.collector.getTotalTuplesEmitted();
            if (totalTuplesEmitted2 == totalTuplesEmitted) {
                return;
            }
            totalTuplesEmitted = totalTuplesEmitted2;
            if ((System.nanoTime() - nanoTime) - this.instanceEmitBatchTime.toNanos() > 0) {
                return;
            }
        } while (this.collector.getTotalDataEmittedInBytes() - totalDataEmittedInBytes <= this.instanceEmitBatchSize.asBytes());
    }
}
