package com.datatorrent.stram;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Stats;
import com.datatorrent.common.metric.AutoMetricBuiltInTransport;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.MockContainer;
import com.datatorrent.stram.PartitioningTest;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.appdata.AppDataPushAgent;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.DefaultUnifier;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestAppDataQueryOperator;
import com.datatorrent.stram.engine.TestAppDataResultOperator;
import com.datatorrent.stram.engine.TestAppDataSourceOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.OperatorStatus;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PhysicalPlanTest;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.stram.webapp.LogicalOperatorInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputByteBuffer;
import org.codehaus.jettison.json.JSONObject;
import org.eclipse.jetty.websocket.WebSocket;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StreamingContainerManagerTest.class */
public class StreamingContainerManagerTest {

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
    private LogicalPlan dag;
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManagerTest.class);

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManagerTest$HighLatencyTestOperator.class */
    public static class HighLatencyTestOperator extends GenericTestOperator {
        private long firstWindowMillis;
        private long windowWidthMillis;
        private long currentWindowId;
        private long latency;

        public void setup(Context.OperatorContext operatorContext) {
            this.firstWindowMillis = System.currentTimeMillis();
            this.windowWidthMillis = ((Integer) operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)).intValue();
        }

        public void beginWindow(long j) {
            this.currentWindowId = j;
        }

        public void endWindow() {
            long currentTimeMillis = this.latency - (System.currentTimeMillis() - WindowGenerator.getWindowMillis(this.currentWindowId, this.firstWindowMillis, this.windowWidthMillis));
            if (currentTimeMillis > 0) {
                try {
                    Thread.sleep(currentTimeMillis);
                } catch (InterruptedException e) {
                }
            }
        }

        public void setLatency(long j) {
            this.latency = j;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManagerTest$TestMetricTransport.class */
    public static class TestMetricTransport implements AutoMetric.Transport, Serializable {
        private String prefix;
        private static List<String> messages = new ArrayList();

        public TestMetricTransport(String str) {
            this.prefix = str;
        }

        public void push(String str) throws IOException {
            messages.add(this.prefix + ":" + str);
        }

        public long getSchemaResendInterval() {
            return 0L;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManagerTest$TestStaticPartitioningSerDe.class */
    public static class TestStaticPartitioningSerDe extends DefaultStatefulStreamCodec<Object> {
        public static final int[] partitions = {0, 1, 2};

        public int getPartition(Object obj) {
            if (obj instanceof Tuple) {
                throw new UnsupportedOperationException("should not be called with control tuple");
            }
            return partitions[0];
        }
    }

    @Before
    public void setup() {
        this.dag = StramTestSupport.createDAG(this.testMeta);
    }

    @Test
    public void testDeployInfoSerialization() throws Exception {
        OperatorDeployInfo operatorDeployInfo = new OperatorDeployInfo();
        operatorDeployInfo.name = "node1";
        operatorDeployInfo.type = OperatorDeployInfo.OperatorType.GENERIC;
        operatorDeployInfo.id = 1;
        operatorDeployInfo.contextAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        operatorDeployInfo.contextAttributes.put(Context.OperatorContext.SPIN_MILLIS, 100);
        OperatorDeployInfo.InputDeployInfo inputDeployInfo = new OperatorDeployInfo.InputDeployInfo();
        inputDeployInfo.declaredStreamId = "streamToNode";
        inputDeployInfo.portName = "inputPortNameOnNode";
        inputDeployInfo.sourceNodeId = 99;
        operatorDeployInfo.inputs = new ArrayList();
        operatorDeployInfo.inputs.add(inputDeployInfo);
        OperatorDeployInfo.OutputDeployInfo outputDeployInfo = new OperatorDeployInfo.OutputDeployInfo();
        outputDeployInfo.declaredStreamId = "streamFromNode";
        outputDeployInfo.portName = "outputPortNameOnNode";
        operatorDeployInfo.outputs = new ArrayList();
        operatorDeployInfo.outputs.add(outputDeployInfo);
        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
        containerHeartbeatResponse.deployRequest = Collections.singletonList(operatorDeployInfo);
        DataOutputByteBuffer dataOutputByteBuffer = new DataOutputByteBuffer();
        containerHeartbeatResponse.write(dataOutputByteBuffer);
        DataInputByteBuffer dataInputByteBuffer = new DataInputByteBuffer();
        dataInputByteBuffer.reset(dataOutputByteBuffer.getData());
        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse2 = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
        containerHeartbeatResponse2.readFields(dataInputByteBuffer);
        Assert.assertNotNull(containerHeartbeatResponse2.deployRequest);
        Assert.assertEquals(1L, containerHeartbeatResponse2.deployRequest.size());
        OperatorDeployInfo operatorDeployInfo2 = (OperatorDeployInfo) containerHeartbeatResponse2.deployRequest.get(0);
        Assert.assertEquals("name", operatorDeployInfo.name, operatorDeployInfo2.name);
        Assert.assertEquals("type", operatorDeployInfo.type, operatorDeployInfo2.type);
        String operatorDeployInfo3 = operatorDeployInfo.toString();
        Assert.assertTrue(operatorDeployInfo3.contains(inputDeployInfo.portName));
        Assert.assertTrue(operatorDeployInfo3.contains(outputDeployInfo.portName));
        Assert.assertEquals("contextAttributes " + operatorDeployInfo2.contextAttributes, 100, operatorDeployInfo2.contextAttributes.get(Context.OperatorContext.SPIN_MILLIS));
    }

    @Test
    public void testGenerateDeployInfo() {
        TestGeneratorInputOperator addOperator = this.dag.addOperator("o1", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = this.dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator addOperator3 = this.dag.addOperator("o3", GenericTestOperator.class);
        GenericTestOperator addOperator4 = this.dag.addOperator("o4", GenericTestOperator.class);
        this.dag.setOutputPortAttribute(addOperator.outport, Context.PortContext.BUFFER_MEMORY_MB, 256);
        this.dag.addStream("o1.outport", addOperator.outport, addOperator2.inport1);
        this.dag.setOutputPortAttribute(addOperator.outport, Context.PortContext.SPIN_MILLIS, 99);
        this.dag.addStream("o2.outport1", addOperator2.outport1, addOperator3.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        this.dag.addStream("o3.outport1", addOperator3.outport1, addOperator4.inport1).setLocality(DAG.Locality.THREAD_LOCAL);
        this.dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        Assert.assertEquals("number operators", 4L, this.dag.getAllOperators().size());
        Assert.assertEquals("number root operators", 1L, this.dag.getRootOperators().size());
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(this.dag);
        Assert.assertEquals("number containers", 2L, streamingContainerManager.getPhysicalPlan().getContainers().size());
        streamingContainerManager.assignContainer(new StreamingContainerManager.ContainerResource(0, "container1Id", "host1", 1024, 0, (String) null), InetSocketAddress.createUnresolved("host1", 9001));
        streamingContainerManager.assignContainer(new StreamingContainerManager.ContainerResource(0, "container2Id", "host2", 1024, 0, (String) null), InetSocketAddress.createUnresolved("host2", 9002));
        StreamingContainerAgent containerAgent = streamingContainerManager.getContainerAgent(((PTContainer) streamingContainerManager.getPhysicalPlan().getContainers().get(0)).getExternalId());
        StreamingContainerAgent containerAgent2 = streamingContainerManager.getContainerAgent(((PTContainer) streamingContainerManager.getPhysicalPlan().getContainers().get(1)).getExternalId());
        Assert.assertEquals("", streamingContainerManager.getPhysicalPlan().getContainers().get(0), containerAgent.container);
        Assert.assertEquals("", PTContainer.State.ALLOCATED, containerAgent.container.getState());
        List deployInfoList = containerAgent.getDeployInfoList(containerAgent.container.getOperators());
        Assert.assertEquals("number operators assigned to c1", 1L, deployInfoList.size());
        OperatorDeployInfo nodeDeployInfo = getNodeDeployInfo(deployInfoList, this.dag.getMeta(addOperator));
        Assert.assertNotNull(addOperator + " assigned to " + containerAgent.container.getExternalId(), nodeDeployInfo);
        Assert.assertEquals("type " + nodeDeployInfo, OperatorDeployInfo.OperatorType.INPUT, nodeDeployInfo.type);
        Assert.assertEquals("inputs " + nodeDeployInfo.name, 0L, nodeDeployInfo.inputs.size());
        Assert.assertEquals("outputs " + nodeDeployInfo.name, 1L, nodeDeployInfo.outputs.size());
        Assert.assertNotNull("contextAttributes " + nodeDeployInfo.name, nodeDeployInfo.contextAttributes);
        OperatorDeployInfo.OutputDeployInfo outputDeployInfo = (OperatorDeployInfo.OutputDeployInfo) nodeDeployInfo.outputs.get(0);
        Assert.assertNotNull("stream connection for container1", outputDeployInfo);
        Assert.assertEquals("stream connection for container1", "o1.outport", outputDeployInfo.declaredStreamId);
        Assert.assertEquals("stream connects to upstream host", containerAgent.container.host, outputDeployInfo.bufferServerHost);
        Assert.assertEquals("stream connects to upstream port", containerAgent.container.bufferServerAddress.getPort(), outputDeployInfo.bufferServerPort);
        Assert.assertNotNull("contextAttributes " + outputDeployInfo, outputDeployInfo.contextAttributes);
        Assert.assertEquals("contextAttributes " + outputDeployInfo, 99, outputDeployInfo.contextAttributes.get(Context.PortContext.SPIN_MILLIS));
        List deployInfoList2 = containerAgent2.getDeployInfoList(containerAgent2.container.getOperators());
        Assert.assertEquals("number operators assigned to container", 3L, deployInfoList2.size());
        OperatorDeployInfo nodeDeployInfo2 = getNodeDeployInfo(deployInfoList2, this.dag.getMeta(addOperator2));
        OperatorDeployInfo nodeDeployInfo3 = getNodeDeployInfo(deployInfoList2, this.dag.getMeta(addOperator3));
        Assert.assertNotNull(this.dag.getMeta(addOperator2) + " assigned to " + containerAgent2.container.getExternalId(), nodeDeployInfo2);
        Assert.assertNotNull(this.dag.getMeta(addOperator3) + " assigned to " + containerAgent2.container.getExternalId(), nodeDeployInfo3);
        Assert.assertTrue("The buffer server memory for container 1", 256 == ((Integer) containerAgent.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB)).intValue());
        Assert.assertTrue("The buffer server memory for container 2", 0 == ((Integer) containerAgent2.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB)).intValue());
        OperatorDeployInfo.InputDeployInfo inputDeployInfo = getInputDeployInfo(nodeDeployInfo2, "o1.outport");
        Assert.assertNotNull("stream connection for container2", inputDeployInfo);
        Assert.assertEquals("stream connects to upstream host", containerAgent.container.host, inputDeployInfo.bufferServerHost);
        Assert.assertEquals("stream connects to upstream port", containerAgent.container.bufferServerAddress.getPort(), inputDeployInfo.bufferServerPort);
        Assert.assertEquals("portName " + inputDeployInfo, this.dag.getMeta(addOperator2).getMeta(addOperator2.inport1).getPortName(), inputDeployInfo.portName);
        Assert.assertNull("partitionKeys " + inputDeployInfo, inputDeployInfo.partitionKeys);
        Assert.assertEquals("sourceNodeId " + inputDeployInfo, nodeDeployInfo.id, inputDeployInfo.sourceNodeId);
        Assert.assertEquals("sourcePortName " + inputDeployInfo, TestGeneratorInputOperator.OUTPUT_PORT, inputDeployInfo.sourcePortName);
        Assert.assertNotNull("contextAttributes " + inputDeployInfo, inputDeployInfo.contextAttributes);
        OperatorDeployInfo.InputDeployInfo inputDeployInfo2 = getInputDeployInfo(nodeDeployInfo3, "o2.outport1");
        Assert.assertNotNull("input from o2.outport1", inputDeployInfo2);
        Assert.assertEquals("portName " + inputDeployInfo2, GenericTestOperator.IPORT1, inputDeployInfo2.portName);
        Assert.assertNotNull("stream connection for container2", inputDeployInfo2);
        Assert.assertNull("bufferServerHost " + inputDeployInfo2, inputDeployInfo2.bufferServerHost);
        Assert.assertEquals("bufferServerPort " + inputDeployInfo2, 0L, inputDeployInfo2.bufferServerPort);
        Assert.assertNull("partitionKeys " + inputDeployInfo2, inputDeployInfo2.partitionKeys);
        Assert.assertEquals("sourceNodeId " + inputDeployInfo2, nodeDeployInfo2.id, inputDeployInfo2.sourceNodeId);
        Assert.assertEquals("sourcePortName " + inputDeployInfo2, GenericTestOperator.OPORT1, inputDeployInfo2.sourcePortName);
        Assert.assertEquals("locality " + inputDeployInfo2, DAG.Locality.CONTAINER_LOCAL, inputDeployInfo2.locality);
        OperatorDeployInfo nodeDeployInfo4 = getNodeDeployInfo(deployInfoList2, this.dag.getMeta(addOperator4));
        Assert.assertNotNull(this.dag.getMeta(addOperator4) + " assigned to " + containerAgent2.container.getExternalId(), nodeDeployInfo4);
        OperatorDeployInfo.InputDeployInfo inputDeployInfo3 = getInputDeployInfo(nodeDeployInfo4, "o3.outport1");
        Assert.assertNotNull("input from o3.outport1", inputDeployInfo3);
        Assert.assertEquals("portName " + inputDeployInfo3, GenericTestOperator.IPORT1, inputDeployInfo3.portName);
        Assert.assertNotNull("stream connection for container2", inputDeployInfo3);
        Assert.assertNull("bufferServerHost " + inputDeployInfo3, inputDeployInfo3.bufferServerHost);
        Assert.assertEquals("bufferServerPort " + inputDeployInfo3, 0L, inputDeployInfo3.bufferServerPort);
        Assert.assertNull("partitionKeys " + inputDeployInfo3, inputDeployInfo3.partitionKeys);
        Assert.assertEquals("sourceNodeId " + inputDeployInfo3, nodeDeployInfo3.id, inputDeployInfo3.sourceNodeId);
        Assert.assertEquals("sourcePortName " + inputDeployInfo3, GenericTestOperator.OPORT1, inputDeployInfo3.sourcePortName);
        Assert.assertEquals("locality " + inputDeployInfo3, DAG.Locality.THREAD_LOCAL, inputDeployInfo3.locality);
    }

    @Test
    public void testStaticPartitioning() {
        GenericTestOperator addOperator = this.dag.addOperator("node1", GenericTestOperator.class);
        PhysicalPlanTest.PartitioningTestOperator addOperator2 = this.dag.addOperator("node2", PhysicalPlanTest.PartitioningTestOperator.class);
        addOperator2.setPartitionCount(3);
        this.dag.setAttribute(addOperator2, Context.OperatorContext.SPIN_MILLIS, 10);
        this.dag.setOutputPortAttribute(addOperator2.outport1, Context.PortContext.QUEUE_CAPACITY, 1111);
        GenericTestOperator addOperator3 = this.dag.addOperator("node3", GenericTestOperator.class);
        this.dag.setInputPortAttribute(addOperator3.inport1, Context.PortContext.QUEUE_CAPACITY, 2222);
        LogicalPlan.StreamMeta addStream = this.dag.addStream("n1n2", addOperator.outport1, addOperator2.inport1);
        LogicalPlan.StreamMeta addStream2 = this.dag.addStream("n2n3", addOperator2.outport1, addOperator3.inport1);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        StramTestSupport.MemoryStorageAgent memoryStorageAgent = new StramTestSupport.MemoryStorageAgent();
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, memoryStorageAgent);
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(this.dag);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        Assert.assertEquals("number containers", 6L, physicalPlan.getContainers().size());
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < physicalPlan.getContainers().size(); i++) {
            newArrayList.add(assignContainer(streamingContainerManager, "container" + (i + 1)));
        }
        StreamingContainerAgent containerAgent = streamingContainerManager.getContainerAgent(((PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator)).get(0)).getContainer().getExternalId());
        List<OperatorDeployInfo> deployInfo = getDeployInfo(containerAgent);
        Assert.assertEquals("number operators assigned to container", 1L, deployInfo.size());
        Assert.assertTrue(this.dag.getMeta(addOperator2) + " assigned to " + containerAgent.container.getExternalId(), containsNodeContext(deployInfo, this.dag.getMeta(addOperator)));
        List operators = physicalPlan.getOperators(this.dag.getMeta(addOperator2));
        Assert.assertEquals("number partitions", TestStaticPartitioningSerDe.partitions.length, operators.size());
        for (int i2 = 0; i2 < operators.size(); i2++) {
            String externalId = ((PTOperator) operators.get(i2)).getContainer().getExternalId();
            List<OperatorDeployInfo> deployInfo2 = getDeployInfo(streamingContainerManager.getContainerAgent(externalId));
            Assert.assertEquals("number operators assigned to container", 1L, deployInfo2.size());
            Assert.assertTrue(this.dag.getMeta(addOperator2) + " assigned to " + externalId, containsNodeContext(deployInfo2, this.dag.getMeta(addOperator2)));
            OperatorDeployInfo operatorDeployInfo = deployInfo2.get(0);
            Assert.assertEquals("type " + operatorDeployInfo, OperatorDeployInfo.OperatorType.GENERIC, operatorDeployInfo.type);
            Assert.assertEquals("inputs " + operatorDeployInfo, 1L, operatorDeployInfo.inputs.size());
            Assert.assertEquals("outputs " + operatorDeployInfo, 1L, operatorDeployInfo.outputs.size());
            OperatorDeployInfo.InputDeployInfo inputDeployInfo = (OperatorDeployInfo.InputDeployInfo) operatorDeployInfo.inputs.get(0);
            Assert.assertEquals("stream " + inputDeployInfo, addStream.getName(), inputDeployInfo.declaredStreamId);
            Assert.assertEquals("partition for " + externalId, Sets.newHashSet(new Integer[]{addOperator2.partitionKeys[i2]}), inputDeployInfo.partitionKeys);
            Assert.assertEquals("number stream codecs for " + inputDeployInfo, 1L, inputDeployInfo.streamCodecs.size());
        }
        List mergeOperators = physicalPlan.getMergeOperators(this.dag.getMeta(addOperator2));
        Assert.assertEquals("number unifiers", 1L, mergeOperators.size());
        List<OperatorDeployInfo> deployInfo3 = getDeployInfo(streamingContainerManager.getContainerAgent(((PTOperator) mergeOperators.get(0)).getContainer().getExternalId()));
        Assert.assertEquals("number operators " + deployInfo3, 1L, deployInfo3.size());
        OperatorDeployInfo nodeDeployInfo = getNodeDeployInfo(deployInfo3, this.dag.getMeta(addOperator2).getMeta(addOperator2.outport1).getUnifierMeta());
        Assert.assertNotNull("unifier for " + addOperator2, nodeDeployInfo);
        Assert.assertEquals("type " + nodeDeployInfo, OperatorDeployInfo.OperatorType.UNIFIER, nodeDeployInfo.type);
        Assert.assertEquals("inputs " + nodeDeployInfo, 3L, nodeDeployInfo.inputs.size());
        ArrayList newArrayList2 = Lists.newArrayList();
        for (OperatorDeployInfo.InputDeployInfo inputDeployInfo2 : nodeDeployInfo.inputs) {
            Assert.assertEquals("streamName " + inputDeployInfo2, addStream2.getName(), inputDeployInfo2.declaredStreamId);
            Assert.assertEquals("portName " + inputDeployInfo2, "<merge#" + this.dag.getMeta(addOperator2).getMeta(addOperator2.outport1).getPortName() + ">", inputDeployInfo2.portName);
            Assert.assertNotNull("sourceNodeId " + inputDeployInfo2, Integer.valueOf(inputDeployInfo2.sourceNodeId));
            Assert.assertNotNull("contextAttributes " + inputDeployInfo2, inputDeployInfo2.contextAttributes);
            Assert.assertEquals("contextAttributes ", new Integer(1111), inputDeployInfo2.getValue(Context.PortContext.QUEUE_CAPACITY));
            newArrayList2.add(Integer.valueOf(inputDeployInfo2.sourceNodeId));
        }
        for (PTOperator pTOperator : streamingContainerManager.getPhysicalPlan().getOperators(this.dag.getMeta(addOperator2))) {
            Assert.assertTrue(newArrayList2 + " contains " + pTOperator.getId(), newArrayList2.contains(Integer.valueOf(pTOperator.getId())));
        }
        Assert.assertEquals("outputs " + nodeDeployInfo, 1L, nodeDeployInfo.outputs.size());
        for (OperatorDeployInfo.OutputDeployInfo outputDeployInfo : nodeDeployInfo.outputs) {
            Assert.assertNotNull("contextAttributes " + outputDeployInfo, outputDeployInfo.contextAttributes);
            Assert.assertEquals("contextAttributes ", new Integer(2222), outputDeployInfo.getValue(Context.PortContext.QUEUE_CAPACITY));
        }
        try {
            Object load = memoryStorageAgent.load(nodeDeployInfo.id, -1L);
            Assert.assertTrue("" + load, load instanceof DefaultUnifier);
            List<OperatorDeployInfo> deployInfo4 = getDeployInfo(streamingContainerManager.getContainerAgent(((PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator3)).get(0)).getContainer().getExternalId()));
            Assert.assertEquals("number operators " + deployInfo4, 1L, deployInfo4.size());
            OperatorDeployInfo nodeDeployInfo2 = getNodeDeployInfo(deployInfo4, this.dag.getMeta(addOperator3));
            Assert.assertNotNull(this.dag.getMeta(addOperator3) + " assigned", nodeDeployInfo2);
            Assert.assertEquals("inputs " + nodeDeployInfo2, 1L, nodeDeployInfo2.inputs.size());
            OperatorDeployInfo.InputDeployInfo inputDeployInfo3 = (OperatorDeployInfo.InputDeployInfo) nodeDeployInfo2.inputs.get(0);
            Assert.assertEquals("streamName " + inputDeployInfo3, addStream2.getName(), inputDeployInfo3.declaredStreamId);
            Assert.assertEquals("portName " + inputDeployInfo3, this.dag.getMeta(addOperator3).getMeta(addOperator3.inport1).getPortName(), inputDeployInfo3.portName);
            Assert.assertNotNull("sourceNodeId " + nodeDeployInfo2, Integer.valueOf(inputDeployInfo3.sourceNodeId));
            Assert.assertEquals("sourcePortName " + nodeDeployInfo2, ((OperatorDeployInfo.OutputDeployInfo) nodeDeployInfo.outputs.get(0)).portName, inputDeployInfo3.sourcePortName);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void testRecoveryOrder() throws Exception {
        GenericTestOperator addOperator = this.dag.addOperator("node1", GenericTestOperator.class);
        GenericTestOperator addOperator2 = this.dag.addOperator("node2", GenericTestOperator.class);
        GenericTestOperator addOperator3 = this.dag.addOperator("node3", GenericTestOperator.class);
        this.dag.addStream("n1n2", addOperator.outport1, addOperator2.inport1);
        this.dag.addStream("n2n3", addOperator2.outport1, addOperator3.inport1);
        this.dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(this.dag);
        Assert.assertEquals("" + streamingContainerManager.containerStartRequests, 2L, streamingContainerManager.containerStartRequests.size());
        streamingContainerManager.containerStartRequests.clear();
        List containers = streamingContainerManager.getPhysicalPlan().getContainers();
        Assert.assertEquals("" + containers, 2L, r0.getContainers().size());
        PTContainer pTContainer = (PTContainer) containers.get(0);
        Assert.assertEquals("c1.operators " + pTContainer.getOperators(), 2L, pTContainer.getOperators().size());
        PTContainer pTContainer2 = (PTContainer) containers.get(1);
        Assert.assertEquals("c2.operators " + pTContainer2.getOperators(), 1L, pTContainer2.getOperators().size());
        assignContainer(streamingContainerManager, "container1");
        assignContainer(streamingContainerManager, "container2");
        StreamingContainerAgent containerAgent = streamingContainerManager.getContainerAgent(pTContainer.getExternalId());
        StreamingContainerAgent containerAgent2 = streamingContainerManager.getContainerAgent(pTContainer2.getExternalId());
        Assert.assertEquals("", 0L, countState(containerAgent.container, PTOperator.State.PENDING_UNDEPLOY));
        Assert.assertEquals("", 2L, countState(containerAgent.container, PTOperator.State.PENDING_DEPLOY));
        streamingContainerManager.scheduleContainerRestart(pTContainer.getExternalId());
        Assert.assertEquals("", 0L, countState(containerAgent.container, PTOperator.State.PENDING_UNDEPLOY));
        Assert.assertEquals("", 2L, countState(containerAgent.container, PTOperator.State.PENDING_DEPLOY));
        Assert.assertEquals("" + streamingContainerManager.containerStartRequests, 1L, streamingContainerManager.containerStartRequests.size());
        Assert.assertNotNull((StreamingContainerAgent.ContainerStartRequest) streamingContainerManager.containerStartRequests.peek());
        Assert.assertEquals("" + containerAgent2.container, 1L, countState(containerAgent2.container, PTOperator.State.PENDING_UNDEPLOY));
        Assert.assertEquals("" + containerAgent2.container, 0L, countState(containerAgent2.container, PTOperator.State.PENDING_DEPLOY));
    }

    @Test
    public void testRecoveryUpstreamInline() throws Exception {
        GenericTestOperator addOperator = this.dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator addOperator2 = this.dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator addOperator3 = this.dag.addOperator("o3", GenericTestOperator.class);
        this.dag.addStream("o1o3", addOperator.outport1, addOperator3.inport1);
        this.dag.addStream("o2o3", addOperator2.outport1, addOperator3.inport2);
        this.dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(this.dag);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        Assert.assertEquals(2L, physicalPlan.getContainers().size());
        physicalPlan.getOperators(this.dag.getMeta(addOperator)).get(0);
        Assert.assertEquals(2L, physicalPlan.getContainers().size());
        PTContainer pTContainer = (PTContainer) physicalPlan.getContainers().get(0);
        Assert.assertEquals(Sets.newHashSet(new PTOperator[]{(PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator)).get(0), (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator3)).get(0)}), Sets.newHashSet(pTContainer.getOperators()));
        PTContainer pTContainer2 = (PTContainer) physicalPlan.getContainers().get(1);
        assignContainer(streamingContainerManager, "c1");
        assignContainer(streamingContainerManager, "c2");
        for (PTOperator pTOperator : pTContainer.getOperators()) {
            Assert.assertEquals("state " + pTOperator, PTOperator.State.PENDING_DEPLOY, pTOperator.getState());
        }
        streamingContainerManager.scheduleContainerRestart(pTContainer2.getExternalId());
        for (PTOperator pTOperator2 : pTContainer.getOperators()) {
            Assert.assertEquals("state " + pTOperator2, PTOperator.State.PENDING_UNDEPLOY, pTOperator2.getState());
        }
    }

    @Test
    public void testCheckpointWindowIds() throws Exception {
        FSStorageAgent fSStorageAgent = new FSStorageAgent(this.testMeta.getPath(), (Configuration) null);
        long[] jArr = {123, 345, 234};
        for (long j : jArr) {
            fSStorageAgent.save(Long.valueOf(j), 1, j);
        }
        Arrays.sort(jArr);
        long[] windowIds = fSStorageAgent.getWindowIds(1);
        Arrays.sort(windowIds);
        Assert.assertArrayEquals("Saved windowIds", jArr, windowIds);
        for (long j2 : jArr) {
            fSStorageAgent.delete(1, j2);
        }
        try {
            fSStorageAgent.getWindowIds(1);
            Assert.fail("There should not be any most recently saved windowId!");
        } catch (IOException e) {
            Assert.assertTrue("No State Saved", true);
        }
    }

    @Test
    public void testAsyncCheckpointWindowIds() throws Exception {
        AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.testMeta.getPath(), (Configuration) null);
        long[] jArr = {123, 345, 234};
        for (long j : jArr) {
            asyncFSStorageAgent.save(Long.valueOf(j), 1, j);
            asyncFSStorageAgent.copyToHDFS(1, j);
        }
        Arrays.sort(jArr);
        long[] windowIds = asyncFSStorageAgent.getWindowIds(1);
        Arrays.sort(windowIds);
        Assert.assertArrayEquals("Saved windowIds", jArr, windowIds);
        for (long j2 : jArr) {
            asyncFSStorageAgent.delete(1, j2);
        }
        try {
            asyncFSStorageAgent.getWindowIds(1);
            Assert.fail("There should not be any most recently saved windowId!");
        } catch (IOException e) {
            Assert.assertTrue("No State Saved", true);
        }
    }

    @Test
    public void testProcessHeartbeat() throws Exception {
        TestGeneratorInputOperator addOperator = this.dag.addOperator("o1", TestGeneratorInputOperator.class);
        this.dag.setAttribute(addOperator, Context.OperatorContext.STATS_LISTENERS, Arrays.asList(new PartitioningTest.PartitionLoadWatch()));
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(this.dag);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        Assert.assertEquals("number required containers", 1L, physicalPlan.getContainers().size());
        PTOperator pTOperator = (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator)).get(0);
        Assert.assertNotNull(streamingContainerManager.assignContainer(new StreamingContainerManager.ContainerResource(0, "container1", "localhost", 512, 0, (String) null), InetSocketAddress.createUnresolved("localhost", 0)));
        Assert.assertEquals(PTContainer.State.ALLOCATED, pTOperator.getContainer().getState());
        Assert.assertEquals(PTOperator.State.PENDING_DEPLOY, pTOperator.getState());
        StreamingContainerUmbilicalProtocol.ContainerStats containerStats = new StreamingContainerUmbilicalProtocol.ContainerStats("container1");
        StreamingContainerUmbilicalProtocol.ContainerHeartbeat containerHeartbeat = new StreamingContainerUmbilicalProtocol.ContainerHeartbeat();
        containerHeartbeat.setContainerStats(containerStats);
        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse processHeartbeat = streamingContainerManager.processHeartbeat(containerHeartbeat);
        Assert.assertNotNull(processHeartbeat.deployRequest);
        Assert.assertEquals("" + processHeartbeat.deployRequest, 1L, processHeartbeat.deployRequest.size());
        Assert.assertEquals(PTContainer.State.ACTIVE, pTOperator.getContainer().getState());
        Assert.assertEquals("state " + pTOperator, PTOperator.State.PENDING_DEPLOY, pTOperator.getState());
        StreamingContainerUmbilicalProtocol.OperatorHeartbeat operatorHeartbeat = new StreamingContainerUmbilicalProtocol.OperatorHeartbeat();
        operatorHeartbeat.setNodeId(pTOperator.getId());
        operatorHeartbeat.setState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);
        Stats.OperatorStats operatorStats = new Stats.OperatorStats();
        operatorStats.checkpoint = new Checkpoint(2L, 0, 0);
        operatorStats.windowId = 3L;
        operatorStats.outputPorts = Lists.newArrayList();
        Stats.OperatorStats.PortStats portStats = new Stats.OperatorStats.PortStats(TestGeneratorInputOperator.OUTPUT_PORT);
        portStats.bufferServerBytes = 101L;
        portStats.tupleCount = 1;
        operatorStats.outputPorts.add(portStats);
        operatorHeartbeat.windowStats = Lists.newArrayList(new Stats.OperatorStats[]{operatorStats});
        containerStats.operators.add(operatorHeartbeat);
        streamingContainerManager.processHeartbeat(containerHeartbeat);
        Assert.assertEquals(PTContainer.State.ACTIVE, pTOperator.getContainer().getState());
        Assert.assertEquals("state " + pTOperator, PTOperator.State.ACTIVE, pTOperator.getState());
        Assert.assertEquals("tuples " + pTOperator, 1L, pTOperator.stats.totalTuplesEmitted.get());
        Assert.assertEquals("tuples " + pTOperator, 0L, pTOperator.stats.totalTuplesProcessed.get());
        Assert.assertEquals("window " + pTOperator, 3L, pTOperator.stats.currentWindowId.get());
        Assert.assertEquals("port stats", 1L, pTOperator.stats.outputPortStatusList.size());
        OperatorStatus.PortStatus portStatus = (OperatorStatus.PortStatus) pTOperator.stats.outputPortStatusList.get(TestGeneratorInputOperator.OUTPUT_PORT);
        Assert.assertNotNull("port stats", portStatus);
        Assert.assertEquals("port stats", 1L, portStatus.totalTuples);
        Stats.OperatorStats operatorStats2 = new Stats.OperatorStats();
        operatorStats2.checkpoint = new Checkpoint(2L, 0, 0);
        operatorStats2.windowId = 4L;
        operatorStats2.outputPorts = Lists.newArrayList();
        Stats.OperatorStats.PortStats portStats2 = new Stats.OperatorStats.PortStats(TestGeneratorInputOperator.OUTPUT_PORT);
        portStats2.bufferServerBytes = 1L;
        portStats2.tupleCount = 1;
        operatorStats2.outputPorts.add(portStats2);
        operatorHeartbeat.windowStats = Lists.newArrayList(new Stats.OperatorStats[]{operatorStats2});
        containerStats.operators.clear();
        containerStats.operators.add(operatorHeartbeat);
        streamingContainerManager.processHeartbeat(containerHeartbeat);
        Assert.assertEquals("tuples " + pTOperator, 2L, pTOperator.stats.totalTuplesEmitted.get());
        Assert.assertEquals("window " + pTOperator, 4L, pTOperator.stats.currentWindowId.get());
        Assert.assertEquals("statsQueue " + pTOperator, 2L, pTOperator.stats.listenerStats.size());
        streamingContainerManager.processEvents();
        Assert.assertEquals("statsQueue " + pTOperator, 0L, pTOperator.stats.listenerStats.size());
        Assert.assertEquals("lastStats " + pTOperator, 2L, pTOperator.stats.lastWindowedStats.size());
    }

    private int countState(PTContainer pTContainer, PTOperator.State state) {
        int i = 0;
        Iterator it = pTContainer.getOperators().iterator();
        while (it.hasNext()) {
            if (((PTOperator) it.next()).getState() == state) {
                i++;
            }
        }
        return i;
    }

    private boolean containsNodeContext(List<OperatorDeployInfo> list, LogicalPlan.OperatorMeta operatorMeta) {
        return getNodeDeployInfo(list, operatorMeta) != null;
    }

    public static List<OperatorDeployInfo> getDeployInfo(StreamingContainerAgent streamingContainerAgent) {
        return streamingContainerAgent.getDeployInfoList(streamingContainerAgent.container.getOperators());
    }

    private static OperatorDeployInfo getNodeDeployInfo(List<OperatorDeployInfo> list, LogicalPlan.OperatorMeta operatorMeta) {
        for (OperatorDeployInfo operatorDeployInfo : list) {
            if (operatorMeta.getName().equals(operatorDeployInfo.name)) {
                return operatorDeployInfo;
            }
        }
        return null;
    }

    private static OperatorDeployInfo.InputDeployInfo getInputDeployInfo(OperatorDeployInfo operatorDeployInfo, String str) {
        for (OperatorDeployInfo.InputDeployInfo inputDeployInfo : operatorDeployInfo.inputs) {
            if (str.equals(inputDeployInfo.declaredStreamId)) {
                return inputDeployInfo;
            }
        }
        return null;
    }

    public static StreamingContainerAgent assignContainer(StreamingContainerManager streamingContainerManager, String str) {
        return streamingContainerManager.assignContainer(new StreamingContainerManager.ContainerResource(0, str, "localhost", 1024, 0, (String) null), InetSocketAddress.createUnresolved(str + "Host", 0));
    }

    @Test
    public void testValidGenericOperatorDeployInfoType() {
        GenericTestOperator addOperator = this.dag.addOperator("o1", GenericTestOperator.class);
        TestGeneratorInputOperator.ValidGenericOperator addOperator2 = this.dag.addOperator("o2", TestGeneratorInputOperator.ValidGenericOperator.class);
        this.dag.addStream("stream1", addOperator.outport1, addOperator2.input);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(this.dag);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        List containers = physicalPlan.getContainers();
        for (int i = 0; i < containers.size(); i++) {
            assignContainer(streamingContainerManager, "container" + (i + 1));
        }
        OperatorDeployInfo operatorDeployInfo = getDeployInfo(streamingContainerManager.getContainerAgent(((PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator2)).get(0)).getContainer().getExternalId())).get(0);
        Assert.assertEquals("type " + operatorDeployInfo, OperatorDeployInfo.OperatorType.GENERIC, operatorDeployInfo.type);
    }

    @Test
    public void testValidInputOperatorDeployInfoType() {
        TestGeneratorInputOperator.ValidInputOperator addOperator = this.dag.addOperator("o1", TestGeneratorInputOperator.ValidInputOperator.class);
        this.dag.addStream("stream1", addOperator.outport, this.dag.addOperator("o2", GenericTestOperator.class).inport1);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(this.dag);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        List containers = physicalPlan.getContainers();
        for (int i = 0; i < containers.size(); i++) {
            assignContainer(streamingContainerManager, "container" + (i + 1));
        }
        OperatorDeployInfo operatorDeployInfo = getDeployInfo(streamingContainerManager.getContainerAgent(((PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator)).get(0)).getContainer().getExternalId())).get(0);
        Assert.assertEquals("type " + operatorDeployInfo, OperatorDeployInfo.OperatorType.INPUT, operatorDeployInfo.type);
    }

    @Test
    public void testOperatorShutdown() {
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        GenericTestOperator addOperator = this.dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator addOperator2 = this.dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator addOperator3 = this.dag.addOperator("o3", GenericTestOperator.class);
        this.dag.addStream("stream1", addOperator.outport1, addOperator2.inport1);
        this.dag.addStream("stream2", addOperator2.outport1, addOperator3.inport1);
        this.dag.setAttribute(addOperator2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(this.dag);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        HashMap hashMap = new HashMap();
        for (PTContainer pTContainer : physicalPlan.getContainers()) {
            hashMap.put(pTContainer, new MockContainer(streamingContainerManager, pTContainer));
        }
        Iterator it = hashMap.entrySet().iterator();
        while (it.hasNext()) {
            ((MockContainer) ((Map.Entry) it.next()).getValue()).deploy();
        }
        Iterator it2 = hashMap.entrySet().iterator();
        while (it2.hasNext()) {
            ((PTContainer) ((Map.Entry) it2.next()).getKey()).bufferServerAddress = null;
        }
        PTOperator pTOperator = (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator)).get(0);
        MockContainer mockContainer = (MockContainer) hashMap.get(pTOperator.getContainer());
        MockContainer.MockOperatorStats stats = mockContainer.stats(pTOperator.getId());
        stats.currentWindowId(1L).checkpointWindowId(1L).deployState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);
        mockContainer.sendHeartbeat();
        PTOperator pTOperator2 = (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator2)).get(0);
        MockContainer mockContainer2 = (MockContainer) hashMap.get(pTOperator2.getContainer());
        MockContainer.MockOperatorStats stats2 = mockContainer2.stats(pTOperator2.getId());
        stats2.currentWindowId(1L).checkpointWindowId(1L).deployState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);
        mockContainer2.sendHeartbeat();
        Assert.assertEquals("2 partitions", 2L, physicalPlan.getOperators(this.dag.getMeta(addOperator2)).size());
        PTOperator pTOperator3 = (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator2)).get(1);
        MockContainer mockContainer3 = (MockContainer) hashMap.get(pTOperator3.getContainer());
        MockContainer.MockOperatorStats stats3 = mockContainer3.stats(pTOperator3.getId());
        stats3.currentWindowId(1L).checkpointWindowId(1L).deployState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);
        mockContainer3.sendHeartbeat();
        PTOperator pTOperator4 = (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator3)).get(0);
        MockContainer mockContainer4 = (MockContainer) hashMap.get(pTOperator4.getContainer());
        MockContainer.MockOperatorStats stats4 = mockContainer4.stats(pTOperator4.getId());
        stats4.currentWindowId(1L).checkpointWindowId(1L).deployState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);
        mockContainer4.sendHeartbeat();
        PTOperator pTOperator5 = (PTOperator) physicalPlan.getMergeOperators(this.dag.getMeta(addOperator2)).get(0);
        MockContainer mockContainer5 = (MockContainer) hashMap.get(pTOperator5.getContainer());
        MockContainer.MockOperatorStats stats5 = mockContainer5.stats(pTOperator5.getId());
        stats5.currentWindowId(1L).checkpointWindowId(1L).deployState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);
        mockContainer5.sendHeartbeat();
        stats.currentWindowId(2L).deployState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.SHUTDOWN);
        mockContainer.sendHeartbeat();
        streamingContainerManager.monitorHeartbeat();
        Assert.assertEquals("committedWindowId", -1L, streamingContainerManager.getCommittedWindowId());
        streamingContainerManager.monitorHeartbeat();
        Assert.assertEquals("committedWindowId", 1L, streamingContainerManager.getCommittedWindowId());
        streamingContainerManager.processEvents();
        Assert.assertEquals("containers at committedWindowId=1", 5L, physicalPlan.getContainers().size());
        stats.checkpointWindowId(2L);
        mockContainer.sendHeartbeat();
        streamingContainerManager.monitorHeartbeat();
        Assert.assertEquals("committedWindowId", 1L, streamingContainerManager.getCommittedWindowId());
        stats2.currentWindowId(2L).checkpointWindowId(2L);
        stats3.currentWindowId(2L).checkpointWindowId(2L);
        stats4.currentWindowId(2L).checkpointWindowId(2L);
        stats5.currentWindowId(2L).checkpointWindowId(2L);
        mockContainer2.sendHeartbeat();
        mockContainer3.sendHeartbeat();
        mockContainer4.sendHeartbeat();
        mockContainer5.sendHeartbeat();
        streamingContainerManager.monitorHeartbeat();
        Assert.assertEquals(0L, pTOperator.getContainer().getOperators().size());
        Assert.assertEquals(0L, pTOperator2.getContainer().getOperators().size());
        Assert.assertEquals(0L, physicalPlan.getContainers().size());
    }

    private void testDownStreamPartition(DAG.Locality locality) throws Exception {
        TestGeneratorInputOperator addOperator = this.dag.addOperator("o1", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = this.dag.addOperator("o2", GenericTestOperator.class);
        this.dag.setAttribute(addOperator2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        this.dag.addStream("o1Output1", addOperator.outport, addOperator2.inport1).setLocality(locality);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        this.dag.validate();
        PhysicalPlan physicalPlan = new PhysicalPlan(this.dag, new TestPlanContext());
        Assert.assertEquals("number of containers", 1L, physicalPlan.getContainers().size());
        Assert.assertEquals("number operators " + ((PTContainer) physicalPlan.getContainers().get(0)), 3L, r0.getOperators().size());
        new StramLocalCluster(this.dag).run(5000L);
    }

    @Test
    public void testOIODownstreamPartition() throws Exception {
        testDownStreamPartition(DAG.Locality.THREAD_LOCAL);
    }

    @Test
    public void testContainerLocalDownstreamPartition() throws Exception {
        testDownStreamPartition(DAG.Locality.CONTAINER_LOCAL);
    }

    @Test
    public void testPhysicalPropertyUpdate() throws Exception {
        TestGeneratorInputOperator addOperator = this.dag.addOperator("o1", TestGeneratorInputOperator.class);
        this.dag.addStream("o1.outport", addOperator.outport, this.dag.addOperator("o2", GenericTestOperator.class).inport1);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(this.dag);
        stramLocalCluster.runAsync();
        StreamingContainerManager streamingContainerManager = stramLocalCluster.dnmgr;
        Iterator it = streamingContainerManager.getPhysicalPlan().getAllOperators().values().iterator();
        while (it.hasNext()) {
            StramTestSupport.waitForActivation(stramLocalCluster, (PTOperator) it.next());
        }
        streamingContainerManager.setPhysicalOperatorProperty(((PTOperator) stramLocalCluster.getPlanOperators(this.dag.getMeta(addOperator)).get(0)).getId(), "maxTuples", "2");
        Object obj = streamingContainerManager.getPhysicalOperatorProperty(((PTOperator) stramLocalCluster.getPlanOperators(this.dag.getMeta(addOperator)).get(0)).getId(), "maxTuples", 10000L).get(10000L, TimeUnit.MILLISECONDS);
        Assert.assertNotNull(obj);
        Assert.assertEquals(2, ((Map) obj).get("maxTuples"));
        stramLocalCluster.shutdown();
    }

    private void setupAppDataSourceLogicalPlan(Class<? extends TestAppDataQueryOperator> cls, Class<? extends TestAppDataSourceOperator> cls2, Class<? extends TestAppDataResultOperator> cls3) {
        TestGeneratorInputOperator addOperator = this.dag.addOperator("o1", TestGeneratorInputOperator.class);
        TestAppDataQueryOperator addOperator2 = this.dag.addOperator("q", cls);
        TestAppDataResultOperator addOperator3 = this.dag.addOperator("r", cls3);
        TestAppDataSourceOperator addOperator4 = this.dag.addOperator("ds", cls2);
        addOperator2.setAppDataUrl("ws://123.123.123.123:9090/pubsub");
        addOperator2.setTopic("xyz.query");
        addOperator3.setAppDataUrl("ws://123.123.123.124:9090/pubsub");
        addOperator3.setTopic("xyz.result");
        this.dag.addStream("o1-to-ds", addOperator.outport, addOperator4.inport1);
        this.dag.addStream("q-to-ds", addOperator2.outport, addOperator4.query);
        this.dag.addStream("ds-to-r", addOperator4.result, addOperator3.inport);
    }

    private void testAppDataSources(boolean z) throws Exception {
        StramLocalCluster stramLocalCluster = new StramLocalCluster(this.dag);
        stramLocalCluster.runAsync();
        List appDataSources = stramLocalCluster.dnmgr.getAppDataSources();
        Assert.assertEquals("There should be exactly one data source", 1L, appDataSources.size());
        AppDataSource appDataSource = (AppDataSource) appDataSources.get(0);
        Assert.assertEquals("Data Source name verification", "ds.result", appDataSource.getName());
        AppDataSource.QueryInfo query = appDataSource.getQuery();
        Assert.assertEquals("Query operator name verification", "q", query.operatorName);
        Assert.assertEquals("Query topic verification", "xyz.query", query.topic);
        Assert.assertEquals("Query URL verification", "ws://123.123.123.123:9090/pubsub", query.url);
        AppDataSource.ResultInfo result = appDataSource.getResult();
        Assert.assertEquals("Result operator name verification", "r", result.operatorName);
        Assert.assertEquals("Result topic verification", "xyz.result", result.topic);
        Assert.assertEquals("Result URL verification", "ws://123.123.123.124:9090/pubsub", result.url);
        Assert.assertEquals("Result QID append verification", Boolean.valueOf(z), Boolean.valueOf(result.appendQIDToTopic));
        stramLocalCluster.shutdown();
    }

    @Test
    public void testGetAppDataSources1() throws Exception {
        setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator1.class);
        testAppDataSources(true);
    }

    @Test
    public void testGetAppDataSources2() throws Exception {
        setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator2.class);
        testAppDataSources(false);
    }

    @Test
    public void testGetAppDataSources3() throws Exception {
        setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator3.class);
        testAppDataSources(false);
    }

    @Test
    public void testAppDataPush() throws Exception {
        if (StramTestSupport.isInTravis()) {
            LOG.info("Test testAppDataPush is disabled in Travis");
            return;
        }
        final ArrayList arrayList = new ArrayList();
        StramTestSupport.EmbeddedWebSocketServer embeddedWebSocketServer = new StramTestSupport.EmbeddedWebSocketServer(0);
        embeddedWebSocketServer.setWebSocket(new WebSocket.OnTextMessage() { // from class: com.datatorrent.stram.StreamingContainerManagerTest.1
            public void onMessage(String str) {
                arrayList.add(str);
            }

            public void onOpen(WebSocket.Connection connection) {
            }

            public void onClose(int i, String str) {
            }
        });
        try {
            embeddedWebSocketServer.start();
            int port = embeddedWebSocketServer.getPort();
            this.dag.addStream("o1.outport", this.dag.addOperator("o1", TestGeneratorInputOperator.class).outport, this.dag.addOperator("o2", GenericTestOperator.class).inport1);
            this.dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new AutoMetricBuiltInTransport("xyz"));
            this.dag.setAttribute(LogicalPlan.GATEWAY_CONNECT_ADDRESS, "localhost:" + port);
            this.dag.setAttribute(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS, 2000);
            AppDataPushAgent appDataPushAgent = new AppDataPushAgent(new StramLocalCluster(this.dag).dnmgr, new StramTestSupport.TestAppContext(this.dag.getAttributes()));
            appDataPushAgent.init();
            appDataPushAgent.pushData();
            Thread.sleep(1000L);
            Assert.assertTrue(arrayList.size() > 0);
            appDataPushAgent.close();
            JSONObject jSONObject = new JSONObject((String) arrayList.get(0));
            Assert.assertEquals("xyz", jSONObject.getString("topic"));
            Assert.assertEquals("publish", jSONObject.getString("type"));
            JSONObject jSONObject2 = jSONObject.getJSONObject("data");
            Assert.assertTrue(StringUtils.isNotBlank(jSONObject2.getString("appId")));
            Assert.assertTrue(StringUtils.isNotBlank(jSONObject2.getString("appUser")));
            Assert.assertTrue(StringUtils.isNotBlank(jSONObject2.getString("appName")));
            JSONObject jSONObject3 = jSONObject2.getJSONObject("logicalOperators");
            for (String str : new String[]{"o1", "o2"}) {
                JSONObject jSONObject4 = jSONObject3.getJSONObject(str);
                Assert.assertTrue(jSONObject4.has("totalTuplesProcessed"));
                Assert.assertTrue(jSONObject4.has("totalTuplesEmitted"));
                Assert.assertTrue(jSONObject4.has("tuplesProcessedPSMA"));
                Assert.assertTrue(jSONObject4.has("tuplesEmittedPSMA"));
                Assert.assertTrue(jSONObject4.has("latencyMA"));
            }
        } finally {
            embeddedWebSocketServer.stop();
        }
    }

    @Test
    public void testCustomMetricsTransport() throws Exception {
        this.dag.addStream("o1.outport", this.dag.addOperator("o1", TestGeneratorInputOperator.class).outport, this.dag.addOperator("o2", GenericTestOperator.class).inport1);
        this.dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new TestMetricTransport("xyz"));
        AppDataPushAgent appDataPushAgent = new AppDataPushAgent(new StramLocalCluster(this.dag).dnmgr, new StramTestSupport.TestAppContext(this.dag.getAttributes()));
        appDataPushAgent.init();
        appDataPushAgent.pushData();
        Assert.assertTrue(TestMetricTransport.messages.size() > 0);
        appDataPushAgent.close();
        Assert.assertTrue(((String) TestMetricTransport.messages.get(0)).startsWith("xyz:"));
    }

    @Test
    public void testLatency() throws Exception {
        TestGeneratorInputOperator addOperator = this.dag.addOperator("o1", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = this.dag.addOperator("o2", GenericTestOperator.class);
        HighLatencyTestOperator addOperator3 = this.dag.addOperator("o3", HighLatencyTestOperator.class);
        GenericTestOperator addOperator4 = this.dag.addOperator("o4", GenericTestOperator.class);
        addOperator3.setLatency(5000L);
        this.dag.addStream("o1.outport", addOperator.outport, addOperator2.inport1, addOperator3.inport1);
        this.dag.addStream("o2.outport1", addOperator2.outport1, addOperator4.inport1);
        this.dag.addStream("o3.outport1", addOperator3.outport1, addOperator4.inport2);
        this.dag.setAttribute(Context.DAGContext.STATS_MAX_ALLOWABLE_WINDOWS_LAG, 2);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(this.dag);
        StreamingContainerManager streamingContainerManager = stramLocalCluster.dnmgr;
        stramLocalCluster.runAsync();
        Thread.sleep(10000L);
        LogicalOperatorInfo logicalOperatorInfo = streamingContainerManager.getLogicalOperatorInfo("o1");
        LogicalOperatorInfo logicalOperatorInfo2 = streamingContainerManager.getLogicalOperatorInfo("o2");
        LogicalOperatorInfo logicalOperatorInfo3 = streamingContainerManager.getLogicalOperatorInfo("o3");
        LogicalOperatorInfo logicalOperatorInfo4 = streamingContainerManager.getLogicalOperatorInfo("o4");
        Assert.assertEquals("Input operator latency must be zero", 0L, logicalOperatorInfo.latencyMA);
        Assert.assertTrue("Latency must be greater than or equal to zero", logicalOperatorInfo2.latencyMA >= 0);
        Assert.assertTrue("Actual latency must be greater than the artificially introduced latency", logicalOperatorInfo3.latencyMA > 5000);
        Assert.assertTrue("Latency must be greater than or equal to zero", logicalOperatorInfo4.latencyMA >= 0);
        StreamingContainerManager.CriticalPathInfo criticalPathInfo = streamingContainerManager.getCriticalPathInfo();
        Assert.assertArrayEquals("Critical Path must be the path in the DAG that includes the HighLatencyTestOperator", new Integer[]{(Integer) logicalOperatorInfo.partitions.iterator().next(), (Integer) logicalOperatorInfo3.partitions.iterator().next(), (Integer) logicalOperatorInfo4.partitions.iterator().next()}, criticalPathInfo.path.toArray());
        Assert.assertTrue("Whole DAG latency must be greater than the artificially introduced latency", criticalPathInfo.latency > 5000);
        stramLocalCluster.shutdown();
    }
}
