package com.datatorrent.stram.plan.logical;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.validation.ValidationException;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:57)
    */
/* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest.class */
public class DelayOperatorTest {

    /** JUnit rule instance; several tests pass it to {@code StramTestSupport.createDAG}. */
    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
    /**
     * Class-wide lock, acquired in {@link #setup()} and released in {@link #teardown()}.
     * NOTE(review): presumably this serializes the tests because the operators below keep
     * their results and failure flags in static fields - confirm.
     */
    private static Lock sequential = new ReentrantLock();
    /** The first 40 Fibonacci numbers (starting 1, 1, 2, ...), used as the expected operator output. */
    public static final Long[] FIBONACCI_NUMBERS = {1L, 1L, 2L, 3L, 5L, 8L, 13L, 21L, 34L, 55L, 89L, 144L, 233L, 377L, 610L, 987L, 1597L, 2584L, 4181L, 6765L, 10946L, 17711L, 28657L, 46368L, 75025L, 121393L, 196418L, 317811L, 514229L, 832040L, 1346269L, 2178309L, 3524578L, 5702887L, 9227465L, 14930352L, 24157817L, 39088169L, 63245986L, 102334155L};
    /** Logger shared by the test class and its nested operators. */
    private static final Logger LOG = LoggerFactory.getLogger(DelayOperatorTest.class);

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest$FailableDelayOperator.class */
    public static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener {
        private boolean committed = false;
        private int simulateFailureWindows = 0;
        private boolean simulateFailureAfterCommit = false;
        private int windowCount = 0;
        private static volatile boolean failureSimulated = false;

        public void beginWindow(long j) {
            super.beginWindow(j);
            if (this.simulateFailureWindows > 0) {
                if ((!(this.simulateFailureAfterCommit && this.committed) && this.simulateFailureAfterCommit) || failureSimulated) {
                    return;
                }
                DelayOperatorTest.LOG.debug("FailableDelayOperator beginWindow {} {} {}", new Object[]{Long.valueOf(j), Integer.valueOf(this.windowCount), Integer.valueOf(this.simulateFailureWindows)});
                int i = this.windowCount;
                this.windowCount = i + 1;
                if (i == this.simulateFailureWindows) {
                    failureSimulated = true;
                    DelayOperatorTest.LOG.debug("FailableDelayOperator is simulating failure {}", Long.valueOf(j));
                    throw new RuntimeException("simulating failure");
                }
            }
        }

        public void checkpointed(long j) {
            DelayOperatorTest.LOG.debug("FailableDelayOperator is checkpointed {}", Long.valueOf(j));
        }

        public void committed(long j) {
            DelayOperatorTest.LOG.debug("FailableDelayOperator is committed {}", Long.valueOf(j));
            this.committed = true;
        }

        public void setSimulateFailureWindows(int i, boolean z) {
            this.simulateFailureAfterCommit = z;
            this.simulateFailureWindows = i;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest$FailableFibonacciOperator.class */
    public static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener {
        private boolean committed = false;
        private int simulateFailureWindows = 0;
        private boolean simulateFailureAfterCommit = false;
        private int windowCount = 0;
        public static volatile boolean failureSimulated = false;

        public void beginWindow(long j) {
            if (this.simulateFailureWindows > 0) {
                if ((!(this.simulateFailureAfterCommit && this.committed) && this.simulateFailureAfterCommit) || failureSimulated) {
                    return;
                }
                DelayOperatorTest.LOG.debug("FailableFibonacciOperator beginWindow {} {} {}", new Object[]{Long.valueOf(j), Integer.valueOf(this.windowCount), Integer.valueOf(this.simulateFailureWindows)});
                int i = this.windowCount;
                this.windowCount = i + 1;
                if (i == this.simulateFailureWindows) {
                    failureSimulated = true;
                    DelayOperatorTest.LOG.debug("FailableFibonacciOperator is simulating failure");
                    throw new RuntimeException("simulating failure");
                }
            }
        }

        public void checkpointed(long j) {
            DelayOperatorTest.LOG.debug("FailableFibonacciOperator is checkpointed {}", Long.valueOf(j));
        }

        public void committed(long j) {
            DelayOperatorTest.LOG.debug("FailableFibonacciOperator is committed {}", Long.valueOf(j));
            this.committed = true;
        }

        public void setSimulateFailureWindows(int i, boolean z) {
            this.simulateFailureAfterCommit = z;
            this.simulateFailureWindows = i;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest$FibonacciOperator.class */
    public static class FibonacciOperator extends BaseOperator {
        public static List<Long> results = new ArrayList();
        private transient long tempNum;
        public long currentNumber = 1;
        public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.1
            public void process(Object obj) {
            }
        };
        public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>() { // from class: com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.2
            /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
                jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.access$002(com.datatorrent.stram.plan.logical.DelayOperatorTest$FibonacciOperator, long):long
                	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
                	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
                Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: com.datatorrent.stram.plan.logical.DelayOperatorTest
                	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
                	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
                	... 1 more
                */
            public void process(java.lang.Long r5) {
                /*
                    r4 = this;
                    r0 = r4
                    com.datatorrent.stram.plan.logical.DelayOperatorTest$FibonacciOperator r0 = com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.this
                    r1 = r5
                    long r1 = r1.longValue()
                    long r0 = com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.access$002(r0, r1)
                    return
                */
                throw new UnsupportedOperationException("Method not decompiled: com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.AnonymousClass2.process(java.lang.Long):void");
            }
        };
        public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();

        public FibonacciOperator() {
        }

        public void endWindow() {
            this.output.emit(Long.valueOf(this.currentNumber));
            results.add(Long.valueOf(this.currentNumber));
            this.currentNumber += this.tempNum;
            if (this.currentNumber <= 0) {
                this.currentNumber = 1L;
            }
        }

        /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.access$002(com.datatorrent.stram.plan.logical.DelayOperatorTest$FibonacciOperator, long):long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        static /* synthetic */ long access$002(com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator r6, long r7) {
            /*
                r0 = r6
                r1 = r7
                // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                r0.tempNum = r1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.access$002(com.datatorrent.stram.plan.logical.DelayOperatorTest$FibonacciOperator, long):long");
        }

        static {
        }
    }

    /** Explicit no-argument constructor with an empty body. */
    public DelayOperatorTest() {
    }

    /** Acquires the class-wide {@code sequential} lock before each test; released in {@link #teardown()}. */
    @Before
    public void setup() {
        sequential.lock();
    }

    /** Releases the class-wide {@code sequential} lock that {@link #setup()} acquired. */
    @After
    public void teardown() {
        sequential.unlock();
    }

    /**
     * Builds three plans containing a delay operator and checks that each one is rejected.
     *
     * <ol>
     *   <li>The delay output is connected to D (stream "DelayToD") instead of back to C.</li>
     *   <li>The delay is looped back to C but has {@code APPLICATION_WINDOW_COUNT} set to 2.</li>
     *   <li>Both streams of the C/delay loop are marked {@code THREAD_LOCAL}.</li>
     * </ol>
     *
     * <p>For the first two plans {@code findInvalidDelays} must report exactly one entry; for
     * all three, {@code validate()} must throw {@link ValidationException}.
     */
    @Test
    public void testInvalidDelayDetection() {
        // Scenario 1: delay output goes to D.
        LogicalPlan dag = new LogicalPlan();
        GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
        GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
        GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
        dag.addStream("BtoC", opB.outport1, opC.inport1);
        dag.addStream("CtoD", opC.outport1, opD.inport1);
        dag.addStream("CtoDelay", opC.outport2, opDelay.input);
        dag.addStream("DelayToD", opDelay.output, opD.inport2);
        // Keep a reference to the result list so that its size can be asserted.
        List<List<String>> invalidDelays = new ArrayList<List<String>>();
        dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<LogicalPlan.OperatorMeta>());
        Assert.assertEquals("operator invalid delay", 1L, invalidDelays.size());
        try {
            dag.validate();
            Assert.fail("validation should fail");
        } catch (ValidationException expected) {
            // expected: the plan is invalid
        }

        // Scenario 2: delay loops back to C, with APPLICATION_WINDOW_COUNT = 2 on the delay.
        dag = new LogicalPlan();
        opB = dag.addOperator("B", GenericTestOperator.class);
        opC = dag.addOperator("C", GenericTestOperator.class);
        opD = dag.addOperator("D", GenericTestOperator.class);
        opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
        dag.setOperatorAttribute(opDelay, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
        dag.addStream("BtoC", opB.outport1, opC.inport1);
        dag.addStream("CtoD", opC.outport1, opD.inport1);
        dag.addStream("CtoDelay", opC.outport2, opDelay.input);
        dag.addStream("DelayToC", opDelay.output, opC.inport2);
        invalidDelays = new ArrayList<List<String>>();
        dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<LogicalPlan.OperatorMeta>());
        Assert.assertEquals("operator invalid delay", 1L, invalidDelays.size());
        try {
            dag.validate();
            Assert.fail("validation should fail");
        } catch (ValidationException expected) {
            // expected: the plan is invalid
        }

        // Scenario 3: both loop streams are THREAD_LOCAL.
        dag = new LogicalPlan();
        opB = dag.addOperator("B", GenericTestOperator.class);
        opC = dag.addOperator("C", GenericTestOperator.class);
        opD = dag.addOperator("D", GenericTestOperator.class);
        opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
        dag.addStream("BtoC", opB.outport1, opC.inport1);
        dag.addStream("CtoD", opC.outport1, opD.inport1);
        dag.addStream("CtoDelay", opC.outport2, opDelay.input).setLocality(DAG.Locality.THREAD_LOCAL);
        dag.addStream("DelayToC", opDelay.output, opC.inport2).setLocality(DAG.Locality.THREAD_LOCAL);
        try {
            dag.validate();
            Assert.fail("validation should fail");
        } catch (ValidationException expected) {
            // expected: the plan is invalid
        }
    }

    /**
     * Builds the plan A -> B -> C -> D with a delay operator looping C's second output back
     * into B's second input, and checks that {@code validate()} accepts it (i.e. does not throw).
     */
    @Test
    public void testValidDelay() {
        LogicalPlan dag = new LogicalPlan();

        TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
        GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
        GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
        GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);

        // Forward path.
        dag.addStream("AtoB", opA.outport, opB.inport1);
        dag.addStream("BtoC", opB.outport1, opC.inport1);
        dag.addStream("CtoD", opC.outport1, opD.inport1);
        // Loop through the delay operator.
        dag.addStream("CtoDelay", opC.outport2, opDelay.input);
        dag.addStream("DelayToB", opDelay.output, opB.inport2);

        dag.validate();
    }

    /**
     * Runs a {@link FibonacciOperator} with its output looped back through a delay operator in
     * a local cluster and checks that the first ten recorded results equal the first ten
     * entries of {@link #FIBONACCI_NUMBERS}.
     *
     * <p>The cluster is given an exit condition of at least 10 results and is run with an
     * argument of 10000.
     *
     * @throws Exception if the local cluster cannot be created
     */
    @Test
    public void testFibonacci() throws Exception {
        final int expectedCount = 10;

        LogicalPlan dag = new LogicalPlan();
        TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
        FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
        DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);

        dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
        dag.addStream("operator_to_delay", fib.output, opDelay.input);
        dag.addStream("delay_to_operator", opDelay.output, fib.input);

        FibonacciOperator.results.clear();

        StramLocalCluster localCluster = new StramLocalCluster(dag);
        localCluster.setExitCondition(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return FibonacciOperator.results.size() >= expectedCount;
            }
        });
        localCluster.run(10000L);

        Long[] expected = Arrays.copyOfRange(FIBONACCI_NUMBERS, 0, expectedCount);
        Object[] actual = FibonacciOperator.results.subList(0, expectedCount).toArray();
        Assert.assertArrayEquals(expected, actual);
    }

