package com.datatorrent.stram.plan;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.stram.PartitioningTest;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.StreamingContainerManagerTest;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.DefaultKryoStreamCodec;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests.class */
public class StreamPersistanceTests {
    // Shared SLF4J logger used by the tests in this class for debug output.
    static final Logger logger = LoggerFactory.getLogger(StreamPersistanceTests.class);

    // JUnit rule provided by StramTestSupport.
    // NOTE(review): presumably handles per-test setup/cleanup — confirm against StramTestSupport.TestMeta.
    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$AscendingNumbersOperator.class */
    /**
     * Input operator that emits one integer per {@link #emitTuples()} call, starting at 0 and
     * growing by one on every call.
     */
    public static class AscendingNumbersOperator implements InputOperator {
        private Integer count = 0;
        public final transient DefaultOutputPort<Integer> outputPort = new DefaultOutputPort<>();

        /** Advances the counter and emits the value it held before the increment. */
        public void emitTuples() {
            Integer current = count;
            count = current + 1;
            outputPort.emit(current);
        }

        /** No per-window work is needed. */
        public void beginWindow(long windowId) {
        }

        /** No per-window work is needed. */
        public void endWindow() {
        }

        /** Nothing to initialize. */
        public void setup(Context.OperatorContext context) {
        }

        /** Nothing to release. */
        public void teardown() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$DivisibleByStreamCodec.class */
    /**
     * Stream codec that maps integers divisible by {@link #number} to partition 1 and every
     * other integer to partition 2. The divisor defaults to 1.
     */
    public static class DivisibleByStreamCodec extends DefaultKryoStreamCodec {
        protected int number = 1;

        /** Creates a codec with the default divisor of 1. */
        public DivisibleByStreamCodec() {
        }

        /**
         * Creates a codec with the given divisor.
         *
         * @param i divisor used by {@link #getPartition(Object)}
         */
        public DivisibleByStreamCodec(int i) {
            this.number = i;
        }

