package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestOutputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.stream.OiOEndWindowTest;
import com.datatorrent.stram.support.StramTestSupport;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/OutputUnifiedTest.class */
public class OutputUnifiedTest {

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

    @Test
    public void testManyToOnePartition() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.DAGContext.APPLICATION_PATH, this.testMeta.dir);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        OiOEndWindowTest.TestInputOperator testInputOperator = new OiOEndWindowTest.TestInputOperator();
        logicalPlan.addOperator("i1", testInputOperator);
        GenericTestOperator genericTestOperator = new GenericTestOperator();
        logicalPlan.addOperator("op1", genericTestOperator);
        logicalPlan.setAttribute(genericTestOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        TestOutputOperator testOutputOperator = new TestOutputOperator();
        logicalPlan.addOperator("op2", testOutputOperator);
        logicalPlan.addStream("s1", testInputOperator.output, genericTestOperator.inport1);
        logicalPlan.addStream("s2", genericTestOperator.outport1, testOutputOperator.inport);
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(logicalPlan);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        List<PTContainer> containers = physicalPlan.getContainers();
        Assert.assertEquals("Number of containers", 6L, containers.size());
        assignContainers(streamingContainerManager, containers);
        testOutputAttribute(logicalPlan, testInputOperator, streamingContainerManager, physicalPlan, false);
        testOutputAttribute(logicalPlan, genericTestOperator, streamingContainerManager, physicalPlan, true);
    }

    @Test
    public void testMxNPartition() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.DAGContext.APPLICATION_PATH, this.testMeta.dir);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        OiOEndWindowTest.TestInputOperator testInputOperator = new OiOEndWindowTest.TestInputOperator();
        logicalPlan.addOperator("i1", testInputOperator);
        GenericTestOperator genericTestOperator = new GenericTestOperator();
        logicalPlan.addOperator("op1", genericTestOperator);
        logicalPlan.setAttribute(genericTestOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(3));
        TestOutputOperator testOutputOperator = new TestOutputOperator();
        logicalPlan.addOperator("op2", testOutputOperator);
        logicalPlan.setAttribute(testOutputOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        logicalPlan.addStream("s1", testInputOperator.output, genericTestOperator.inport1);
        logicalPlan.addStream("s2", genericTestOperator.outport1, testOutputOperator.inport);
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(logicalPlan);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        List<PTContainer> containers = physicalPlan.getContainers();
        Assert.assertEquals("Number of containers", 6L, containers.size());
        assignContainers(streamingContainerManager, containers);
        testOutputAttribute(logicalPlan, testInputOperator, streamingContainerManager, physicalPlan, false);
        testOutputAttribute(logicalPlan, genericTestOperator, streamingContainerManager, physicalPlan, true);
    }

    @Test
    public void testParallelPartition() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.DAGContext.APPLICATION_PATH, this.testMeta.dir);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        OiOEndWindowTest.TestInputOperator testInputOperator = new OiOEndWindowTest.TestInputOperator();
        logicalPlan.addOperator("i1", testInputOperator);
        logicalPlan.setAttribute(testInputOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        GenericTestOperator genericTestOperator = new GenericTestOperator();
        logicalPlan.addOperator("op1", genericTestOperator);
        logicalPlan.setInputPortAttribute(genericTestOperator.inport1, Context.PortContext.PARTITION_PARALLEL, true);
        TestOutputOperator testOutputOperator = new TestOutputOperator();
        logicalPlan.addOperator("op2", testOutputOperator);
        logicalPlan.addStream("s1", testInputOperator.output, genericTestOperator.inport1);
        logicalPlan.addStream("s2", genericTestOperator.outport1, testOutputOperator.inport);
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(logicalPlan);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        List<PTContainer> containers = physicalPlan.getContainers();
        Assert.assertEquals("Number of containers", 6L, containers.size());
        assignContainers(streamingContainerManager, containers);
        testOutputAttribute(logicalPlan, testInputOperator, streamingContainerManager, physicalPlan, false);
        testOutputAttribute(logicalPlan, genericTestOperator, streamingContainerManager, physicalPlan, true);
    }

    private void testOutputAttribute(LogicalPlan logicalPlan, Operator operator, StreamingContainerManager streamingContainerManager, PhysicalPlan physicalPlan, boolean z) {
        Iterator it = physicalPlan.getOperators(logicalPlan.getMeta(operator)).iterator();
        while (it.hasNext()) {
            PTContainer container = ((PTOperator) it.next()).getContainer();
            StreamingContainerAgent containerAgent = streamingContainerManager.getContainerAgent("container" + container.getId());
            System.out.println("Opsize " + container.getOperators().size());
            List deployInfoList = containerAgent.getDeployInfoList(container.getOperators());
            Assert.assertEquals("Deploy info size", 1L, deployInfoList.size());
            Assert.assertEquals("Is output unified", ((OperatorDeployInfo.OutputDeployInfo) ((OperatorDeployInfo) deployInfoList.get(0)).outputs.get(0)).getAttributes().get(Context.PortContext.IS_OUTPUT_UNIFIED), Boolean.valueOf(z));
        }
    }

    private void assignContainers(StreamingContainerManager streamingContainerManager, List<PTContainer> list) {
        Iterator<PTContainer> it = list.iterator();
        while (it.hasNext()) {
            assignContainer(streamingContainerManager, "container" + it.next().getId());
        }
    }

    private static StreamingContainerAgent assignContainer(StreamingContainerManager streamingContainerManager, String str) {
        return streamingContainerManager.assignContainer(new StreamingContainerManager.ContainerResource(0, str, "localhost", 1024, 0, (String) null), InetSocketAddress.createUnresolved(str + "Host", 0));
    }
}
