package com.datatorrent.stram.engine;

import com.datatorrent.api.Stats;
import com.datatorrent.stram.ComponentContextPair;
import com.datatorrent.stram.api.ContainerEvent;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import net.engio.mbassy.listener.Handler;

/* loaded from: input_file:com/datatorrent/stram/engine/BufferServerStatsSubscriber.class */
/**
 * Event subscriber that tracks active {@link ByteCounterStream} instances per port and
 * copies their byte counts into the per-port statistics of a container stats event.
 *
 * <p>Streams are classified by their sink id: a stream whose sink id starts with
 * {@code "tcp:"} is tracked as an output stream (several may share one port id); any other
 * {@link ByteCounterStream} is tracked as the single input stream for its port id.
 *
 * <p>NOTE(review): the maps are plain {@link HashMap}s with no synchronization; this assumes
 * all handlers are invoked from one thread — confirm against the event bus configuration.
 */
public class BufferServerStatsSubscriber {
    /** Prefix of the sink id that marks a stream as an output stream. */
    private static final String OUTPUT_SINK_PREFIX = "tcp:";

    /** Input streams keyed by port id; at most one stream per port. */
    private final HashMap<String, ByteCounterStream> inputStreams = new HashMap<>();

    /** Output streams keyed by port id; one port may feed several streams. */
    private final HashMap<String, List<ByteCounterStream>> outputStreams = new HashMap<>();

    /**
     * Registers a newly activated stream if it is a {@link ByteCounterStream}.
     *
     * @param streamActivationEvent event carrying the activated stream and its context
     */
    @Handler
    public void handleStreamActivation(ContainerEvent.StreamActivationEvent streamActivationEvent) {
        ComponentContextPair<Stream, StreamContext> stream = streamActivationEvent.getStream();
        if (!(stream.component instanceof ByteCounterStream)) {
            return;
        }
        ByteCounterStream counter = (ByteCounterStream) stream.component;
        String portId = stream.context.getPortId();
        String sinkId = stream.context.getSinkId();

        if (!sinkId.startsWith(OUTPUT_SINK_PREFIX)) {
            this.inputStreams.put(portId, counter);
            return;
        }

        List<ByteCounterStream> portStreams = this.outputStreams.get(portId);
        if (portStreams == null) {
            portStreams = new ArrayList<>();
            this.outputStreams.put(portId, portStreams);
        }
        portStreams.add(counter);
    }

    /**
     * Unregisters a deactivated stream if it is a {@link ByteCounterStream}.
     *
     * @param streamDeactivationEvent event carrying the deactivated stream and its context
     */
    @Handler
    public void handleStreamDeactivation(ContainerEvent.StreamDeactivationEvent streamDeactivationEvent) {
        ComponentContextPair<Stream, StreamContext> stream = streamDeactivationEvent.getStream();
        if (!(stream.component instanceof ByteCounterStream)) {
            return;
        }
        String portId = stream.context.getPortId();
        String sinkId = stream.context.getSinkId();

        if (!sinkId.startsWith(OUTPUT_SINK_PREFIX)) {
            this.inputStreams.remove(portId);
            return;
        }

        List<ByteCounterStream> portStreams = this.outputStreams.get(portId);
        if (portStreams != null) {
            // Remove the stream component itself: the list holds ByteCounterStream
            // instances, so passing the ComponentContextPair wrapper would never match.
            portStreams.remove(stream.component);
            if (portStreams.isEmpty()) {
                this.outputStreams.remove(portId);
            }
        }
    }

    /**
     * Fills {@code bufferServerBytes} on every input and output port stat in the event for
     * which a stream is currently registered. Ports without a registered stream are left
     * untouched.
     *
     * <p>NOTE(review): {@code getByteCount(true)} presumably resets the counter after
     * reading — confirm against {@code ByteCounterStream}.
     *
     * @param containerStatsEvent event carrying the container statistics to annotate
     */
    @Handler
    public void handleContainerStats(ContainerEvent.ContainerStatsEvent containerStatsEvent) {
        for (StreamingContainerUmbilicalProtocol.OperatorHeartbeat heartbeat
                : containerStatsEvent.getContainerStats().operators) {
            for (Stats.OperatorStats operatorStats : heartbeat.windowStats) {
                if (operatorStats.inputPorts != null) {
                    // Element type of the port lists is not visible here, so keep the cast.
                    for (Object entry : operatorStats.inputPorts) {
                        Stats.OperatorStats.PortStats inputStats = (Stats.OperatorStats.PortStats) entry;
                        ByteCounterStream counter = this.inputStreams.get(inputStats.id);
                        if (counter != null) {
                            inputStats.bufferServerBytes = counter.getByteCount(true);
                        }
                    }
                }
                if (operatorStats.outputPorts != null) {
                    for (Object entry : operatorStats.outputPorts) {
                        Stats.OperatorStats.PortStats outputStats = (Stats.OperatorStats.PortStats) entry;
                        List<ByteCounterStream> portStreams = this.outputStreams.get(outputStats.id);
                        // Guard on the looked-up list (not the map field) so ports with
                        // no registered output stream are skipped instead of throwing.
                        if (portStreams != null) {
                            outputStats.bufferServerBytes = 0L;
                            // Sum across all streams attached to this output port.
                            for (ByteCounterStream counter : portStreams) {
                                outputStats.bufferServerBytes += counter.getByteCount(true);
                            }
                        }
                    }
                }
            }
        }
    }
}