        /**
         * @param obj tuple to partition; cast to {@link Integer}
         * @return 1 when the tuple is divisible by the divisor, otherwise 2
         */
        public int getPartition(Object obj) {
            int value = (Integer) obj;
            if (value % number == 0) {
                return 1;
            }
            return 2;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$PartitionedTestOperatorWithFiltering.class */
    /**
     * Pass-through operator that also acts as its own partitioner. Its partitioning always
     * yields two {@link PassThruOperatorWithCodec} partitions keyed on the input port.
     */
    public static class PartitionedTestOperatorWithFiltering extends BaseOperator implements Partitioner<PassThruOperatorWithCodec> {
        public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() {
            /** Forwards every tuple unchanged to the output port. */
            public void process(Object tuple) {
                PartitionedTestOperatorWithFiltering.this.output.emit(tuple);
            }
        };
        public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();

        /**
         * Builds two partitions, both using mask 3 on {@link #input}: the first accepts
         * partition key 0, the second partition key 1. The incoming collection is ignored.
         */
        public Collection definePartitions(Collection collection, Partitioner.PartitioningContext partitioningContext) {
            List<DefaultPartition<PassThruOperatorWithCodec>> partitions = new ArrayList<>();
            partitions.add(newPartition(0));
            partitions.add(newPartition(1));
            return partitions;
        }

        /** Creates one partition whose input port accepts only {@code key} under mask 3. */
        private DefaultPartition<PassThruOperatorWithCodec> newPartition(int key) {
            DefaultPartition<PassThruOperatorWithCodec> partition = new DefaultPartition<>(new PassThruOperatorWithCodec());
            partition.getPartitionKeys().put(this.input, new Partitioner.PartitionKeys(3, Sets.newHashSet(key)));
            return partition;
        }

        /** Prints a notice to standard output once partitioning has been applied. */
        public void partitioned(Map map) {
            System.out.println("Dynamic partitioning done....");
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$PartitionedTestPersistanceOperator.class */
    /**
     * Persist operator that defines its own partitioning: a single partition whose inherited
     * input port accepts partition key 0 under mask 3.
     */
    public static class PartitionedTestPersistanceOperator extends TestPersistanceOperator implements Partitioner<PartitionedTestPersistanceOperator> {
        /** Returns a one-element collection with the single partition; the arguments are ignored. */
        public Collection definePartitions(Collection collection, Partitioner.PartitioningContext partitioningContext) {
            DefaultPartition<PartitionedTestPersistanceOperator> partition = new DefaultPartition<>(new PartitionedTestPersistanceOperator());
            partition.getPartitionKeys().put(this.inport, new Partitioner.PartitionKeys(3, Sets.newHashSet(0)));

            List<DefaultPartition<PartitionedTestPersistanceOperator>> partitions = new ArrayList<>();
            partitions.add(partition);
            return partitions;
        }

        /** No action is taken after partitioning. */
        public void partitioned(Map map) {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$PassThruOperatorWithCodec.class */
    /**
     * Pass-through operator whose input port supplies a {@link DivisibleByStreamCodec} built from
     * the configured divisor. It also implements {@link Partitioner} for itself.
     */
    public static class PassThruOperatorWithCodec extends BaseOperator implements Partitioner<PassThruOperatorWithCodec> {
        // Divisor handed to the stream codec; 1 unless overridden through the int constructor.
        private int divisibleBy = 1;

        // The ports are declared once as field initializers so both constructors share them.
        public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() {
            /** Forwards every tuple unchanged to the output port. */
            public void process(Object tuple) {
                PassThruOperatorWithCodec.this.output.emit(tuple);
            }

            /** Creates a new codec on each call, using the divisor set at that moment. */
            public StreamCodec<Object> getStreamCodec() {
                return new DivisibleByStreamCodec(PassThruOperatorWithCodec.this.divisibleBy);
            }
        };
        public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();

        /** Creates an operator with the default divisor of 1. */
        public PassThruOperatorWithCodec() {
        }

        /**
         * Creates an operator with the given divisor.
         *
         * @param i divisor passed to the {@link DivisibleByStreamCodec} of the input port
         */
        public PassThruOperatorWithCodec(int i) {
            this.divisibleBy = i;
        }

        /**
         * Defines the partitions for this operator.
         *
         * @param collection the current partitions; only its size is inspected
         * @param partitioningContext unused
         * @return a single new partition (mask 1, partition key 1 on {@link #input}) when exactly
         *         one partition is passed in, otherwise an empty collection
         */
        public Collection definePartitions(Collection collection, Partitioner.PartitioningContext partitioningContext) {
            List<DefaultPartition<PassThruOperatorWithCodec>> partitions = new ArrayList<>();
            if (collection.size() == 1) {
                DefaultPartition<PassThruOperatorWithCodec> partition = new DefaultPartition<>(new PassThruOperatorWithCodec());
                partition.getPartitionKeys().put(this.input, new Partitioner.PartitionKeys(1, Sets.newHashSet(1)));
                partitions.add(partition);
            }
            return partitions;
        }

        /** No action is taken after partitioning. */
        public void partitioned(Map map) {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$TestOperatorWithMultipleNonOptionalInputPorts.class */
    /**
     * Operator with three input ports that all discard their tuples. Two ports are explicitly
     * annotated as non-optional; the third carries no annotation.
     */
    public class TestOperatorWithMultipleNonOptionalInputPorts extends BaseOperator {

        @InputPortFieldAnnotation(optional = false)
        public final transient DefaultInputPort<Object> inputPort1 = new DefaultInputPort<Object>() {
            /** Discards the tuple. */
            public final void process(Object tuple) {
            }
        };

        @InputPortFieldAnnotation(optional = false)
        public final transient DefaultInputPort<Object> inputPort2 = new DefaultInputPort<Object>() {
            /** Discards the tuple. */
            public final void process(Object tuple) {
            }
        };

        public final transient DefaultInputPort<Object> inputPort3 = new DefaultInputPort<Object>() {
            /** Discards the tuple. */
            public final void process(Object tuple) {
            }
        };
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$TestOperatorWithOutputPorts.class */
    /**
     * Operator with an optional input port that discards its tuples and an output port
     * annotated with {@code optional = false}.
     */
    public class TestOperatorWithOutputPorts extends BaseOperator {

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
            /** Discards the tuple. */
            public final void process(Object tuple) {
            }
        };

        @InputPortFieldAnnotation(optional = false)
        public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<>();
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$TestOperatorWithoutInputPorts.class */
    /** Operator that declares no ports of its own. */
    public class TestOperatorWithoutInputPorts extends BaseOperator {
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$TestPartitionCodec.class */
    /** Stream codec that uses the integer tuple itself as its partition value. */
    public static class TestPartitionCodec extends DefaultKryoStreamCodec {
        /**
         * @param obj tuple to partition; cast to {@link Integer}
         * @return the tuple's own integer value
         */
        public int getPartition(Object obj) {
            Integer value = (Integer) obj;
            return value.intValue();
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$TestPersistanceOperator.class */
    /**
     * Persist operator for the tests: every tuple arriving on {@link #inport} is appended to the
     * static {@link #results} list, which is shared by all instances.
     */
    public static class TestPersistanceOperator implements Operator {
        public static volatile List<Object> results = new ArrayList<>();

        @InputPortFieldAnnotation(optional = true)
        public final transient Operator.InputPort<Object> inport = new DefaultInputPort<Object>() {
            /** Records the tuple in the shared results list. */
            public final void process(Object tuple) {
                TestPersistanceOperator.results.add(tuple);
            }
        };

        /** Nothing to initialize. */
        public void setup(Context.OperatorContext context) {
        }

        /** Nothing to release. */
        public void teardown() {
        }

        /** No per-window work is needed. */
        public void beginWindow(long windowId) {
        }

        /** No per-window work is needed. */
        public void endWindow() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/StreamPersistanceTests$TestRecieverOperator.class */
    /**
     * Sink operator for the tests: every tuple arriving on {@link #inport} is appended to the
     * static {@link #results} list (shared by all instances) and counted in the per-instance
     * {@link #size} counter.
     */
    public static class TestRecieverOperator extends BaseOperator {
        public static volatile List<Object> results = new ArrayList<>();
        public volatile AtomicInteger size = new AtomicInteger(0);

        @InputPortFieldAnnotation(optional = true)
        public final transient Operator.InputPort<Object> inport = new DefaultInputPort<Object>() {
            /** Records the tuple in the shared list, then bumps this instance's counter. */
            public final void process(Object tuple) {
                TestRecieverOperator.results.add(tuple);
                TestRecieverOperator.this.size.incrementAndGet();
            }
        };
    }

    /**
     * Checks that {@code persistUsing} registers the persist operator in the plan under the
     * given name and that the resulting plan validates.
     */
    @Test
    public void testPersistStreamOperatorIsAdded() {
        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator sink = dag.addOperator("x", new GenericTestOperator());
        TestRecieverOperator persister = new TestRecieverOperator();

        LogicalPlan.StreamMeta stream = dag.addStream("Stream1", input.outport, sink.inport1);
        stream.persistUsing("Stream1_persister", persister, persister.inport);

        Assert.assertEquals("Persist operator not added to dag ", persister, dag.getOperatorMeta("Stream1_persister").getOperator());
        dag.validate();
    }

    /**
     * Checks that a stream-level persist operator and two sink-specific persist operators can be
     * attached to the same three-sink stream, that each is registered under its own name, and
     * that the resulting plan validates.
     */
    @Test
    public void testPersistStreamOperatorIsAddedPerSink() {
        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
        GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
        GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());
        TestRecieverOperator streamPersister = new TestRecieverOperator();
        TestRecieverOperator x1Persister = new TestRecieverOperator();
        TestRecieverOperator x2Persister = new TestRecieverOperator();

        LogicalPlan.StreamMeta stream = dag.addStream("Stream1", input.outport, x1.inport1, x2.inport1, x3.inport1);
        stream.persistUsing("Stream1_persister", streamPersister, streamPersister.inport);
        stream.persistUsing("Stream1_x1_persister", x1Persister, x1Persister.inport, x1.inport1);
        stream.persistUsing("Stream1_x2_persister", x2Persister, x2Persister.inport, x2.inport1);

        Assert.assertEquals("Persist operator not added to dag ", streamPersister, dag.getOperatorMeta("Stream1_persister").getOperator());
        Assert.assertEquals("Persist operator not added to dag ", x1Persister, dag.getOperatorMeta("Stream1_x1_persister").getOperator());
        Assert.assertEquals("Persist operator not added to dag ", x2Persister, dag.getOperatorMeta("Stream1_x2_persister").getOperator());
        dag.validate();
    }

    /**
     * Checks that {@code persistUsing} rejects unsuitable persist operators with an
     * {@link IllegalArgumentException}. Four attempts are made in sequence, each expected to
     * fail; the "Stream1_persister" operator is removed from the plan between the later attempts.
     */
    @Test
    public void testaddStreamThrowsExceptionOnInvalidLoggerType() {
        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator sink = dag.addOperator("x", new GenericTestOperator());
        LogicalPlan.StreamMeta stream = dag.addStream("Stream1", input.outport, sink.inport1);

        // Attempt 1: persist operator that declares an output port.
        TestOperatorWithOutputPorts withOutputPorts = new TestOperatorWithOutputPorts();
        try {
            stream.persistUsing("persister", withOutputPorts, withOutputPorts.inputPort);
            Assert.fail("should throw Illegal argument exception: Persist operator has non optional output ports");
        } catch (IllegalArgumentException ex) {
            logger.debug(ex.getMessage());
        }

        // Attempt 2: another operator with an output port, this time named "Stream1_persister".
        TestOperatorWithOutputPorts secondWithOutputPorts = new TestOperatorWithOutputPorts();
        try {
            stream.persistUsing("Stream1_persister", secondWithOutputPorts, secondWithOutputPorts.inputPort);
            Assert.fail("should throw exception that Stream1_persister object was already added");
        } catch (IllegalArgumentException ex) {
            logger.debug(ex.getMessage());
        }

        // Attempt 3: free the name, then try an operator that declares no input ports.
        dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
        try {
            stream.persistUsing("Stream1_persister", new TestOperatorWithoutInputPorts());
            Assert.fail("should throw Illegal argument exception: persist operator should have input ports");
        } catch (IllegalArgumentException ex) {
            logger.debug(ex.getMessage());
        }

        // Attempt 4: free the name again, then try an operator with several non-optional input ports.
        dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
        try {
            stream.persistUsing("Stream1_persister", new TestOperatorWithMultipleNonOptionalInputPorts());
            Assert.fail("should throw Illegal argument exception: persist operator should have at most 1 non-optional input port");
        } catch (IllegalArgumentException ex) {
            logger.debug(ex.getMessage());
        }
    }

    /**
     * Checks that {@code persistUsing} throws an {@link IllegalArgumentException} when the
     * supplied input port belongs to a different operator instance than the persist operator.
     * Afterwards the "Stream1_persister" operator is removed from the plan.
     */
    @Test
    public void testaddStreamThrowsExceptionOnInvalidInputPortForLoggerType() {
        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator sink = dag.addOperator("x", new GenericTestOperator());
        TestRecieverOperator persister = new TestRecieverOperator();
        TestRecieverOperator unrelatedOperator = new TestRecieverOperator();
        try {
            dag.addStream("Stream1", input.outport, sink.inport1)
                    .persistUsing("Stream1_persister", persister, unrelatedOperator.inport);
            Assert.fail("should throw Illegal argument exception: Port passed does not belong to operator class");
        } catch (IllegalArgumentException expected) {
            // Intentionally ignored: the exception is the outcome this test expects.
        }
        dag.removeOperator(dag.getOperatorMeta("Stream1_persister").getOperator());
    }

    /** Checks that removing a stream also removes the persist operator attached to it. */
    @Test
    public void testPersistStreamOperatorIsRemovedWhenStreamIsRemoved() {
        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator sink = dag.addOperator("x", new GenericTestOperator());
        TestRecieverOperator persister = new TestRecieverOperator();

        LogicalPlan.StreamMeta stream = dag.addStream("Stream1", input.outport, sink.inport1);
        stream.persistUsing("Stream1_persister", persister, persister.inport);
        stream.remove();

        Assert.assertEquals("Persist operator should be removed from dag after stream.remove", (Object) null, dag.getOperatorMeta("Stream1_persister"));
    }

    /**
     * Checks that removing one sink operator removes only the persist operator attached to that
     * sink, while the stream-level persister and the other sink's persister stay in the plan.
     */
    @Test
    public void testPersistStreamOperatorIsRemovedWhenSinkIsRemoved() {
        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
        GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
        GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());
        TestRecieverOperator streamPersister = new TestRecieverOperator();
        TestRecieverOperator x1Persister = new TestRecieverOperator();
        TestRecieverOperator x2Persister = new TestRecieverOperator();

        LogicalPlan.StreamMeta stream = dag.addStream("Stream1", input.outport, x1.inport1, x2.inport1, x3.inport1);
        stream.persistUsing("Stream1_persister", streamPersister, streamPersister.inport);
        stream.persistUsing("Stream1_x1_persister", x1Persister, x1Persister.inport, x1.inport1);
        stream.persistUsing("Stream1_x2_persister", x2Persister, x2Persister.inport, x2.inport1);

        // All three persisters are registered before any sink is removed.
        Assert.assertEquals("Persist operator not added to dag ", streamPersister, dag.getOperatorMeta("Stream1_persister").getOperator());
        Assert.assertEquals("Persist operator not added to dag ", x1Persister, dag.getOperatorMeta("Stream1_x1_persister").getOperator());
        Assert.assertEquals("Persist operator not added to dag ", x2Persister, dag.getOperatorMeta("Stream1_x2_persister").getOperator());

        dag.removeOperator(x1);

        // Only the persister attached to the removed sink is gone.
        Assert.assertEquals("Persist operator should be removed from dag after sink is removed", (Object) null, dag.getOperatorMeta("Stream1_x1_persister"));
        Assert.assertEquals("Persist operator not added to dag ", streamPersister, dag.getOperatorMeta("Stream1_persister").getOperator());
        Assert.assertEquals("Persist operator not added to dag ", x2Persister, dag.getOperatorMeta("Stream1_x2_persister").getOperator());
    }

    /**
     * Checks that the stream-level persist operator is removed from the plan once all three sink
     * operators of the stream have been removed.
     */
    @Test
    public void testPersistStreamOperatorIsRemovedWhenAllSinksAreRemoved() {
        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
        GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator());
        GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator());
        GenericTestOperator x3 = dag.addOperator("x3", new GenericTestOperator());
        TestRecieverOperator persister = new TestRecieverOperator();

        dag.addStream("Stream1", input.outport, x1.inport1, x2.inport1, x3.inport1)
                .persistUsing("Stream1_persister", persister, persister.inport);
        Assert.assertNotNull("Stream persister operator should be present", dag.getOperatorMeta("Stream1_persister"));

        dag.removeOperator(x1);
        dag.removeOperator(x2);
        dag.removeOperator(x3);

        Assert.assertNull("Persister operator should have been removed after all sinks are removed", dag.getOperatorMeta("Stream1_persister"));
    }

    /**
     * Wires an ascending-number source to a receiver sink, attaches a persist operator to the
     * stream, and delegates to {@code runLocalClusterAndValidate} to run the plan and compare
     * what the sink and the persist operator recorded.
     */
    @Test
    public void testPersistStreamOperatorGeneratesIdenticalOutputAsSink() throws ClassNotFoundException, IOException, InterruptedException {
        LogicalPlan dag = new LogicalPlan();
        AscendingNumbersOperator source = dag.addOperator("input1", AscendingNumbersOperator.class);
        TestRecieverOperator sink = dag.addOperator("x", new TestRecieverOperator());
        LogicalPlan.StreamMeta stream = dag.addStream("Stream1", source.outputPort, sink.inport);

        TestPersistanceOperator persister = new TestPersistanceOperator();
        stream.persistUsing("Stream1_persister", persister, persister.inport);

        runLocalClusterAndValidate(dag, sink, persister);
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [com.datatorrent.stram.plan.StreamPersistanceTests$1] */
    /**
     * Runs the plan in a {@link StramLocalCluster} and checks that the receiver and the persist
     * operator recorded the same tuples in the same order.
     *
     * <p>A controller thread polls every 10 ms until the shared receiver results list holds at
     * least 1000 tuples or 100 seconds have elapsed, and then shuts the cluster down exactly
     * once. The shutdown sits in a {@code finally} around the whole polling loop, so it happens
     * only after polling has ended (normally or with an exception) rather than after the first
     * sleep.
     *
     * <p>Only the first {@code min(receiver, persister)} tuples are compared. Both static result
     * lists are cleared before the run and again afterwards, even if an assertion fails.
     *
     * @param logicalPlan the plan to execute
     * @param testRecieverOperator unused; results are read from the static
     *        {@code TestRecieverOperator.results} list
     * @param testPersistanceOperator unused; results are read from the static
     *        {@code TestPersistanceOperator.results} list
     * @throws IOException declared for the cluster construction/run calls
     * @throws ClassNotFoundException declared for the cluster construction/run calls
     */
    private void runLocalClusterAndValidate(LogicalPlan logicalPlan, final TestRecieverOperator testRecieverOperator, TestPersistanceOperator testPersistanceOperator) throws IOException, ClassNotFoundException {
        try {
            TestRecieverOperator.results.clear();
            TestPersistanceOperator.results.clear();
            final StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
            new Thread("LocalClusterController") {
                @Override
                public void run() {
                    final long startMillis = System.currentTimeMillis();
                    try {
                        while (System.currentTimeMillis() - startMillis < 100000
                                && TestRecieverOperator.results.size() < 1000) {
                            Thread.sleep(10L);
                        }
                    } catch (Exception e) {
                        DTThrowable.rethrow(e);
                    } finally {
                        // Single shutdown, after polling has finished for any reason.
                        stramLocalCluster.shutdown();
                    }
                }
            }.start();
            // NOTE(review): run() is expected to return once the controller thread calls shutdown() — confirm.
            stramLocalCluster.run();
            int size = Math.min(TestRecieverOperator.results.size(), TestPersistanceOperator.results.size());
            for (int i = 0; i < size; i++) {
                Object received = TestRecieverOperator.results.get(i);
                Object persisted = TestPersistanceOperator.results.get(i);
                logger.debug("Tuple = {} - {}", received, persisted);
                Assert.assertEquals("Mismatch observed for tuple ", received, persisted);
            }
        } finally {
            TestRecieverOperator.results.clear();
            TestPersistanceOperator.results.clear();
        }
    }

    /**
     * Builds source -> pass-through (codec divisor 2) -> receiver, attaches a stream-level
     * persist operator to the first stream, and delegates to {@code runLocalClusterAndValidate}.
     */
    @Test
    public void testPersistStreamWithFiltering() throws ClassNotFoundException, IOException, InterruptedException {
        LogicalPlan dag = new LogicalPlan();
        AscendingNumbersOperator source = dag.addOperator("ascend", new AscendingNumbersOperator());
        PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
        TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
        TestPersistanceOperator persister = new TestPersistanceOperator();

        LogicalPlan.StreamMeta stream1 = dag.addStream("Stream1", source.outputPort, passThru.input);
        stream1.persistUsing("Stream1_persister", persister, persister.inport);
        dag.addStream("Stream2", passThru.output, console.inport);

        runLocalClusterAndValidate(dag, console, persister);
    }

    /**
     * Builds source -> pass-through (codec divisor 2) -> receiver, attaches a persist operator
     * to the pass-through's input port specifically, and delegates to
     * {@code runLocalClusterAndValidate}.
     */
    @Test
    public void testPersistStreamOnSingleSinkWithFiltering() throws ClassNotFoundException, IOException, InterruptedException {
        LogicalPlan dag = new LogicalPlan();
        AscendingNumbersOperator source = dag.addOperator("ascend", new AscendingNumbersOperator());
        PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
        TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator());
        TestPersistanceOperator persister = new TestPersistanceOperator();

        LogicalPlan.StreamMeta stream1 = dag.addStream("Stream1", source.outputPort, passThru.input);
        stream1.persistUsing("Stream1_persister", persister, persister.inport, passThru.input);
        dag.addStream("Stream2", passThru.output, console.inport);

        runLocalClusterAndValidate(dag, console, persister);
    }

    /* JADX WARN: Type inference failed for: r0v33, types: [com.datatorrent.stram.plan.StreamPersistanceTests$2] */
    /**
     * Runs a plan where one CONTAINER_LOCAL stream feeds two pass-through operators (codec
     * divisors 2 and 3) and carries a stream-level persist operator, then checks the first nine
     * tuples the persist operator recorded against {@code 0, 2, 3, 4, 6, 8, 9, 10, 12}.
     *
     * <p>A controller thread polls every 10 ms until the shared receiver results list holds at
     * least 6 tuples or the timeout elapses, and then shuts the cluster down exactly once. The
     * shutdown sits in a {@code finally} around the whole polling loop, so it happens only after
     * polling has ended rather than after the first sleep.
     *
     * <p>The static result lists are cleared before the run and again afterwards, even if an
     * assertion fails.
     */
    @Test
    public void testPersistStreamOnSingleSinkWithFilteringContainerLocal() throws ClassNotFoundException, IOException, InterruptedException {
        LogicalPlan logicalPlan = new LogicalPlan();
        AscendingNumbersOperator source = logicalPlan.addOperator("ascend", new AscendingNumbersOperator());
        PassThruOperatorWithCodec multiplesOf2 = logicalPlan.addOperator("PassThrough", new PassThruOperatorWithCodec(2));
        PassThruOperatorWithCodec multiplesOf3 = logicalPlan.addOperator("Multiples_of_3", new PassThruOperatorWithCodec(3));
        TestRecieverOperator console = logicalPlan.addOperator("console", new TestRecieverOperator());
        TestRecieverOperator console1 = logicalPlan.addOperator("console1", new TestRecieverOperator());
        TestPersistanceOperator persister = new TestPersistanceOperator();
        logicalPlan.addStream("Stream1", source.outputPort, multiplesOf2.input, multiplesOf3.input).setLocality(DAG.Locality.CONTAINER_LOCAL).persistUsing("persister", persister, persister.inport);
        logicalPlan.addStream("Stream2", multiplesOf2.output, console.inport);
        logicalPlan.addStream("Stream3", multiplesOf3.output, console1.inport);

        // Both receivers append to the same static list, so one clear() covers them.
        TestPersistanceOperator.results.clear();
        TestRecieverOperator.results.clear();

        final StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        new Thread("LocalClusterController") {
            @Override
            public void run() {
                final long startMillis = System.currentTimeMillis();
                try {
                    while (System.currentTimeMillis() - startMillis < 1000000
                            && TestRecieverOperator.results.size() < 6) {
                        Thread.sleep(10L);
                    }
                } catch (Exception e) {
                    DTThrowable.rethrow(e);
                } finally {
                    // Single shutdown, after polling has finished for any reason.
                    stramLocalCluster.shutdown();
                }
            }
        }.start();
        // NOTE(review): run() is expected to return once the controller thread calls shutdown() — confirm.
        stramLocalCluster.run();

        try {
            Integer[] expected = {0, 2, 3, 4, 6, 8, 9, 10, 12};
            // Fail with a readable message instead of an IndexOutOfBoundsException from get(i).
            Assert.assertTrue("Persist operator recorded fewer tuples than expected: " + TestPersistanceOperator.results.size(), TestPersistanceOperator.results.size() >= expected.length);
            for (int i = 0; i < expected.length; i++) {
                Object persisted = TestPersistanceOperator.results.get(i);
                logger.debug("{} {}", persisted, expected[i]);
                Assert.assertEquals("Mismatch observed for tuple ", expected[i], persisted);
            }
        } finally {
            TestPersistanceOperator.results.clear();
            TestRecieverOperator.results.clear();
        }
    }

    /**
     * Runs a DAG in which one stream ("Stream1") feeds two pass-through sinks and is additionally
     * persisted through a {@code TestPersistanceOperator}, then checks the first tuples the persist
     * operator recorded.
     *
     * <p>The expected values are the non-negative multiples of 2 or 3, i.e. the union of what the two
     * sinks are expected to accept (presumably {@code PassThruOperatorWithCodec(n)} only accepts
     * multiples of {@code n}; confirm against that operator's codec).
     *
     * <p>A controller thread polls the shared static receiver results and shuts the local cluster
     * down once at least 6 tuples were received or 10 seconds elapsed; {@code run()} on the test
     * thread returns after that shutdown.
     *
     * @throws ClassNotFoundException declared for the local cluster setup
     * @throws IOException declared for the local cluster setup
     */
    @Test
    public void testPersistStreamOperatorGeneratesUnionOfAllSinksOutput() throws ClassNotFoundException, IOException {
        LogicalPlan logicalPlan = new LogicalPlan();
        AscendingNumbersOperator ascend = logicalPlan.addOperator("ascend", new AscendingNumbersOperator());
        PassThruOperatorWithCodec passThru1 = logicalPlan.addOperator("PassThrough1", new PassThruOperatorWithCodec(2));
        PassThruOperatorWithCodec passThru2 = logicalPlan.addOperator("PassThrough2", new PassThruOperatorWithCodec(3));
        TestRecieverOperator console = logicalPlan.addOperator("console", new TestRecieverOperator());
        TestRecieverOperator console1 = logicalPlan.addOperator("console1", new TestRecieverOperator());
        TestPersistanceOperator persister = new TestPersistanceOperator();
        logicalPlan.addStream("Stream1", ascend.outputPort, passThru1.input, passThru2.input).persistUsing("persister", persister, persister.inport);
        logicalPlan.addStream("Stream2", passThru1.output, console.inport);
        logicalPlan.addStream("Stream3", passThru2.output, console1.inport);

        // The result lists are static; start from a clean slate. Both receivers share one list.
        TestPersistanceOperator.results.clear();
        TestRecieverOperator.results.clear();

        final StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        new Thread("LocalClusterController") {
            @Override
            public void run() {
                final long startMillis = System.currentTimeMillis();
                try {
                    // Poll until enough tuples reached the receivers or the timeout expires.
                    while (System.currentTimeMillis() - startMillis < 10000
                            && TestRecieverOperator.results.size() < 6) {
                        Thread.sleep(10L);
                    }
                } catch (Exception e) {
                    DTThrowable.rethrow(e);
                } finally {
                    // Shut down exactly once, after the wait is over (not after every poll).
                    stramLocalCluster.shutdown();
                }
            }
        }.start();
        stramLocalCluster.run();
        try {
            Integer[] expected = {0, 2, 3, 4, 6, 8, 9, 10, 12};
            // Fail with a readable message instead of an IndexOutOfBoundsException below.
            Assert.assertTrue("Persist operator stored fewer tuples than expected",
                    TestPersistanceOperator.results.size() >= expected.length);
            for (int i = 0; i < expected.length; i++) {
                logger.debug("{} {}", TestPersistanceOperator.results.get(i), expected[i]);
                Assert.assertEquals("Mismatch observed for tuple ", expected[i], TestPersistanceOperator.results.get(i));
            }
        } finally {
            // Do not leak static state into other tests.
            TestPersistanceOperator.results.clear();
            TestRecieverOperator.results.clear();
        }
    }

    /**
     * Runs a DAG in which "Stream1" feeds a {@code PartitionedTestOperatorWithFiltering} sink whose
     * input port uses a {@code TestPartitionCodec}, persists that stream through a
     * {@code TestPersistanceOperator}, and checks the first tuples the persist operator recorded.
     *
     * <p>The expected sequence is {@code 0, 1, 4, 5, 8, 9, 12, 13, 16}; which tuples each physical
     * partition of the sink accepts is decided by the sink and codec defined elsewhere in this file.
     *
     * <p>A controller thread polls the static result lists and shuts the local cluster down once both
     * the receiver and the persist operator hold at least 6 tuples, or after 100 seconds.
     *
     * @throws ClassNotFoundException declared for the local cluster setup
     * @throws IOException declared for the local cluster setup
     */
    @Test
    public void testPersistStreamOperatorMultiplePhysicalOperatorsForSink() throws ClassNotFoundException, IOException {
        LogicalPlan logicalPlan = new LogicalPlan();
        AscendingNumbersOperator ascend = logicalPlan.addOperator("ascend", new AscendingNumbersOperator());
        PartitionedTestOperatorWithFiltering partitioned = logicalPlan.addOperator("partition", new PartitionedTestOperatorWithFiltering());
        TestRecieverOperator console = logicalPlan.addOperator("console", new TestRecieverOperator());
        TestPersistanceOperator persister = new TestPersistanceOperator();
        LogicalPlan.StreamMeta stream1 = logicalPlan.addStream("Stream1", ascend.outputPort, partitioned.input);
        logicalPlan.setInputPortAttribute(partitioned.input, Context.PortContext.STREAM_CODEC, new TestPartitionCodec());
        stream1.persistUsing("persister", persister, persister.inport);
        logicalPlan.addStream("Stream2", partitioned.output, console.inport);

        final StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        new Thread("LocalClusterController") {
            @Override
            public void run() {
                final long startMillis = System.currentTimeMillis();
                try {
                    // Poll until both result lists are filled far enough or the timeout expires.
                    while (System.currentTimeMillis() - startMillis < 100000
                            && (TestRecieverOperator.results.size() < 6
                                || TestPersistanceOperator.results.size() < 6)) {
                        Thread.sleep(10L);
                    }
                } catch (Exception e) {
                    DTThrowable.rethrow(e);
                } finally {
                    // Shut down exactly once, after the wait is over (not after every poll).
                    stramLocalCluster.shutdown();
                }
            }
        }.start();
        stramLocalCluster.run();
        try {
            Integer[] expected = {0, 1, 4, 5, 8, 9, 12, 13, 16};
            // Fail with a readable message instead of an IndexOutOfBoundsException below.
            Assert.assertTrue("Persist operator stored fewer tuples than expected",
                    TestPersistanceOperator.results.size() >= expected.length);
            for (int i = 0; i < expected.length; i++) {
                logger.debug("{} {}", TestPersistanceOperator.results.get(i), expected[i]);
                Assert.assertEquals("Mismatch observed for tuple ", expected[i], TestPersistanceOperator.results.get(i));
            }
        } finally {
            // Do not leak static state into other tests.
            TestPersistanceOperator.results.clear();
            TestRecieverOperator.results.clear();
        }
    }

    /**
     * Runs a DAG in which "Stream1" feeds a {@code PartitionedTestOperatorWithFiltering} sink and is
     * persisted through a {@code PartitionedTestPersistanceOperator}; both the sink's input port and
     * the persist operator's input port are configured with a {@code TestPartitionCodec}.
     *
     * <p>The test checks that the first tuples recorded by the persist operator are
     * {@code 0, 4, 8, 12, 16, 20}.
     *
     * <p>A controller thread polls the static persist results and shuts the local cluster down once
     * at least 6 tuples were stored, or after 100 seconds.
     *
     * @throws ClassNotFoundException declared for the local cluster setup
     * @throws IOException declared for the local cluster setup
     */
    @Test
    public void testPartitionedPersistOperator() throws ClassNotFoundException, IOException {
        LogicalPlan logicalPlan = new LogicalPlan();
        AscendingNumbersOperator ascend = logicalPlan.addOperator("ascend", new AscendingNumbersOperator());
        PartitionedTestOperatorWithFiltering partitioned = logicalPlan.addOperator("partition", new PartitionedTestOperatorWithFiltering());
        TestRecieverOperator console = logicalPlan.addOperator("console", new TestRecieverOperator());
        PartitionedTestPersistanceOperator persister = new PartitionedTestPersistanceOperator();
        LogicalPlan.StreamMeta stream1 = logicalPlan.addStream("Stream1", ascend.outputPort, partitioned.input);
        logicalPlan.setInputPortAttribute(partitioned.input, Context.PortContext.STREAM_CODEC, new TestPartitionCodec());
        stream1.persistUsing("persister", persister, persister.inport);
        // The codec on the persist operator's own port is set after the operator was attached.
        logicalPlan.setInputPortAttribute(persister.inport, Context.PortContext.STREAM_CODEC, new TestPartitionCodec());
        logicalPlan.addStream("Stream2", partitioned.output, console.inport);

        final StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        new Thread("LocalClusterController") {
            @Override
            public void run() {
                final long startMillis = System.currentTimeMillis();
                try {
                    // Poll until enough tuples were persisted or the timeout expires.
                    while (System.currentTimeMillis() - startMillis < 100000
                            && PartitionedTestPersistanceOperator.results.size() < 6) {
                        Thread.sleep(10L);
                    }
                } catch (Exception e) {
                    DTThrowable.rethrow(e);
                } finally {
                    // Shut down exactly once, after the wait is over (not after every poll).
                    stramLocalCluster.shutdown();
                }
            }
        }.start();
        stramLocalCluster.run();
        try {
            Integer[] expected = {0, 4, 8, 12, 16, 20};
            // Fail with a readable message instead of an IndexOutOfBoundsException below.
            Assert.assertTrue("Persist operator stored fewer tuples than expected",
                    PartitionedTestPersistanceOperator.results.size() >= expected.length);
            for (int i = 0; i < expected.length; i++) {
                logger.debug("{} {}", PartitionedTestPersistanceOperator.results.get(i), expected[i]);
                Assert.assertEquals("Mismatch observed for tuple ", expected[i], PartitionedTestPersistanceOperator.results.get(i));
            }
        } finally {
            // Do not leak static state into other tests.
            PartitionedTestPersistanceOperator.results.clear();
            TestRecieverOperator.results.clear();
        }
    }

    /**
     * Checks, at the physical-plan level (no cluster is started), that the persist operator's stream
     * codec wrapper follows a repartitioning of the sink it shadows.
     *
     * <p>Steps:
     * <ol>
     *   <li>Build a plan where "console" is partitioned with {@code StatelessPartitioner(2)}, watched
     *       by a {@code PartitionLoadWatch} stats listener, and "Stream1" is persisted.</li>
     *   <li>Expect 4 containers, assign them, and mark every physical operator ACTIVE.</li>
     *   <li>Assert the "persister" physical operator is among the dependents of the console
     *       partitions, and that the persist port's codec is a
     *       {@code StreamCodecWrapperForPersistance} tracking 2 partitions.</li>
     *   <li>Report a load of -1 for every console partition (presumably a low-load signal that makes
     *       the partitions merge; confirm against {@code PartitionLoadWatch}), process the resulting
     *       events, and assert the wrapper now tracks a single partition for a single input port.</li>
     * </ol>
     *
     * @throws ClassNotFoundException declared for the plan/manager setup
     * @throws IOException declared for the plan/manager setup
     */
    @Test
    public void testDynamicPartitioning() throws ClassNotFoundException, IOException {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.DAGContext.APPLICATION_PATH, this.testMeta.dir);
        AscendingNumbersOperator ascend = logicalPlan.addOperator("ascend", new AscendingNumbersOperator());
        TestRecieverOperator console = logicalPlan.addOperator("console", new TestRecieverOperator());
        logicalPlan.setAttribute(console, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        logicalPlan.setAttribute(console, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
        PartitionedTestPersistanceOperator persister = new PartitionedTestPersistanceOperator();
        LogicalPlan.StreamMeta stream1 = logicalPlan.addStream("Stream1", ascend.outputPort, console.inport);
        logicalPlan.setInputPortAttribute(console.inport, Context.PortContext.STREAM_CODEC, new TestPartitionCodec());
        stream1.persistUsing("persister", persister, persister.inport);
        logicalPlan.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        StreamingContainerManager containerManager = new StreamingContainerManager(logicalPlan);
        PhysicalPlan physicalPlan = containerManager.getPhysicalPlan();
        List<PTContainer> containers = physicalPlan.getContainers();
        Assert.assertEquals("number containers", 4L, containers.size());
        for (int i = 0; i < containers.size(); i++) {
            StreamingContainerManagerTest.assignContainer(containerManager, "container" + (i + 1));
        }

        List<PTOperator> consolePartitions = physicalPlan.getOperators(logicalPlan.getMeta(console));

        // Activate every deployed operator and remember the physical persist operator on the way.
        PTOperator persistOperator = null;
        for (PTContainer container : physicalPlan.getContainers()) {
            for (PTOperator operator : container.getOperators()) {
                operator.setState(PTOperator.State.ACTIVE);
                if (operator.getName().equals("persister")) {
                    persistOperator = operator;
                }
            }
        }

        Set<PTOperator> dependents = physicalPlan.getDependents(consolePartitions);
        logger.debug("Operators to be re-deployed = {}", dependents);
        Assert.assertTrue("persist operator should be part of the operators to be redeployed", dependents.contains(persistOperator));

        StreamCodec<?> codec = (StreamCodec<?>) stream1.getPersistOperatorInputPort().getValue(Context.PortContext.STREAM_CODEC);
        Assert.assertTrue("Codec should be instance of StreamCodecWrapper", codec instanceof StreamCodecWrapperForPersistance);
        StreamCodecWrapperForPersistance codecWrapper = (StreamCodecWrapperForPersistance) codec;

        // Before repartitioning: the single tracked input port maps to both console partitions.
        Map.Entry<?, ?> entryBefore = (Map.Entry<?, ?>) codecWrapper.inputPortToPartitionMap.entrySet().iterator().next();
        logger.debug(entryBefore.toString());
        Assert.assertEquals("Size of partitions should be 2", 2L, ((Collection<?>) entryBefore.getValue()).size());

        // Feed a load of -1 for each partition to the stats listener and let the plan react.
        for (PTOperator partition : consolePartitions) {
            PartitioningTest.PartitionLoadWatch.put(partition, -1);
            physicalPlan.onStatusUpdate(partition);
        }
        containerManager.processEvents();

        // After repartitioning: still one input port, now mapped to a single partition.
        Assert.assertEquals("Input port map", 1L, codecWrapper.inputPortToPartitionMap.size());
        Map.Entry<?, ?> entryAfter = (Map.Entry<?, ?>) codecWrapper.inputPortToPartitionMap.entrySet().iterator().next();
        Assert.assertEquals("Size of partitions should be 1 after repartition", 1L, ((Collection<?>) entryAfter.getValue()).size());
        logger.debug(entryAfter.toString());
    }
}
