package com.datatorrent.stram.debug;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StringCodec;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.ContainerEvent;
import com.datatorrent.stram.api.RequestFactory;
import com.datatorrent.stram.api.StramToNodeStartRecordingRequest;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import com.datatorrent.stram.webapp.asm.MethodSignatureVisitor;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import net.engio.mbassy.listener.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorderCollection.class */
public class TupleRecorderCollection extends HashMap<OperatorIdPortNamePair, TupleRecorder> implements Component<Context> {
    // Read from LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE in setup(); handed to each recorder's
    // storage via setBytesPerPartFile() in startRecording().
    private int tupleRecordingPartFileSize;
    // Read from LogicalPlan.GATEWAY_CONNECT_ADDRESS in setup(); when null no web-socket client is created.
    private String gatewayAddress;
    // Read from LogicalPlan.GATEWAY_USE_SSL in setup(); selects wss/https over ws/http for gateway URLs.
    private boolean gatewayUseSsl = false;
    // Gateway credentials read in setup(); a login URL is configured only when both are non-null.
    private String gatewayUserName;
    private String gatewayPassword;
    // Read from LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS in setup(); handed to each
    // recorder's storage via setMillisPerPartFile() in startRecording().
    private long tupleRecordingPartFileTimeMillis;
    // Read from LogicalPlan.APPLICATION_PATH in setup(); recordings go under <appPath>/recordings/<operatorId>/<recorderId>.
    private String appPath;
    // Read from LogicalPlan.APPLICATION_ID in setup(); passed to every TupleRecorder constructor.
    private String appId;
    // Created on demand in startRecording() when gatewayAddress is set; handed to each new recorder
    // and torn down in teardown().
    private SharedPubSubWebSocketClient wsClient;
    // Read from Context.DAGContext.STRING_CODECS in setup(); passed to TupleRecorder.setup().
    private Map<Class<?>, Class<? extends StringCodec<?>>> codecs;
    private static final long serialVersionUID = 201309112123L;
    private static final Logger logger = LoggerFactory.getLogger(TupleRecorderCollection.class);

