package com.datatorrent.stram.debug;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.debug.TupleRecorder;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.StramTestSupport;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link TupleRecorder}.
 *
 * <p>{@link #testRecorder()} drives a recorder directly and checks the {@code index.txt},
 * {@code meta.txt} and {@code part0.txt} files it writes. {@link #testRecordingFlow()} starts and
 * stops recordings on operators of a running {@link StramLocalCluster} and checks the files that
 * appear under the test work directory.
 */
public class TupleRecorderTest {
    /** Name passed to the container's {@code getInstance} lookup in {@link #getTupleRecorder}. */
    private static final String RECORDER_COLLECTION_CLASS = "com.datatorrent.stram.debug.TupleRecorderCollection";

    /** Minimum number of tuples a recording must contain before it is stopped and verified. */
    private static final int TEST_TUPLE_COUNT = 10;

    /**
     * Value configured as {@code TUPLE_RECORDING_PART_FILE_SIZE} in the flow test; every part file
     * except the last one is asserted to be at least this many bytes long.
     */
    private static final int PART_FILE_SIZE = 1024;

    private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName());
    private static final Logger logger = LoggerFactory.getLogger(TupleRecorderTest.class);

    /**
     * Simple key/value payload written through the recorder in {@link #testRecorder()}.
     *
     * <p>The part-file assertions expect it to be stored as
     * {@code {"key":"...","value":"..."}}, so the field names and their order matter.
     */
    public class Tuple {
        public String key;
        public String value;

        public Tuple() {
        }
    }

    /** Starts the shared {@link StreamingContainer} event loop before each test. */
    @Before
    public void setup() throws IOException {
        StreamingContainer.eventloop.start();
    }

    /** Stops the shared {@link StreamingContainer} event loop after each test. */
    @After
    public void teardown() {
        StreamingContainer.eventloop.stop();
    }

    /**
     * Looks up the tuple recorder attached to an operator in the local cluster.
     *
     * @param stramLocalCluster cluster whose container hosts the operator
     * @param pTOperator        physical operator to look up
     * @return the recorder registered for the operator id (with a {@code null} port name); callers
     *         in this class poll on the result being {@code null} / non-{@code null}
     */
    public TupleRecorder getTupleRecorder(StramLocalCluster stramLocalCluster, PTOperator pTOperator) {
        TupleRecorderCollection recorders =
                (TupleRecorderCollection) stramLocalCluster.getContainer(pTOperator).getInstance(RECORDER_COLLECTION_CLASS);
        return recorders.getTupleRecorder(pTOperator.getId(), (String) null);
    }

    /**
     * Records four tuples (three on input ports, one on an output port) with a stand-alone
     * recorder and verifies the contents of the index, meta and part files.
     *
     * @throws IOException if the recording files cannot be read
     */
    @Test
    public void testRecorder() throws IOException {
        // try-with-resources closes the file system and every reader even when an assertion fails.
        try (LocalFileSystem fs = new LocalFileSystem()) {
            TupleRecorder recorder = new TupleRecorder((String) null, "application_test_id_1");
            recorder.getStorage().setBytesPerPartFile(4096);
            recorder.getStorage().setLocalMode(true);
            recorder.getStorage().setBasePath("file://" + testWorkDir.getAbsolutePath() + "/recordings");
            recorder.addInputPortInfo("ip1", "str1");
            recorder.addInputPortInfo("ip2", "str2");
            recorder.addInputPortInfo("ip3", "str3");
            recorder.addOutputPortInfo("op1", "str4");
            recorder.setup((Operator) null, (Map) null);

            // NOTE(review): beginWindow is called three times here and endWindow once after each of
            // the three input tuples - presumably one begin/end per registered input port; confirm
            // against TupleRecorder before changing the call sequence.
            recorder.beginWindow(1000L);
            recorder.beginWindow(1000L);
            recorder.beginWindow(1000L);
            recorder.writeTuple(newTuple("speed", "5m/h"), "ip1");
            recorder.endWindow();
            recorder.writeTuple(newTuple("speed", "4m/h"), "ip3");
            recorder.endWindow();
            recorder.writeTuple(newTuple("speed", "6m/h"), "ip2");
            recorder.endWindow();

            recorder.beginWindow(1000L);
            recorder.writeTuple(newTuple("speed", "2m/h"), "op1");
            recorder.endWindow();
            recorder.teardown();

            fs.initialize(new Path(recorder.getStorage().getBasePath()).toUri(), new Configuration());

            try (BufferedReader indexReader = openRecordingFile(fs, recorder, "index.txt")) {
                String indexLine = indexReader.readLine();
                Assert.assertNotNull("index.txt should not be empty", indexLine);
                Assert.assertTrue("check index", indexLine.matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}"));
            }

            try (BufferedReader metaReader = openRecordingFile(fs, recorder, "meta.txt")) {
                ObjectMapper mapper = new ObjectMapper();
                Assert.assertEquals("check version", "1.2", metaReader.readLine());
                // Second line is a JSON header (the flow test reads "startTime" from it); it is
                // intentionally not checked here.
                metaReader.readLine();
                for (int port = 1; port <= 4; port++) {
                    assertPortInfoRecorded("port" + port, recorder, mapper, metaReader.readLine());
                }
                Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size());
            }

            try (BufferedReader partReader = openRecordingFile(fs, recorder, "part0.txt")) {
                assertLine("check part0", partReader.readLine(), "B:", ":1000");
                assertLine("check part0 1", partReader.readLine(), "T:", ":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}");
                assertLine("check part0 2", partReader.readLine(), "T:", ":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}");
                assertLine("check part0 3", partReader.readLine(), "T:", ":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}");
                assertLine("check part0 4", partReader.readLine(), "T:", ":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}");
                assertLine("check part0 5", partReader.readLine(), "E:", ":1000");
            }
        }
    }

    /**
     * Runs a three-operator plan on a local cluster and verifies recording on the middle operator
     * (two ports expected in its meta file) and on the input operator (one port expected).
     *
     * @throws Exception if the cluster cannot be started or a recording file cannot be read
     */
    @Test
    public void testRecordingFlow() throws Exception {
        LogicalPlan plan = new LogicalPlan();
        plan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath(), (Configuration) null));
        plan.getAttributes().put(LogicalPlan.APPLICATION_PATH, "file://" + testWorkDir.getAbsolutePath());
        plan.getAttributes().put(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE, PART_FILE_SIZE);

        TestGeneratorInputOperator generator = plan.addOperator("op1", TestGeneratorInputOperator.class);
        GenericTestOperator middle = plan.addOperator("op2", GenericTestOperator.class);
        GenericTestOperator sink = plan.addOperator("op3", GenericTestOperator.class);
        generator.setEmitInterval(100);
        plan.addStream("stream1", generator.outport, middle.inport1);
        plan.addStream("stream2", middle.outport1, sink.inport1);

        StramLocalCluster cluster = new StramLocalCluster(plan);
        cluster.runAsync();
        try {
            PTOperator middleOp = cluster.findByLogicalNode(plan.getMeta(middle));
            StramTestSupport.waitForActivation(cluster, middleOp);
            testRecordingOnOperator(cluster, middleOp, 2);

            PTOperator generatorOp = cluster.findByLogicalNode(plan.getMeta(generator));
            StramTestSupport.waitForActivation(cluster, generatorOp);
            testRecordingOnOperator(cluster, generatorOp, 1);
        } finally {
            // Shut the cluster down even when an assertion above fails, so it does not keep running.
            cluster.shutdown();
        }
    }

    /**
     * Starts a recording named {@code xyz} on the operator, waits for at least
     * {@link #TEST_TUPLE_COUNT} tuples, stops the recording and verifies the meta, index and part
     * files written under {@code recordings/<operatorId>/xyz} in the test work directory.
     *
     * @param localCluster running cluster that hosts the operator
     * @param op           operator to record
     * @param numPorts     number of port-info lines expected in the meta file; also the number of
     *                     port indexes for which recorded tuples are expected
     * @throws Exception if waiting is interrupted or a recording file cannot be read or parsed
     */
    private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op, int numPorts) throws Exception {
        localCluster.getStreamingContainerManager().startRecording("xyz", op.getId(), (String) null, 0L);
        Assert.assertTrue("Should get a tuple recorder within 10 seconds", StramTestSupport.awaitCompletion(new StramTestSupport.WaitCondition() {
            @Override
            public boolean isComplete() {
                return TupleRecorderTest.this.getTupleRecorder(localCluster, op) != null;
            }
        }, 10000L));

        long startTime = getTupleRecorder(localCluster, op).getStartTime();
        File recordingDir = new File(testWorkDir, "recordings/" + op.getId() + "/xyz");
        verifyMetaFile(new File(recordingDir, "meta.txt"), startTime, numPorts);

        Assert.assertTrue("Should record more than 10 tuples within 15 seconds", StramTestSupport.awaitCompletion(new StramTestSupport.WaitCondition() {
            @Override
            public boolean isComplete() {
                return TupleRecorderTest.this.getTupleRecorder(localCluster, op).getTotalTupleCount() >= TEST_TUPLE_COUNT;
            }
        }, 15000L));

        localCluster.getStreamingContainerManager().stopRecording(op.getId(), (String) null);
        Assert.assertTrue("Tuple recorder shouldn't exist any more after stopping", StramTestSupport.awaitCompletion(new StramTestSupport.WaitCondition() {
            @Override
            public boolean isComplete() {
                return TupleRecorderTest.this.getTupleRecorder(localCluster, op) == null;
            }
        }, 5000L));

        List<String> partFiles = readPartFileNames(new File(recordingDir, "index.txt"));
        verifyPartFiles(recordingDir, partFiles, numPorts);
    }

    /** Creates a {@link Tuple} with the given key and value. */
    private Tuple newTuple(String key, String value) {
        Tuple tuple = new Tuple();
        tuple.key = key;
        tuple.value = value;
        return tuple;
    }

    /** Opens a file below the recorder's storage base path for line-by-line reading. */
    private static BufferedReader openRecordingFile(LocalFileSystem fs, TupleRecorder recorder, String fileName) throws IOException {
        return new BufferedReader(new InputStreamReader(fs.open(new Path(recorder.getStorage().getBasePath(), fileName))));
    }

    /**
     * Parses one port-info line from {@code meta.txt} and checks its id and type against the entry
     * with the same name in the recorder's port-info map.
     */
    private static void assertPortInfoRecorded(String label, TupleRecorder recorder, ObjectMapper mapper, String json) throws IOException {
        Assert.assertNotNull(label + ": port info line missing from meta.txt", json);
        TupleRecorder.PortInfo parsed = mapper.readValue(json, TupleRecorder.PortInfo.class);
        TupleRecorder.PortInfo expected = (TupleRecorder.PortInfo) recorder.getPortInfoMap().get(parsed.name);
        Assert.assertEquals(label, expected.id, parsed.id);
        Assert.assertEquals(label, expected.type, parsed.type);
    }

    /** Asserts that a line read from a part file exists and has the given prefix and suffix. */
    private static void assertLine(String label, String line, String prefix, String suffix) {
        Assert.assertNotNull(label + ": line missing", line);
        Assert.assertTrue(label, line.startsWith(prefix));
        Assert.assertTrue(label, line.endsWith(suffix));
    }

    /**
     * Checks the version line, the start time in the JSON header line and the presence of
     * {@code numPorts} port-info lines in a recording's {@code meta.txt}.
     */
    private static void verifyMetaFile(File metaFile, long startTime, int numPorts) throws Exception {
        Assert.assertTrue("meta file should exist", metaFile.exists());
        try (BufferedReader reader = new BufferedReader(new FileReader(metaFile))) {
            Assert.assertEquals("version should be 1.2", "1.2", reader.readLine());
            Assert.assertEquals("Start time verification", startTime, new JSONObject(reader.readLine()).getLong("startTime"));
            for (int port = 0; port < numPorts; port++) {
                String line = reader.readLine();
                Assert.assertTrue("should contain name, streamName, type and id", line != null && line.contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line.contains("\"id\""));
            }
        }
    }

    /**
     * Reads {@code index.txt} and returns the part-file names it lists.
     *
     * <p>Each {@code F:} line must name the next consecutive {@code partN.txt}; an {@code E} line,
     * if present, must be the last line of the file. Any other line fails the test.
     */
    private static List<String> readPartFileNames(File indexFile) throws IOException {
        Assert.assertTrue("index file should exist", indexFile.exists());
        List<String> partFiles = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(indexFile))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // The next expected part number always equals the number of parts seen so far.
                String expectedName = "part" + partFiles.size() + ".txt";
                if (line.startsWith("F:" + expectedName + ":")) {
                    partFiles.add(expectedName);
                } else if (line.startsWith("E")) {
                    Assert.assertNull("index file should end after E line", reader.readLine());
                    break;
                } else {
                    Assert.fail("index file line is not starting with F or E");
                }
            }
        }
        return partFiles;
    }

    /**
     * Scans every listed part file and asserts that begin-window and end-window lines were seen,
     * that each of the {@code numPorts} port indexes has at least one tuple line, and that at least
     * {@link #TEST_TUPLE_COUNT} tuples were recorded in total.
     */
    private static void verifyPartFiles(File recordingDir, List<String> partFiles, int numPorts) throws IOException {
        int[] tuplesPerPort = new int[numPorts];
        boolean sawBeginWindow = false;
        boolean sawEndWindow = false;

        for (int idx = 0; idx < partFiles.size(); idx++) {
            String name = partFiles.get(idx);
            File partFile = new File(recordingDir, name);
            Assert.assertTrue(name + " should exist", partFile.exists());
            // Only the last part may be shorter than the configured part size.
            if (idx < partFiles.size() - 1) {
                Assert.assertTrue(name + " should be greater than 1KB", partFile.length() >= PART_FILE_SIZE);
            }
            try (BufferedReader reader = new BufferedReader(new FileReader(partFile))) {
                String line;
                // readLine() returns null at end of file; that is what terminates the scan.
                while ((line = reader.readLine()) != null) {
                    if (line.startsWith("B:")) {
                        sawBeginWindow = true;
                    } else if (line.startsWith("E:")) {
                        sawEndWindow = true;
                    } else if (line.startsWith("T:")) {
                        // Third colon-separated field of a tuple line is used as the port index.
                        tuplesPerPort[Integer.parseInt(line.split(":")[2])]++;
                    }
                }
            }
        }

        Assert.assertTrue("begin window should exist", sawBeginWindow);
        Assert.assertTrue("end window should exist", sawEndWindow);
        int totalTuples = 0;
        for (int port = 0; port < numPorts; port++) {
            Assert.assertTrue("tuple exists for port " + port, tuplesPerPort[port] > 0);
            totalTuples += tuplesPerPort[port];
        }
        Assert.assertTrue("total tuple count >= 10", totalTuples >= TEST_TUPLE_COUNT);
    }
}