    /**
     * Runs the Fibonacci loop with a {@link FailableFibonacciOperator} configured via
     * {@code setSimulateFailureWindows(3, true)} and checks the results after the simulated
     * failure.
     *
     * <p>The run ends once at least 30 results were recorded (or the run argument of 60000 is
     * exhausted). The test then asserts that the failure was actually simulated and that the
     * 20 smallest distinct recorded values equal the 20 smallest distinct Fibonacci numbers;
     * both sides go through a {@link TreeSet}, so ordering and duplicates in the raw results
     * do not matter.
     *
     * @throws Exception if the local cluster cannot be created
     */
    @Test
    public void testFibonacciRecovery1() throws Exception {
        LogicalPlan dag = StramTestSupport.createDAG(this.testMeta);

        TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
        FailableFibonacciOperator fib = dag.addOperator("FIB", FailableFibonacciOperator.class);
        DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
        fib.setSimulateFailureWindows(3, true);

        dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
        dag.addStream("operator_to_delay", fib.output, opDelay.input);
        dag.addStream("delay_to_operator", opDelay.output, fib.input);

        dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);

        // Reset the static state shared between test runs.
        FailableFibonacciOperator.results.clear();
        FailableFibonacciOperator.failureSimulated = false;

        StramLocalCluster localCluster = new StramLocalCluster(dag);
        localCluster.setPerContainerBufferServer(true);
        localCluster.setExitCondition(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return FailableFibonacciOperator.results.size() >= 30;
            }
        });
        localCluster.run(60000L);

        Assert.assertTrue("failure should be invoked", FailableFibonacciOperator.failureSimulated);

        Object[] expected = Arrays.copyOfRange(
                new TreeSet<Long>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20);
        Object[] actual = Arrays.copyOfRange(
                new TreeSet<Long>(FibonacciOperator.results).toArray(), 0, 20);
        Assert.assertArrayEquals(expected, actual);
    }

    /**
     * Runs the Fibonacci loop with a {@link FailableDelayOperator} configured via
     * {@code setSimulateFailureWindows(5, true)} and checks the results after the simulated
     * failure of the delay operator.
     *
     * <p>The run ends once at least 30 results were recorded (or the run argument of 60000 is
     * exhausted). The test then asserts that the failure was actually simulated and that the
     * 20 smallest distinct recorded values equal the 20 smallest distinct Fibonacci numbers;
     * both sides go through a {@link TreeSet}, so ordering and duplicates in the raw results
     * do not matter.
     *
     * @throws Exception if the local cluster cannot be created
     */
    @Test
    public void testFibonacciRecovery2() throws Exception {
        LogicalPlan dag = StramTestSupport.createDAG(this.testMeta);

        TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
        FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
        FailableDelayOperator opDelay = dag.addOperator("opDelay", FailableDelayOperator.class);
        opDelay.setSimulateFailureWindows(5, true);

        dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
        dag.addStream("operator_to_delay", fib.output, opDelay.input);
        dag.addStream("delay_to_operator", opDelay.output, fib.input);

        dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);

        // Reset the static state shared between test runs.
        FibonacciOperator.results.clear();
        FailableDelayOperator.failureSimulated = false;

        StramLocalCluster localCluster = new StramLocalCluster(dag);
        localCluster.setPerContainerBufferServer(true);
        localCluster.setExitCondition(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return FibonacciOperator.results.size() >= 30;
            }
        });
        localCluster.run(60000L);

        Assert.assertTrue("failure should be invoked", FailableDelayOperator.failureSimulated);

        Object[] expected = Arrays.copyOfRange(
                new TreeSet<Long>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20);
        Object[] actual = Arrays.copyOfRange(
                new TreeSet<Long>(FibonacciOperator.results).toArray(), 0, 20);
        Assert.assertArrayEquals(expected, actual);
    }

    /**
     * Checks how {@code StreamingContainerManager.updateRecoveryCheckpoints} assigns recovery
     * checkpoints when B, C and the delay operator are registered as one checkpoint group.
     *
     * <p>Plan: A -> B -> C -> D, with C's second output looped back to B through the delay.
     * Checkpoints are seeded as B: {3}, C: {3, 4}, delay: {3, 5}, D: {5}. After the update the
     * test expects A to stay at the initial checkpoint, B and C to be at checkpoint 3, and D to
     * be at checkpoint 5.
     */
    @Test
    public void testCheckpointUpdate() {
        LogicalPlan dag = StramTestSupport.createDAG(this.testMeta);

        TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
        GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
        GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
        GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator opDelay = dag.addOperator("opDelay", new DefaultDelayOperator());

        dag.addStream("AtoB", opA.outport, opB.inport1);
        dag.addStream("BtoC", opB.outport1, opC.inport1);
        dag.addStream("CtoD", opC.outport1, opD.inport1);
        dag.addStream("CtoDelay", opC.outport2, opDelay.input);
        dag.addStream("DelayToB", opDelay.output, opB.inport2);
        dag.validate();

        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        StreamingContainerManager scm = new StreamingContainerManager(dag);
        PhysicalPlan plan = scm.getPhysicalPlan();

        // Mark every physical operator as ACTIVE before updating checkpoints.
        for (PTOperator oper : plan.getAllOperators().values()) {
            oper.setState(PTOperator.State.ACTIVE);
        }

        SystemClock clock = new SystemClock();

        PTOperator opA1 = plan.getOperators(dag.getMeta(opA)).get(0);
        PTOperator opB1 = plan.getOperators(dag.getMeta(opB)).get(0);
        PTOperator opC1 = plan.getOperators(dag.getMeta(opC)).get(0);
        PTOperator opDelay1 = plan.getOperators(dag.getMeta(opDelay)).get(0);
        PTOperator opD1 = plan.getOperators(dag.getMeta(opD)).get(0);

        Checkpoint cp3 = new Checkpoint(3L, 0, 0);
        Checkpoint cp5 = new Checkpoint(5L, 0, 0);
        Checkpoint cp4 = new Checkpoint(4L, 0, 0);

        opB1.checkpoints.add(cp3);
        opC1.checkpoints.add(cp3);
        opC1.checkpoints.add(cp4);
        opDelay1.checkpoints.add(cp3);
        opDelay1.checkpoints.add(cp5);
        opD1.checkpoints.add(cp5);

        // Every member of the loop maps to the same group instance.
        Set<LogicalPlan.OperatorMeta> checkpointGroup =
                Sets.newHashSet(dag.getMeta(opB), dag.getMeta(opC), dag.getMeta(opDelay));
        Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> checkpointGroups =
                new HashMap<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>>();
        for (LogicalPlan.OperatorMeta om : checkpointGroup) {
            checkpointGroups.put(om, checkpointGroup);
        }

        scm.updateRecoveryCheckpoints(opB1,
                new StreamingContainerManager.UpdateCheckpointsContext(clock, false, checkpointGroups), false);

        Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint());
        // Previously this assertion was labelled with B but inspected C, leaving B unchecked.
        Assert.assertEquals("checkpoint " + opB1, cp3, opB1.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + opC1, cp3, opC1.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + opD1, cp5, opD1.getRecoveryCheckpoint());
    }

    /**
     * Builds a plan in which Op1 feeds Op2 over two parallel streams and Op2's output is looped
     * back to Op1 through a delay operator, then checks that {@code validate()} accepts the
     * plan (i.e. does not throw).
     */
    @Test
    public void testValidationWithMultipleStreamLoops() {
        LogicalPlan dag = StramTestSupport.createDAG(this.testMeta);

        TestGeneratorInputOperator source = dag.addOperator("A", TestGeneratorInputOperator.class);
        GenericTestOperator op1 = dag.addOperator("Op1", GenericTestOperator.class);
        GenericTestOperator op2 = dag.addOperator("Op2", GenericTestOperator.class);
        DefaultDelayOperator delay = dag.addOperator("Delay", DefaultDelayOperator.class);

        dag.addStream("Source", source.outport, op1.inport1);
        // Two parallel streams between the same pair of operators.
        dag.addStream("Stream1", op1.outport1, op2.inport1);
        dag.addStream("Stream2", op1.outport2, op2.inport2);
        // Loop back through the delay operator.
        dag.addStream("Op to Delay", op2.outport1, delay.input);
        dag.addStream("Delay to Op", delay.output, op1.inport2);

        dag.validate();
    }

    // Empty static initializer: it contains no statements.
    static {
    }
}