    /* renamed from: com.datatorrent.stram.debug.TupleRecorderCollection$2, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorderCollection$2.class */
    /*
     * Synthetic lookup table: an int array indexed by RequestType.ordinal() in which
     * START_RECORDING maps to 1, STOP_RECORDING to 2 and SYNC_RECORDING to 3.
     * Every other slot keeps the array default of 0.
     */
    static /* synthetic */ class AnonymousClass2 {
        // Sized to the number of RequestType constants present when this class is initialized.
        static final /* synthetic */ int[] $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$StramToNodeRequest$RequestType = new int[StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.values().length];

        static {
            // Each assignment is wrapped separately so that a constant missing at run time
            // (NoSuchFieldError) only leaves its own slot at 0 instead of aborting initialization.
            try {
                $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$StramToNodeRequest$RequestType[StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.START_RECORDING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // deliberately ignored: slot stays 0
            }
            try {
                $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$StramToNodeRequest$RequestType[StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.STOP_RECORDING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // deliberately ignored: slot stays 0
            }
            try {
                $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$StramToNodeRequest$RequestType[StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.SYNC_RECORDING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // deliberately ignored: slot stays 0
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorderCollection$RequestDelegateImpl.class */
    /**
     * Request delegate that turns START_RECORDING, STOP_RECORDING and SYNC_RECORDING
     * requests into {@link StatsListener.OperatorRequest} objects which invoke the
     * matching method of the enclosing {@link TupleRecorderCollection}.
     */
    private class RequestDelegateImpl implements RequestFactory.RequestDelegate {
        private RequestDelegateImpl() {
        }

        /**
         * Builds the executable request for a recording-related request.
         *
         * @param node               node the request is addressed to
         * @param stramToNodeRequest request to translate; for START_RECORDING it is cast to
         *                           {@link StramToNodeStartRecordingRequest} when executed
         * @return a request whose {@code execute} starts, stops or syncs recording and returns null
         * @throws UnsupportedOperationException if the request type is not one of the three
         *                                       recording request types
         */
        @Override // com.datatorrent.stram.api.RequestFactory.RequestDelegate
        public StatsListener.OperatorRequest getRequestExecutor(final Node<?> node, final StreamingContainerUmbilicalProtocol.StramToNodeRequest stramToNodeRequest) {
            // Switch on the enum itself rather than on an ordinal lookup table with case labels
            // borrowed from unrelated integer constants.
            switch (stramToNodeRequest.getRequestType()) {
                case START_RECORDING:
                    return new StatsListener.OperatorRequest() {
                        @Override
                        public StatsListener.OperatorResponse execute(Operator operator, int i, long j) throws IOException {
                            StramToNodeStartRecordingRequest stramToNodeStartRecordingRequest = (StramToNodeStartRecordingRequest) stramToNodeRequest;
                            TupleRecorderCollection.this.startRecording(stramToNodeStartRecordingRequest.getId(), node, i, stramToNodeStartRecordingRequest.getPortName(), stramToNodeStartRecordingRequest.getNumWindows());
                            return null;
                        }

                        @Override
                        public String toString() {
                            return "Start Recording";
                        }
                    };
                case STOP_RECORDING:
                    return new StatsListener.OperatorRequest() {
                        @Override
                        public StatsListener.OperatorResponse execute(Operator operator, int i, long j) throws IOException {
                            TupleRecorderCollection.this.stopRecording(node, i, stramToNodeRequest.getPortName());
                            return null;
                        }

                        @Override
                        public String toString() {
                            return "Stop Recording";
                        }
                    };
                case SYNC_RECORDING:
                    return new StatsListener.OperatorRequest() {
                        @Override
                        public StatsListener.OperatorResponse execute(Operator operator, int i, long j) throws IOException {
                            TupleRecorderCollection.this.syncRecording(node, i, stramToNodeRequest.getPortName());
                            return null;
                        }

                        @Override
                        public String toString() {
                            return "Recording Request";
                        }
                    };
                default:
                    throw new UnsupportedOperationException("Unknown request type " + stramToNodeRequest.requestType);
            }
        }
    }

    /**
     * Looks up the recorder stored under the given operator id / port name combination.
     *
     * @param i   operator id part of the key
     * @param str port name part of the key
     * @return the value mapped to that key, or null when there is none
     */
    public TupleRecorder getTupleRecorder(int i, String str) {
        final OperatorIdPortNamePair lookupKey = new OperatorIdPortNamePair(i, str);
        return get(lookupKey);
    }

    /**
     * Copies the recording and gateway settings out of the given context and registers a
     * delegate for the three recording request types with the context's request factory.
     * When the context has no request factory a warning is logged and nothing is registered.
     *
     * @param context source of the attribute values
     */
    public void setup(Context context) {
        // Unboxing below throws NullPointerException for a missing value, as an explicit
        // intValue()/booleanValue() call would.
        this.tupleRecordingPartFileSize = (Integer) context.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE);
        this.tupleRecordingPartFileTimeMillis = (Integer) context.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS);
        this.appId = (String) context.getValue(LogicalPlan.APPLICATION_ID);
        this.gatewayAddress = (String) context.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
        this.gatewayUseSsl = (Boolean) context.getValue(LogicalPlan.GATEWAY_USE_SSL);
        this.gatewayUserName = (String) context.getValue(LogicalPlan.GATEWAY_USER_NAME);
        this.gatewayPassword = (String) context.getValue(LogicalPlan.GATEWAY_PASSWORD);
        this.appPath = (String) context.getValue(LogicalPlan.APPLICATION_PATH);
        this.codecs = (Map) context.getAttributes().get(Context.DAGContext.STRING_CODECS);

        RequestFactory factory = (RequestFactory) context.getValue(ContainerContext.REQUEST_FACTORY);
        if (factory != null) {
            RequestDelegateImpl delegate = new RequestDelegateImpl();
            StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType[] recordingRequestTypes = {
                StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.START_RECORDING,
                StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.STOP_RECORDING,
                StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.SYNC_RECORDING
            };
            for (StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType requestType : recordingRequestTypes) {
                factory.registerDelegate(requestType, delegate);
            }
        } else {
            logger.warn("No request factory defined, recording is disabled!");
        }
    }

    /**
     * Tears down every stored recorder, then the web-socket client if one was created,
     * and finally empties this map.
     */
    public void teardown() {
        for (TupleRecorder recorder : values()) {
            recorder.teardown();
        }
        SharedPubSubWebSocketClient client = this.wsClient;
        if (client != null) {
            client.teardown();
        }
        clear();
    }

    /**
     * Builds a stream id of the form {@code <operatorId><KEY_SEPARATOR><portName>}.
     *
     * @param i   operator id
     * @param str port name; must not be null because {@link String#concat} dereferences it
     * @return the concatenated id, never null
     */
    public final String getDeclaredStreamId(int i, String str) {
        final String operatorPrefix = Integer.toString(i).concat(LogicalPlanConfiguration.KEY_SEPARATOR);
        return operatorPrefix.concat(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts recording tuples on one port, or on all ports, of an operator.
     *
     * <p>The request is rejected with an error log when the operator already has an
     * operator-wide recorder, when {@code str2} is given and that port already has a recorder,
     * or when {@code str2} is null and any of its ports has a recorder. Otherwise a new
     * {@link TupleRecorder} is created, recorder sinks are added to the node for every selected
     * port, and the recorder is stored under the (operator id, port name) key. When no port
     * was selected a warning is logged and nothing is stored.
     *
     * @param str   id given to the new recorder
     * @param node  node whose ports are recorded
     * @param i     operator id
     * @param str2  port to record, or null to record every port; input ports are only
     *              recorded when their port context is non-null
     * @param j     when greater than zero, passed to the recorder together with a callback
     *              that requests {@code stopRecording} on the node's context
     */
    public void startRecording(String str, final Node<?> node, int i, final String str2, long j) {
        Operators.PortMappingDescriptor portMappingDescriptor = node.getPortMappingDescriptor();
        OperatorIdPortNamePair operatorIdPortNamePair = new OperatorIdPortNamePair(i, str2);
        if (isAlreadyRecording(i, str2, portMappingDescriptor)) {
            logger.error("Operator id {} is already being recorded.", Integer.valueOf(i));
            return;
        }
        logger.debug("Executing start recording request for {}", operatorIdPortNamePair);
        ensureWebSocketClient();
        TupleRecorder tupleRecorder = new TupleRecorder(str, this.appId);
        tupleRecorder.setWebSocketClient(this.wsClient);
        // Port name -> recorder sink; left raw because the sink type is not imported in this file.
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Operators.PortContextPair<Operator.InputPort<?>>> entry : portMappingDescriptor.inputPorts.entrySet()) {
            String inputPortName = entry.getKey();
            // Reference check: the port context is an object, so it is compared with null.
            if (entry.getValue().context != null && (str2 == null || inputPortName.equals(str2))) {
                String inputStreamId = getDeclaredStreamId(i, inputPortName);
                logger.debug("Adding recorder sink to input port {}, stream {}", inputPortName, inputStreamId);
                tupleRecorder.addInputPortInfo(inputPortName, inputStreamId);
                hashMap.put(inputPortName, tupleRecorder.newSink(inputPortName));
            }
        }
        for (Map.Entry<String, Operators.PortContextPair<Operator.OutputPort<?>>> entry2 : portMappingDescriptor.outputPorts.entrySet()) {
            String outputPortName = entry2.getKey();
            if (str2 == null || outputPortName.equals(str2)) {
                String outputStreamId = getDeclaredStreamId(i, outputPortName);
                logger.debug("Adding recorder sink to output port {}, stream {}", outputPortName, outputStreamId);
                tupleRecorder.addOutputPortInfo(outputPortName, outputStreamId);
                hashMap.put(outputPortName, tupleRecorder.newSink(outputPortName));
            }
        }
        if (hashMap.isEmpty()) {
            logger.warn("Tuple recording request ignored because operator is not connected on the specified port.");
            return;
        }
        logger.debug("Started recording on {} through {}", operatorIdPortNamePair, Integer.valueOf(System.identityHashCode(this)));
        tupleRecorder.getStorage().setBasePath(this.appPath + "/recordings/" + i + "/" + tupleRecorder.getId());
        tupleRecorder.getStorage().setBytesPerPartFile(this.tupleRecordingPartFileSize);
        tupleRecorder.getStorage().setMillisPerPartFile(this.tupleRecordingPartFileTimeMillis);
        node.addSinks(hashMap);
        tupleRecorder.setup(node.getOperator(), this.codecs);
        put(operatorIdPortNamePair, tupleRecorder);
        if (j > 0) {
            tupleRecorder.setNumWindows(j, new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    // Stop through the node's request mechanism instead of calling stopRecording directly.
                    node.context.request(new StatsListener.OperatorRequest() {
                        @Override
                        public StatsListener.OperatorResponse execute(Operator operator, int i2, long j2) throws IOException {
                            TupleRecorderCollection.this.stopRecording(node, i2, str2);
                            return null;
                        }
                    });
                }
            });
        }
    }

    /** Tells whether a recorder already exists that conflicts with a request for the given operator/port. */
    private boolean isAlreadyRecording(int operatorId, String portName, Operators.PortMappingDescriptor descriptor) {
        if (containsKey(new OperatorIdPortNamePair(operatorId, null))) {
            return true;
        }
        if (portName != null) {
            return containsKey(new OperatorIdPortNamePair(operatorId, portName));
        }
        // Operator-wide request: any port that already has a recorder is a conflict.
        for (String inputPortName : descriptor.inputPorts.keySet()) {
            if (containsKey(new OperatorIdPortNamePair(operatorId, inputPortName))) {
                return true;
            }
        }
        for (String outputPortName : descriptor.outputPorts.keySet()) {
            if (containsKey(new OperatorIdPortNamePair(operatorId, outputPortName))) {
                return true;
            }
        }
        return false;
    }

    /** Creates and sets up the web-socket client once, provided a gateway address is configured. */
    private void ensureWebSocketClient() {
        // Checked once without the lock and again while holding this object's monitor.
        // NOTE(review): wsClient is not declared volatile, so the unlocked read may see a stale value - confirm.
        if (this.gatewayAddress == null || this.wsClient != null) {
            return;
        }
        synchronized (this) {
            if (this.wsClient != null) {
                return;
            }
            try {
                this.wsClient = new SharedPubSubWebSocketClient((this.gatewayUseSsl ? "wss://" : "ws://") + this.gatewayAddress + "/pubsub", 500L);
                if (this.gatewayUserName != null && this.gatewayPassword != null) {
                    this.wsClient.setLoginUrl((this.gatewayUseSsl ? "https://" : "http://") + this.gatewayAddress + StreamingContainerManager.GATEWAY_LOGIN_URL_PATH);
                    this.wsClient.setUserName(this.gatewayUserName);
                    this.wsClient.setPassword(this.gatewayPassword);
                }
                this.wsClient.setup();
            } catch (Exception e) {
                // Best effort: a failure here is logged and does not abort the caller.
                logger.warn("Error initializing websocket", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stops recording for an operator/port.
     *
     * <p>When a recorder is stored under exactly (i, str) its sinks are removed from the node,
     * it is torn down and the key is removed. Otherwise, a non-null port name is reported as
     * not being recorded, while a null port name stops every non-null recorder whose key has
     * operator id {@code i}.
     *
     * @param node node the recorder sinks are removed from
     * @param i    operator id
     * @param str  port name, or null to address the whole operator
     */
    public void stopRecording(Node<?> node, int i, String str) {
        final OperatorIdPortNamePair requestedKey = new OperatorIdPortNamePair(i, str);

        if (!containsKey(requestedKey)) {
            if (str != null) {
                logger.error("Operator/port {} is not being recorded.", requestedKey);
                return;
            }
            // No exact match and no port given: stop all recorders of this operator.
            for (Iterator<Map.Entry<OperatorIdPortNamePair, TupleRecorder>> cursor = entrySet().iterator(); cursor.hasNext();) {
                Map.Entry<OperatorIdPortNamePair, TupleRecorder> candidate = cursor.next();
                if (candidate.getKey().operatorId != i) {
                    continue;
                }
                TupleRecorder portRecorder = candidate.getValue();
                if (portRecorder == null) {
                    continue;
                }
                node.removeSinks(portRecorder.getSinkMap());
                portRecorder.teardown();
                logger.debug("Stopped recording for operator/port {}", requestedKey);
                cursor.remove();
            }
            return;
        }

        logger.debug("Executing stop recording request for {}", requestedKey);
        TupleRecorder exactRecorder = get(requestedKey);
        if (exactRecorder == null) {
            // Key present with a null value: nothing to stop, and the key is left in place.
            return;
        }
        node.removeSinks(exactRecorder.getSinkMap());
        exactRecorder.teardown();
        logger.debug("Stopped recording for {}", requestedKey);
        remove(requestedKey);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Requests a sync on the storage of the recorder(s) for an operator/port.
     *
     * <p>An exact (i, str) match is synced on its own. Without an exact match, a non-null port
     * name is reported as not being recorded, while a null port name syncs every non-null
     * recorder whose key has operator id {@code i}.
     *
     * @param node not used by this method
     * @param i    operator id
     * @param str  port name, or null to address the whole operator
     */
    public void syncRecording(Node<?> node, int i, String str) {
        final OperatorIdPortNamePair requestedKey = new OperatorIdPortNamePair(i, str);

        if (!containsKey(requestedKey)) {
            if (str != null) {
                logger.error("(SYNC_RECORDING) Operator/port {} is not being recorded.", requestedKey);
                return;
            }
            Iterator<Map.Entry<OperatorIdPortNamePair, TupleRecorder>> cursor = entrySet().iterator();
            while (cursor.hasNext()) {
                Map.Entry<OperatorIdPortNamePair, TupleRecorder> candidate = cursor.next();
                if (candidate.getKey().operatorId != i) {
                    continue;
                }
                TupleRecorder portRecorder = candidate.getValue();
                if (portRecorder == null) {
                    continue;
                }
                portRecorder.getStorage().requestSync();
                logger.debug("Requested sync recording for operator/port {}", requestedKey);
            }
            return;
        }

        logger.debug("Executing sync recording request for {}", requestedKey);
        TupleRecorder exactRecorder = get(requestedKey);
        if (exactRecorder == null) {
            return;
        }
        exactRecorder.getStorage().requestSync();
        logger.debug("Requested sync recording for operator/port {}", requestedKey);
    }

    /**
     * Handles node activation by starting automatic recordings.
     *
     * <p>When the operator context's AUTO_RECORD value is true, one operator-wide recording is
     * started and the ports are not examined. Otherwise a recording is started for each input
     * and output port that has a port context whose AUTO_RECORD value is true. All recordings
     * are started with a null recorder id and a window count of 0.
     *
     * @param nodeActivationEvent event carrying the activated node
     */
    @Handler
    public void activated(ContainerEvent.NodeActivationEvent nodeActivationEvent) {
        Node<?> node = nodeActivationEvent.getNode();
        if (((Boolean) node.context.getValue(Context.OperatorContext.AUTO_RECORD)).booleanValue()) {
            startRecording(null, node, node.getId(), null, 0L);
            return;
        }
        for (Map.Entry<String, Operators.PortContextPair<Operator.InputPort<?>>> entry : node.getPortMappingDescriptor().inputPorts.entrySet()) {
            // The port context is an object reference, so its absence is tested with null.
            if (entry.getValue().context != null && ((Boolean) entry.getValue().context.getValue(Context.PortContext.AUTO_RECORD)).booleanValue()) {
                startRecording(null, node, node.getId(), entry.getKey(), 0L);
            }
        }
        for (Map.Entry<String, Operators.PortContextPair<Operator.OutputPort<?>>> entry2 : node.getPortMappingDescriptor().outputPorts.entrySet()) {
            if (entry2.getValue().context != null && ((Boolean) entry2.getValue().context.getValue(Context.PortContext.AUTO_RECORD)).booleanValue()) {
                startRecording(null, node, node.getId(), entry2.getKey(), 0L);
            }
        }
    }

    /**
     * Handles node deactivation by stopping recording for the node's operator id with a
     * null port name.
     *
     * @param nodeDeactivationEvent event carrying the deactivated node
     */
    @Handler
    public void deactivated(ContainerEvent.NodeDeactivationEvent nodeDeactivationEvent) {
        final Node<?> deactivatedNode = nodeDeactivationEvent.getNode();
        final int operatorId = deactivatedNode.getId();
        stopRecording(deactivatedNode, operatorId, null);
    }

    /**
     * Stamps recording ids onto the container statistics carried by the event.
     *
     * <p>First every port's {@code recordingId} in every window of every operator heartbeat is
     * reset to null. Then, per heartbeat: if a recorder is stored under (nodeId, null), its id
     * is written to each window's {@code recordingId}; otherwise each window's
     * {@code recordingId} is set to null and, for every stored entry with that operator id,
     * the first input port and the first output port whose id equals the entry's port name
     * receive that recorder's id.
     *
     * @param containerStatsEvent event whose container stats are updated in place
     */
    @Handler
    public void collected(ContainerEvent.ContainerStatsEvent containerStatsEvent) {
        StreamingContainerUmbilicalProtocol.ContainerStats containerStats = containerStatsEvent.getContainerStats();

        // Pass 1: forget any recording ids left on the port stats.
        for (StreamingContainerUmbilicalProtocol.OperatorHeartbeat heartbeat : containerStats.operators) {
            for (Stats.OperatorStats windowStats : heartbeat.windowStats) {
                clearPortRecordingIds(windowStats.inputPorts);
                clearPortRecordingIds(windowStats.outputPorts);
            }
        }

        // Pass 2: apply the ids of the recorders currently stored in this map.
        for (StreamingContainerUmbilicalProtocol.OperatorHeartbeat heartbeat : containerStats.operators) {
            String id = null;
            TupleRecorder operatorRecorder = get(new OperatorIdPortNamePair(heartbeat.nodeId, null));
            if (operatorRecorder != null) {
                id = operatorRecorder.getId();
            } else {
                for (Map.Entry<OperatorIdPortNamePair, TupleRecorder> entry : entrySet()) {
                    if (entry.getKey().operatorId != heartbeat.nodeId) {
                        continue;
                    }
                    for (Stats.OperatorStats windowStats : heartbeat.windowStats) {
                        tagRecordedPort(windowStats.inputPorts, entry.getKey().portName, entry.getValue());
                        tagRecordedPort(windowStats.outputPorts, entry.getKey().portName, entry.getValue());
                    }
                }
            }
            for (Stats.OperatorStats windowStats : heartbeat.windowStats) {
                windowStats.recordingId = id;
            }
        }
    }

    /** Sets the recording id of every port in the list to null; a null list is skipped. */
    private static void clearPortRecordingIds(Iterable<Stats.OperatorStats.PortStats> ports) {
        if (ports == null) {
            return;
        }
        for (Stats.OperatorStats.PortStats port : ports) {
            port.recordingId = null;
        }
    }

    /**
     * Writes the recorder's id onto the first port whose id equals portName and stops there.
     * The scan ends when the list is exhausted, so a list without a matching port is a no-op.
     */
    private static void tagRecordedPort(Iterable<Stats.OperatorStats.PortStats> ports, String portName, TupleRecorder recorder) {
        if (ports == null) {
            return;
        }
        for (Stats.OperatorStats.PortStats port : ports) {
            if (port.id.equals(portName)) {
                // The recorder is only dereferenced on a match.
                port.recordingId = recorder.getId();
                return;
            }
        }
    }
}
