package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.engine.TestOutputOperator;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.ManualScheduledExecutorService;
import com.datatorrent.stram.support.StramTestSupport;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StramLocalClusterTest.class */
public class StramLocalClusterTest {
    private static final Logger LOG = LoggerFactory.getLogger(StramLocalClusterTest.class);

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

    @Before
    public void setup() throws IOException {
    }

    @After
    public void teardown() {
    }

    @Test
    public void testLocalClusterInitShutdown() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(LogicalPlan.APPLICATION_PATH, this.testMeta.dir);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(this.testMeta.dir, (Configuration) null));
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("genNode", TestGeneratorInputOperator.class);
        addOperator.setMaxTuples(2);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("node1", GenericTestOperator.class);
        addOperator2.setEmitFormat("%s >> node1");
        File file = new File("./target/" + StramLocalClusterTest.class.getName() + "-testLocalClusterInitShutdown.out");
        file.delete();
        TestOutputOperator addOperator3 = logicalPlan.addOperator("outNode", TestOutputOperator.class);
        addOperator3.pathSpec = file.toURI().toString();
        logicalPlan.addStream("fromGenNode", addOperator.outport, addOperator2.inport1);
        logicalPlan.addStream("fromNode1", addOperator2.outport1, addOperator3.inport);
        logicalPlan.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        stramLocalCluster.setHeartbeatMonitoringEnabled(false);
        stramLocalCluster.run();
        Assert.assertTrue(file + " exists", file.exists());
        LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(file));
        while (true) {
            String readLine = lineNumberReader.readLine();
            if (readLine == null) {
                Assert.assertEquals("number lines", 2L, lineNumberReader.getLineNumber());
                lineNumberReader.close();
                return;
            }
            Assert.assertTrue("line match " + readLine, readLine.matches("" + lineNumberReader.getLineNumber() + " >> node1"));
        }
    }

    @Test
    public void testRecovery() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(LogicalPlan.APPLICATION_PATH, this.testMeta.dir);
        AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.testMeta.dir, (Configuration) null);
        asyncFSStorageAgent.setSyncCheckpoint(true);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent);
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("o1", TestGeneratorInputOperator.class);
        addOperator.setMaxTuples(0);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("o2", GenericTestOperator.class);
        logicalPlan.addStream("o1o2", addOperator.outport, addOperator2.inport1);
        logicalPlan.validate();
        logicalPlan.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        final ManualScheduledExecutorService manualScheduledExecutorService = new ManualScheduledExecutorService(1);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan, new StramLocalCluster.MockComponentFactory() { // from class: com.datatorrent.stram.StramLocalClusterTest.1
            public WindowGenerator setupWindowGenerator() {
                WindowGenerator windowGenerator = StramTestSupport.setupWindowGenerator(manualScheduledExecutorService);
                windowGenerator.setCheckpointCount(2, 0);
                return windowGenerator;
            }
        });
        stramLocalCluster.setPerContainerBufferServer(true);
        stramLocalCluster.setHeartbeatMonitoringEnabled(false);
        stramLocalCluster.runAsync();
        PTOperator findByLogicalNode = stramLocalCluster.findByLogicalNode(logicalPlan.getMeta(addOperator));
        PTOperator findByLogicalNode2 = stramLocalCluster.findByLogicalNode(logicalPlan.getMeta(addOperator2));
        StramLocalCluster.LocalStreamingContainer waitForActivation = StramTestSupport.waitForActivation(stramLocalCluster, findByLogicalNode);
        Map nodes = waitForActivation.getNodes();
        Assert.assertEquals("number operators", 1L, nodes.size());
        TestGeneratorInputOperator operator = ((Node) nodes.get(Integer.valueOf(findByLogicalNode.getId()))).getOperator();
        Assert.assertNotNull(operator);
        StramLocalCluster.LocalStreamingContainer waitForActivation2 = StramTestSupport.waitForActivation(stramLocalCluster, findByLogicalNode2);
        Map nodes2 = waitForActivation2.getNodes();
        Assert.assertEquals("number operators downstream", 1L, nodes2.size());
        GenericTestOperator operator2 = ((Node) nodes2.get(Integer.valueOf(stramLocalCluster.findByLogicalNode(logicalPlan.getMeta(addOperator2)).getId()))).getOperator();
        Assert.assertNotNull(operator2);
        operator.addTuple("tuple1");
        Assert.assertEquals("initial window id", -1L, waitForActivation.getNodeContext(findByLogicalNode.getId()).getLastProcessedWindowId());
        manualScheduledExecutorService.tick(1L);
        manualScheduledExecutorService.tick(1L);
        Assert.assertEquals("current window", 2L, manualScheduledExecutorService.getCurrentTimeMillis());
        OperatorContext nodeContext = waitForActivation2.getNodeContext(findByLogicalNode2.getId());
        Assert.assertNotNull("context ", nodeContext);
        StramTestSupport.waitForWindowComplete(nodeContext, 1L);
        Assert.assertEquals("o2 received ", "tuple1", operator2.inport1Tuple);
        manualScheduledExecutorService.tick(1L);
        Assert.assertEquals("current window", 3L, manualScheduledExecutorService.getCurrentTimeMillis());
        StramTestSupport.waitForWindowComplete(nodeContext, 2L);
        waitForActivation.triggerHeartbeat();
        waitForActivation.waitForHeartbeat(5000);
        Assert.assertEquals("checkpoint " + findByLogicalNode, 1L, findByLogicalNode.getRecentCheckpoint().windowId);
        waitForActivation2.triggerHeartbeat();
        Thread.sleep(1L);
        waitForActivation2.waitForHeartbeat(5000);
        Assert.assertEquals("checkpoint " + findByLogicalNode2, 1L, findByLogicalNode2.getRecentCheckpoint().windowId);
        Assert.assertEquals("checkpoints " + findByLogicalNode, Arrays.asList(new Checkpoint(1L, 0, 0)), findByLogicalNode.checkpoints);
        Assert.assertEquals("checkpoints " + findByLogicalNode2, Arrays.asList(new Checkpoint(1L, 0, 0)), findByLogicalNode2.checkpoints);
        stramLocalCluster.failContainer(waitForActivation);
        StramLocalCluster.LocalStreamingContainer waitForActivation3 = StramTestSupport.waitForActivation(stramLocalCluster, findByLogicalNode);
        waitForActivation3.triggerHeartbeat();
        waitForActivation3.waitForHeartbeat(5000);
        Assert.assertNotSame("old container", waitForActivation, waitForActivation3);
        Assert.assertNotSame("old container", waitForActivation.getContainerId(), waitForActivation3.getContainerId());
        LOG.debug("triggering c2 heartbeat processing");
        StreamingContainerAgent containerAgent = stramLocalCluster.getContainerAgent(waitForActivation2);
        long currentTimeMillis = System.currentTimeMillis();
        while (containerAgent.hasPendingWork() && StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - currentTimeMillis) {
            Thread.sleep(200L);
            waitForActivation2.triggerHeartbeat();
            LOG.debug("Waiting for {} to complete pending work.", waitForActivation2.getContainerId());
        }
        Assert.assertEquals(waitForActivation2.getContainerId() + " operators after redeploy " + waitForActivation2.getNodes(), 1L, waitForActivation2.getNodes().size());
        Assert.assertEquals("active " + findByLogicalNode2, waitForActivation2, StramTestSupport.waitForActivation(stramLocalCluster, findByLogicalNode2));
        GenericTestOperator operator3 = ((Node) nodes2.get(Integer.valueOf(stramLocalCluster.findByLogicalNode(logicalPlan.getMeta(addOperator2)).getId()))).getOperator();
        Assert.assertNotNull("redeployed " + findByLogicalNode2, operator3);
        Assert.assertNotSame("new instance " + findByLogicalNode2, operator2, operator3);
        Assert.assertEquals("restored state " + findByLogicalNode2, "tuple1", operator3.inport1Tuple);
        TestGeneratorInputOperator operator4 = ((Node) waitForActivation3.getNodes().get(Integer.valueOf(findByLogicalNode.getId()))).getOperator();
        Assert.assertNotNull(operator4);
        OperatorContext nodeContext2 = waitForActivation3.getNodeContext(findByLogicalNode.getId());
        Assert.assertNotNull("active " + findByLogicalNode, nodeContext2);
        manualScheduledExecutorService.tick(1L);
        Assert.assertEquals("current window", 4L, manualScheduledExecutorService.getCurrentTimeMillis());
        OperatorContext nodeContext3 = waitForActivation2.getNodeContext(findByLogicalNode2.getId());
        Assert.assertNotNull("active " + findByLogicalNode2, nodeContext3);
        StramTestSupport.waitForWindowComplete(nodeContext2, 3L);
        StramTestSupport.waitForWindowComplete(nodeContext3, 3L);
        manualScheduledExecutorService.tick(1L);
        Assert.assertEquals("current window", 5L, manualScheduledExecutorService.getCurrentTimeMillis());
        operator4.addTuple("tuple2");
        StramTestSupport.waitForWindowComplete(nodeContext2, 4L);
        StramTestSupport.waitForWindowComplete(nodeContext3, 4L);
        Assert.assertEquals("retrieved tuple (after recovery) " + findByLogicalNode2, "tuple2", operator3.inport1Tuple);
        waitForActivation3.triggerHeartbeat();
        waitForActivation3.waitForHeartbeat(5000);
        waitForActivation2.triggerHeartbeat();
        waitForActivation2.waitForHeartbeat(5000);
        stramLocalCluster.dnmgr.monitorHeartbeat();
        Assert.assertEquals("checkpoints " + findByLogicalNode, Arrays.asList(new Checkpoint(3L, 0, 0)), findByLogicalNode.checkpoints);
        Assert.assertEquals("checkpoints " + findByLogicalNode2, Arrays.asList(new Checkpoint(3L, 0, 0)), findByLogicalNode2.checkpoints);
        stramLocalCluster.shutdown();
    }
}
