package com.datatorrent.stram.debug;

import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.StringCodec;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.codec.JsonStreamCodec;
import com.datatorrent.common.util.ObjectMapperString;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.stram.util.FSPartFileCollection;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.apex.common.util.PropertiesHelper;
import org.codehaus.jackson.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorder.class */
public class TupleRecorder {
    public static final String VERSION = "1.2";
    private String id;
    private final String appId;
    private transient StreamCodec<Object> streamCodec;
    private SharedPubSubWebSocketClient wsClient;
    private String recordingNameTopic;
    private Runnable stopProcedure;
    private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);
    private static long ERROR_LOG_GAP = PropertiesHelper.getLong("org.apache.apex.stram.tupleRecorder.errorLogGap", StramClientUtils.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS_OVERRIDE, 0, Long.MAX_VALUE);
    private long totalTupleCount = 0;
    private final HashMap<String, PortInfo> portMap = new HashMap<>();
    private final HashMap<String, PortCount> portCountMap = new HashMap<>();
    private transient long currentWindowId = -1;
    private transient ArrayList<Range> windowIdRanges = new ArrayList<>();
    private long startTime = -1;
    private int nextPortIndex = 0;
    private final HashMap<String, Sink<Object>> sinks = new HashMap<>();
    private transient long endWindowTuplesProcessed = 0;
    private int numSubscribers = 0;
    private long numWindows = Long.MAX_VALUE;
    long lastLog = -1;
    private final FSPartFileCollection storage = new FSPartFileCollection() { // from class: com.datatorrent.stram.debug.TupleRecorder.1
        @Override // com.datatorrent.stram.util.FSPartFileCollection
        protected String getIndexExtraInfo() {
            if (TupleRecorder.this.windowIdRanges.isEmpty()) {
                return null;
            }
            ((Range) TupleRecorder.this.windowIdRanges.get(TupleRecorder.this.windowIdRanges.size() - 1)).high = TupleRecorder.this.currentWindowId;
            int i = 0;
            String str = TupleRecorder.convertToString(TupleRecorder.this.windowIdRanges) + ":";
            StringBuilder sb = new StringBuilder("{");
            for (PortCount portCount : TupleRecorder.this.portCountMap.values()) {
                if (i != 0) {
                    sb.append(",");
                }
                sb.append("\"").append(portCount.id).append("\":\"").append(portCount.count).append("\"");
                i++;
            }
            sb.append("}");
            return (str + sb.length()) + ":" + sb.toString();
        }

        @Override // com.datatorrent.stram.util.FSPartFileCollection
        protected void resetIndexExtraInfo() {
            Iterator it = TupleRecorder.this.portCountMap.values().iterator();
            while (it.hasNext()) {
                ((PortCount) it.next()).count = 0L;
            }
            TupleRecorder.this.windowIdRanges.clear();
        }
    };

    /* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorder$PortCount.class */
    public static class PortCount {
        public int id;
        public long count;
    }

    /* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorder$PortInfo.class */
    public static class PortInfo {
        public String name;
        public String streamName;
        public String type;
        public int id;
    }

    /* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorder$Range.class */
    public static class Range {
        public long low;
        public long high;

        public Range() {
            this.low = -1L;
            this.high = -1L;
        }

        public Range(long j, long j2) {
            this.low = -1L;
            this.high = -1L;
            this.low = j;
            this.high = j2;
        }

        public String toString() {
            return "[" + String.valueOf(this.low) + "," + String.valueOf(this.high) + "]";
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorder$RecordInfo.class */
    public static class RecordInfo {
        public long startTime;
        public String appId;
        public Map<String, ObjectMapperString> properties = new HashMap();
    }

    /* loaded from: input_file:com/datatorrent/stram/debug/TupleRecorder$RecorderSink.class */
    public class RecorderSink implements Sink<Object> {
        private final String portName;
        private int count;

        public RecorderSink(String str) {
            this.portName = str;
        }

        public void put(Object obj) {
            this.count++;
            if (!(obj instanceof Tuple)) {
                TupleRecorder.this.writeTuple(obj, this.portName);
                return;
            }
            Tuple tuple = (Tuple) obj;
            MessageType type = tuple.getType();
            if (type == MessageType.BEGIN_WINDOW) {
                TupleRecorder.this.beginWindow(tuple.getWindowId());
            }
            TupleRecorder.this.writeControlTuple(tuple, this.portName);
            if (type == MessageType.END_WINDOW) {
                TupleRecorder.this.endWindow();
            }
        }

        public int getCount(boolean z) {
            try {
                int i = this.count;
                if (z) {
                    this.count = 0;
                }
                return i;
            } catch (Throwable th) {
                if (z) {
                    this.count = 0;
                }
                throw th;
            }
        }
    }

    public TupleRecorder(String str, String str2) {
        this.id = str;
        this.appId = str2;
    }

    public FSPartFileCollection getStorage() {
        return this.storage;
    }

    public RecorderSink newSink(String str) {
        RecorderSink recorderSink = new RecorderSink(str);
        this.sinks.put(str, recorderSink);
        return recorderSink;
    }

    public void setStreamCodec(StreamCodec<Object> streamCodec) {
        this.streamCodec = streamCodec;
    }

    public void setWebSocketClient(SharedPubSubWebSocketClient sharedPubSubWebSocketClient) {
        this.wsClient = sharedPubSubWebSocketClient;
    }

    public Map<String, PortInfo> getPortInfoMap() {
        return Collections.unmodifiableMap(this.portMap);
    }

    public long getTotalTupleCount() {
        return this.totalTupleCount;
    }

    public Map<String, Sink<Object>> getSinkMap() {
        return Collections.unmodifiableMap(this.sinks);
    }

    public void setStartTime(long j) {
        if (this.startTime != -1) {
            throw new IllegalStateException("Tuple recorder has already started at " + this.startTime);
        }
        this.startTime = j;
    }

    public long getStartTime() {
        return this.startTime;
    }

    public String getId() {
        return this.id;
    }

    public void addInputPortInfo(String str, String str2) {
        PortInfo portInfo = new PortInfo();
        portInfo.name = str;
        portInfo.streamName = str2;
        portInfo.type = Node.INPUT;
        int i = this.nextPortIndex;
        this.nextPortIndex = i + 1;
        portInfo.id = i;
        this.portMap.put(str, portInfo);
        PortCount portCount = new PortCount();
        portCount.id = portInfo.id;
        portCount.count = 0L;
        this.portCountMap.put(str, portCount);
    }

    public void addOutputPortInfo(String str, String str2) {
        PortInfo portInfo = new PortInfo();
        portInfo.name = str;
        portInfo.streamName = str2;
        portInfo.type = Node.OUTPUT;
        int i = this.nextPortIndex;
        this.nextPortIndex = i + 1;
        portInfo.id = i;
        this.portMap.put(str, portInfo);
        PortCount portCount = new PortCount();
        portCount.id = portInfo.id;
        portCount.count = 0L;
        this.portCountMap.put(str, portCount);
    }

    public void teardown() {
        logger.info("Closing down tuple recorder.");
        this.storage.teardown();
    }

    public void setup(Operator operator, Map<Class<?>, Class<? extends StringCodec<?>>> map) {
        try {
            this.storage.setup();
            setStartTime(System.currentTimeMillis());
            if (this.id == null) {
                this.id = String.valueOf(this.startTime);
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            byteArrayOutputStream.write("1.2\n".getBytes());
            RecordInfo recordInfo = new RecordInfo();
            recordInfo.startTime = this.startTime;
            recordInfo.appId = this.appId;
            this.streamCodec = new JsonStreamCodec(map);
            if (operator != null) {
                for (PropertyDescriptor propertyDescriptor : Introspector.getBeanInfo(operator.getClass()).getPropertyDescriptors()) {
                    String name = propertyDescriptor.getName();
                    Method readMethod = propertyDescriptor.getReadMethod();
                    if (readMethod != null) {
                        readMethod.setAccessible(true);
                        try {
                            recordInfo.properties.put(name, new ObjectMapperString(this.streamCodec.toByteArray(readMethod.invoke(operator, new Object[0])).stringValue()));
                        } catch (Throwable th) {
                            logger.warn("Cannot serialize property {} for operator {}", name, operator.getClass());
                            recordInfo.properties.put(name, null);
                        }
                    }
                }
            }
            Slice byteArray = this.streamCodec.toByteArray(recordInfo);
            byteArrayOutputStream.write(byteArray.buffer, byteArray.offset, byteArray.length);
            byteArrayOutputStream.write("\n".getBytes());
            Iterator<PortInfo> it = this.portMap.values().iterator();
            while (it.hasNext()) {
                Slice byteArray2 = this.streamCodec.toByteArray(it.next());
                byteArrayOutputStream.write(byteArray2.buffer, byteArray2.offset, byteArray2.length);
                byteArrayOutputStream.write("\n".getBytes());
            }
            this.storage.writeMetaData(byteArrayOutputStream.toByteArray());
            if (this.wsClient != null) {
                this.recordingNameTopic = "applications." + this.appId + ".tupleRecorder." + getStartTime();
                setupWsClient();
            }
        } catch (Exception e) {
            logger.error("Trouble setting up tuple recorder", e);
        }
    }

    private void setupWsClient() throws ExecutionException, IOException, InterruptedException, TimeoutException {
        if (this.wsClient != null) {
            this.wsClient.addHandler(this.recordingNameTopic, true, new SharedPubSubWebSocketClient.Handler() { // from class: com.datatorrent.stram.debug.TupleRecorder.2
                @Override // com.datatorrent.stram.util.SharedPubSubWebSocketClient.Handler
                public void onMessage(String str, String str2, Object obj) {
                    TupleRecorder.this.numSubscribers = Integer.valueOf((String) obj).intValue();
                    TupleRecorder.logger.info("Number of subscribers for recording started at {} is now {}", Long.valueOf(TupleRecorder.this.getStartTime()), Integer.valueOf(TupleRecorder.this.numSubscribers));
                }

                @Override // com.datatorrent.stram.util.SharedPubSubWebSocketClient.Handler
                public void onClose() {
                    TupleRecorder.this.numSubscribers = 0;
                }
            });
        }
    }

    public void beginWindow(long j) {
        if (this.currentWindowId != j) {
            if (j != this.currentWindowId + 1) {
                if (!this.windowIdRanges.isEmpty()) {
                    this.windowIdRanges.get(this.windowIdRanges.size() - 1).high = this.currentWindowId;
                }
                Range range = new Range();
                range.low = j;
                this.windowIdRanges.add(range);
            }
            if (this.windowIdRanges.isEmpty()) {
                Range range2 = new Range();
                range2.low = j;
                this.windowIdRanges.add(range2);
            }
            this.currentWindowId = j;
            this.endWindowTuplesProcessed = 0L;
            try {
                this.storage.writeDataItem(("B:" + System.currentTimeMillis() + ":" + j + "\n").getBytes(), false);
            } catch (IOException e) {
                logger.error(e.toString());
            }
        }
    }

    /*  JADX ERROR: Failed to decode insn: 0x0007: MOVE_MULTI, method: com.datatorrent.stram.debug.TupleRecorder.endWindow():void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    /*  JADX ERROR: Failed to decode insn: 0x00A7: MOVE_MULTI, method: com.datatorrent.stram.debug.TupleRecorder.endWindow():void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -2 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    public void endWindow() {
        /*
            r6 = this;
            r0 = r6
            r1 = r0
            long r1 = r1.endWindowTuplesProcessed
            r2 = 1
            long r1 = r1 + r2
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.endWindowTuplesProcessed = r1
            r0 = r6
            java.util.HashMap<java.lang.String, com.datatorrent.stram.debug.TupleRecorder$PortInfo> r0 = r0.portMap
            int r0 = r0.size()
            long r0 = (long) r0
            int r-1 = (r-1 > r0 ? 1 : (r-1 == r0 ? 0 : -1))
            if (r-1 != 0) goto L99
            r-1 = r6
            com.datatorrent.stram.util.FSPartFileCollection r-1 = r-1.storage
            java.lang.StringBuilder r0 = new java.lang.StringBuilder
            r1 = r0
            r1.<init>()
            java.lang.String r1 = "E:"
            java.lang.StringBuilder r0 = r0.append(r1)
            long r1 = java.lang.System.currentTimeMillis()
            java.lang.StringBuilder r0 = r0.append(r1)
            java.lang.String r1 = ":"
            java.lang.StringBuilder r0 = r0.append(r1)
            r1 = r6
            long r1 = r1.currentWindowId
            java.lang.StringBuilder r0 = r0.append(r1)
            java.lang.String r1 = "\n"
            java.lang.StringBuilder r0 = r0.append(r1)
            java.lang.String r0 = r0.toString()
            byte[] r0 = r0.getBytes()
            r1 = 0
            r-1.writeDataItem(r0, r1)
            org.slf4j.Logger r-1 = com.datatorrent.stram.debug.TupleRecorder.logger
            java.lang.String r0 = "Got last end window tuple.  Flushing..."
            r-1.debug(r0)
            r-1 = r6
            com.datatorrent.stram.util.FSPartFileCollection r-1 = r-1.storage
            r-1.flushData()
            if (r-1 != 0) goto L8a
            r-1 = r6
            com.datatorrent.stram.util.SharedPubSubWebSocketClient r-1 = r-1.wsClient
            if (r-1 == 0) goto L8a
            r-1 = r6
            com.datatorrent.stram.util.SharedPubSubWebSocketClient r-1 = r-1.wsClient
            java.lang.StringBuilder r0 = new java.lang.StringBuilder
            r1 = r0
            r1.<init>()
            java.lang.String r1 = "_internal.lastIndex.tuple."
            java.lang.StringBuilder r0 = r0.append(r1)
            r1 = r6
            com.datatorrent.stram.util.FSPartFileCollection r1 = r1.storage
            java.lang.String r1 = r1.getBasePath()
            java.lang.StringBuilder r0 = r0.append(r1)
            java.lang.String r0 = r0.toString()
            r1 = r6
            com.datatorrent.stram.util.FSPartFileCollection r1 = r1.storage
            java.lang.String r1 = r1.getLatestIndexLine()
            r-1.publish(r0, r1)
            goto L99
            r7 = move-exception
            org.slf4j.Logger r0 = com.datatorrent.stram.debug.TupleRecorder.logger
            java.lang.String r1 = "Exception caught in endWindow"
            r2 = r7
            r0.error(r1, r2)
            r-1 = r6
            java.lang.Runnable r-1 = r-1.stopProcedure
            if (r-1 == 0) goto Lb9
            r-1 = r6
            r0 = r-1
            long r0 = r0.numWindows
            r1 = 1
            long r0 = r0 - r1
            // decode failed: arraycopy: source index -2 out of bounds for object array[6]
            r-1.numWindows = r0
            r-1 = 0
            int r-2 = (r-2 > r-1 ? 1 : (r-2 == r-1 ? 0 : -1))
            if (r-2 > 0) goto Lb9
            r-2 = r6
            java.lang.Runnable r-2 = r-2.stopProcedure
            r-2.run()
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: com.datatorrent.stram.debug.TupleRecorder.endWindow():void");
    }

    public void writeTuple(Object obj, String str) {
        this.totalTupleCount++;
        if (this.windowIdRanges.isEmpty()) {
            throw new RuntimeException("Data tuples received from tuple recorder before any BEGIN_WINDOW");
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            Slice byteArray = this.streamCodec.toByteArray(obj);
            try {
                PortInfo portInfo = this.portMap.get(str);
                byteArrayOutputStream.write(("T:" + System.currentTimeMillis() + ":" + portInfo.id + ":" + byteArray.length + ":").getBytes());
                byteArrayOutputStream.write(byteArray.buffer, byteArray.offset, byteArray.length);
                byteArrayOutputStream.write("\n".getBytes());
                PortCount portCount = this.portCountMap.get(str);
                portCount.count++;
                this.portCountMap.put(str, portCount);
                this.storage.writeDataItem(byteArrayOutputStream.toByteArray(), true);
                if (this.numSubscribers > 0) {
                    publishTupleData(portInfo.id, obj);
                }
            } catch (Exception e) {
                logger.warn("Error saving tuple", e);
            }
        } catch (RuntimeException e2) {
            checkLogTuple(e2, "save", obj);
        }
    }

    public void writeControlTuple(Tuple tuple, String str) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            PortInfo portInfo = this.portMap.get(str);
            Slice byteArray = this.streamCodec.toByteArray(tuple);
            byteArrayOutputStream.write(("C:" + System.currentTimeMillis() + ":" + portInfo.id + ":" + byteArray.length + ":").getBytes());
            byteArrayOutputStream.write(byteArray.buffer, byteArray.offset, byteArray.length);
            byteArrayOutputStream.write("\n".getBytes());
            this.storage.writeDataItem(byteArrayOutputStream.toByteArray(), false);
        } catch (IOException e) {
            logger.error(e.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String convertToString(List<Range> list) {
        String str = "";
        int i = 0;
        for (Range range : list) {
            int i2 = i;
            i++;
            if (i2 > 0) {
                str = str + ",";
            }
            str = ((str + String.valueOf(range.low)) + "-") + String.valueOf(range.high);
        }
        return str;
    }

    private void publishTupleData(int i, Object obj) {
        try {
            if (this.wsClient != null && this.wsClient.isConnectionOpen()) {
                HashMap hashMap = new HashMap();
                hashMap.put("portId", String.valueOf(i));
                hashMap.put("windowId", Long.valueOf(this.currentWindowId));
                hashMap.put("tupleCount", Long.valueOf(this.totalTupleCount));
                hashMap.put("data", obj);
                this.wsClient.publish(this.recordingNameTopic, hashMap);
            }
        } catch (Exception e) {
            if (e instanceof JsonProcessingException) {
                checkLogTuple(e, "publish", obj);
            } else {
                logger.warn("Error publishing tuple", e);
            }
        }
    }

    private void checkLogTuple(Exception exc, String str, Object obj) {
        if (this.lastLog == -1 || this.totalTupleCount - this.lastLog >= ERROR_LOG_GAP) {
            this.lastLog = this.totalTupleCount;
            logger.warn("Error serializing during {} for tuple {} ", new Object[]{str, obj, exc});
        }
    }

    public void setNumWindows(long j, Runnable runnable) {
        this.numWindows = j;
        this.stopProcedure = runnable;
    }
}
