package com.datatorrent.stram.plan.logical;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Sets;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.validation.ValidationException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest.class */
public class DelayOperatorTest {

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
    private static final Lock sequential = new ReentrantLock();
    private static final Logger LOG = LoggerFactory.getLogger(DelayOperatorTest.class);

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest$ExitCondition.class */
    private static class ExitCondition implements Callable<Boolean> {
        private static boolean failed;
        private static String message;
        private final int size;
        private final Callable<Boolean> exitCondition;

        ExitCondition(int i, Callable<Boolean> callable) {
            FailableFibonacciOperator.results.clear();
            failed = false;
            this.size = i;
            this.exitCondition = callable;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            return Boolean.valueOf(failed || (FailableFibonacciOperator.results.size() >= this.size && (this.exitCondition == null || this.exitCondition.call().booleanValue())));
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest$FailableDelayOperator.class */
    static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener {
        private FailableOperator failableOperator;

        FailableDelayOperator() {
        }

        public void beginWindow(long j) {
            super.beginWindow(j);
            this.failableOperator.beginWindow(j);
        }

        public void checkpointed(long j) {
            this.failableOperator.checkpointed(j);
        }

        public void committed(long j) {
            this.failableOperator.committed(j);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest$FailableFibonacciOperator.class */
    static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener {
        private FailableOperator failableOperator;

        FailableFibonacciOperator() {
        }

        public void beginWindow(long j) {
            this.failableOperator.beginWindow(j);
        }

        public void checkpointed(long j) {
            this.failableOperator.checkpointed(j);
        }

        public void committed(long j) {
            this.failableOperator.committed(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest$FailableOperator.class */
    public static class FailableOperator {
        private final String baseOperator;
        private boolean committed;
        private final int simulateFailureWindows;
        private final boolean simulateFailureAfterCommit;
        private int windowCount;
        private static volatile boolean failureSimulated;
        private static final Callable<Boolean> isFailureSimulated = new Callable<Boolean>() { // from class: com.datatorrent.stram.plan.logical.DelayOperatorTest.FailableOperator.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(FailableOperator.failureSimulated);
            }
        };

        private FailableOperator() {
            this.committed = false;
            this.windowCount = 0;
            this.baseOperator = null;
            this.simulateFailureWindows = 0;
            this.simulateFailureAfterCommit = false;
        }

        FailableOperator(BaseOperator baseOperator, int i, boolean z) {
            this.committed = false;
            this.windowCount = 0;
            failureSimulated = false;
            this.baseOperator = baseOperator.getClass().getSimpleName();
            this.simulateFailureWindows = i;
            this.simulateFailureAfterCommit = z;
        }

        void beginWindow(long j) {
            if (this.simulateFailureWindows <= 0 || failureSimulated) {
                return;
            }
            if (this.simulateFailureAfterCommit && !this.committed) {
                if (((int) j) > 16) {
                    DelayOperatorTest.LOG.warn("{} window {} is not committed", this.baseOperator, Codec.getStringWindowId(j));
                    return;
                }
                return;
            }
            DelayOperatorTest.LOG.debug("{} beginWindow {} {} {}", new Object[]{this.baseOperator, Codec.getStringWindowId(j), Integer.valueOf(this.windowCount), Integer.valueOf(this.simulateFailureWindows)});
            int i = this.windowCount;
            this.windowCount = i + 1;
            if (i == this.simulateFailureWindows) {
                failureSimulated = true;
                DelayOperatorTest.LOG.debug("{} is simulating failure", this.baseOperator);
                throw new RuntimeException("simulating " + this.baseOperator + " failure");
            }
        }

        void checkpointed(long j) {
            DelayOperatorTest.LOG.debug("{} checkpointed at {}", this.baseOperator, Codec.getStringWindowId(j));
        }

        void committed(long j) {
            DelayOperatorTest.LOG.debug("{} committed at {}", this.baseOperator, Codec.getStringWindowId(j));
            this.committed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datatorrent/stram/plan/logical/DelayOperatorTest$FibonacciOperator.class */
    public static class FibonacciOperator extends BaseOperator {
        static final List<BigInteger> results = new ArrayList();
        private int index = 0;
        private BigInteger currentNumber = BigInteger.ONE;
        private transient BigInteger tempNum = BigInteger.ZERO;
        final transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.1
            public void process(Object obj) {
            }
        };
        final transient DefaultInputPort<BigInteger> input = new DefaultInputPort<BigInteger>() { // from class: com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.2
            public void process(BigInteger bigInteger) {
                FibonacciOperator.this.tempNum = bigInteger;
            }
        };
        final transient DefaultOutputPort<BigInteger> output = new DefaultOutputPort<>();

        FibonacciOperator() {
        }

        public void endWindow() {
            if (ExitCondition.failed) {
                return;
            }
            if (this.index > results.size()) {
                boolean unused = ExitCondition.failed = true;
                String unused2 = ExitCondition.message = "index " + this.index + " > result.size() " + results.size();
                return;
            }
            this.output.emit(this.currentNumber);
            if (this.index == results.size()) {
                results.add(this.currentNumber);
            } else if (!results.get(this.index).equals(this.currentNumber)) {
                boolean unused3 = ExitCondition.failed = true;
                String unused4 = ExitCondition.message = "current number " + this.currentNumber + " does not match result " + results.get(this.index) + " at position " + this.index;
                return;
            }
            this.index++;
            this.currentNumber = this.currentNumber.add(this.tempNum);
        }

        static void assertFibonacci() {
            for (int i = 2; i < results.size(); i++) {
                if (!results.get(i).equals(results.get(i - 1).add(results.get(i - 2)))) {
                    Assert.fail("Not a Fibonacci number " + results.get(i) + " [" + StringUtils.join(results, ",") + "]");
                }
            }
        }
    }

    @Before
    public void setup() {
        sequential.lock();
    }

    @After
    public void teardown() {
        sequential.unlock();
    }

    @Test
    public void testInvalidDelayDetection() {
        LogicalPlan logicalPlan = new LogicalPlan();
        GenericTestOperator addOperator = logicalPlan.addOperator("B", GenericTestOperator.class);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("C", GenericTestOperator.class);
        GenericTestOperator addOperator3 = logicalPlan.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator addOperator4 = logicalPlan.addOperator("opDelay", DefaultDelayOperator.class);
        logicalPlan.addStream("BtoC", addOperator.outport1, addOperator2.inport1);
        logicalPlan.addStream("CtoD", addOperator2.outport1, addOperator3.inport1);
        logicalPlan.addStream("CtoDelay", addOperator2.outport2, addOperator4.input);
        logicalPlan.addStream("DelayToD", addOperator4.output, addOperator3.inport2);
        logicalPlan.findInvalidDelays(logicalPlan.getMeta(addOperator), new ArrayList(), new Stack());
        Assert.assertEquals("operator invalid delay", 1L, r0.size());
        try {
            logicalPlan.validate();
            Assert.fail("validation should fail");
        } catch (ValidationException e) {
        }
        LogicalPlan logicalPlan2 = new LogicalPlan();
        GenericTestOperator addOperator5 = logicalPlan2.addOperator("B", GenericTestOperator.class);
        GenericTestOperator addOperator6 = logicalPlan2.addOperator("C", GenericTestOperator.class);
        GenericTestOperator addOperator7 = logicalPlan2.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator addOperator8 = logicalPlan2.addOperator("opDelay", DefaultDelayOperator.class);
        logicalPlan2.setOperatorAttribute(addOperator8, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
        logicalPlan2.addStream("BtoC", addOperator5.outport1, addOperator6.inport1);
        logicalPlan2.addStream("CtoD", addOperator6.outport1, addOperator7.inport1);
        logicalPlan2.addStream("CtoDelay", addOperator6.outport2, addOperator8.input);
        logicalPlan2.addStream("DelayToC", addOperator8.output, addOperator6.inport2);
        logicalPlan2.findInvalidDelays(logicalPlan2.getMeta(addOperator5), new ArrayList(), new Stack());
        Assert.assertEquals("operator invalid delay", 1L, r0.size());
        try {
            logicalPlan2.validate();
            Assert.fail("validation should fail");
        } catch (ValidationException e2) {
        }
        LogicalPlan logicalPlan3 = new LogicalPlan();
        GenericTestOperator addOperator9 = logicalPlan3.addOperator("B", GenericTestOperator.class);
        GenericTestOperator addOperator10 = logicalPlan3.addOperator("C", GenericTestOperator.class);
        GenericTestOperator addOperator11 = logicalPlan3.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator addOperator12 = logicalPlan3.addOperator("opDelay", DefaultDelayOperator.class);
        logicalPlan3.addStream("BtoC", addOperator9.outport1, addOperator10.inport1);
        logicalPlan3.addStream("CtoD", addOperator10.outport1, addOperator11.inport1);
        logicalPlan3.addStream("CtoDelay", addOperator10.outport2, addOperator12.input).setLocality(DAG.Locality.THREAD_LOCAL);
        logicalPlan3.addStream("DelayToC", addOperator12.output, addOperator10.inport2).setLocality(DAG.Locality.THREAD_LOCAL);
        try {
            logicalPlan3.validate();
            Assert.fail("validation should fail");
        } catch (ValidationException e3) {
        }
    }

    @Test
    public void testValidDelay() {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("A", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = logicalPlan.addOperator("B", GenericTestOperator.class);
        GenericTestOperator addOperator3 = logicalPlan.addOperator("C", GenericTestOperator.class);
        GenericTestOperator addOperator4 = logicalPlan.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator addOperator5 = logicalPlan.addOperator("opDelay", DefaultDelayOperator.class);
        logicalPlan.addStream("AtoB", addOperator.outport, addOperator2.inport1);
        logicalPlan.addStream("BtoC", addOperator2.outport1, addOperator3.inport1);
        logicalPlan.addStream("CtoD", addOperator3.outport1, addOperator4.inport1);
        logicalPlan.addStream("CtoDelay", addOperator3.outport2, addOperator5.input);
        logicalPlan.addStream("DelayToB", addOperator5.output, addOperator2.inport2);
        logicalPlan.validate();
    }

    @Test(timeout = 60000)
    public void testFibonacci() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        TestGeneratorInputOperator addOperator = logicalPlan.addOperator("DUMMY", TestGeneratorInputOperator.class);
        FibonacciOperator addOperator2 = logicalPlan.addOperator("FIB", FibonacciOperator.class);
        DefaultDelayOperator addOperator3 = logicalPlan.addOperator("opDelay", DefaultDelayOperator.class);
        logicalPlan.addStream("dummy_to_operator", addOperator.outport, addOperator2.dummyInputPort);
        logicalPlan.addStream("operator_to_delay", addOperator2.output, addOperator3.input);
        logicalPlan.addStream("delay_to_operator", addOperator3.output, addOperator2.input);
        new StramLocalCluster(logicalPlan).run(new ExitCondition(10, null));
        Assert.assertFalse(ExitCondition.message, ExitCondition.failed);
        FibonacciOperator.assertFibonacci();
    }

    @Test(timeout = 60000)
    public void testFibonacciRecovery1() throws Exception {
        LogicalPlan createDAG = StramTestSupport.createDAG(this.testMeta);
        TestGeneratorInputOperator addOperator = createDAG.addOperator("DUMMY", TestGeneratorInputOperator.class);
        FailableFibonacciOperator addOperator2 = createDAG.addOperator("FIB", FailableFibonacciOperator.class);
        DefaultDelayOperator addOperator3 = createDAG.addOperator("opDelay", DefaultDelayOperator.class);
        addOperator2.failableOperator = new FailableOperator(addOperator2, 3, true);
        createDAG.addStream("dummy_to_operator", addOperator.outport, addOperator2.dummyInputPort);
        createDAG.addStream("operator_to_delay", addOperator2.output, addOperator3.input);
        createDAG.addStream("delay_to_operator", addOperator3.output, addOperator2.input);
        createDAG.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        createDAG.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        createDAG.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(createDAG);
        stramLocalCluster.setPerContainerBufferServer(true);
        stramLocalCluster.run(new ExitCondition(30, FailableOperator.isFailureSimulated));
        Assert.assertFalse(ExitCondition.message, ExitCondition.failed);
        Assert.assertTrue(FibonacciOperator.results.size() >= 30);
        FibonacciOperator.assertFibonacci();
    }

    @Test(timeout = 60000)
    public void testFibonacciRecovery2() throws Exception {
        LogicalPlan createDAG = StramTestSupport.createDAG(this.testMeta);
        TestGeneratorInputOperator addOperator = createDAG.addOperator("DUMMY", TestGeneratorInputOperator.class);
        FibonacciOperator addOperator2 = createDAG.addOperator("FIB", FibonacciOperator.class);
        FailableDelayOperator addOperator3 = createDAG.addOperator("opDelay", FailableDelayOperator.class);
        addOperator3.failableOperator = new FailableOperator(addOperator3, 5, true);
        createDAG.addStream("dummy_to_operator", addOperator.outport, addOperator2.dummyInputPort);
        createDAG.addStream("operator_to_delay", addOperator2.output, addOperator3.input);
        createDAG.addStream("delay_to_operator", addOperator3.output, addOperator2.input);
        createDAG.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        createDAG.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        createDAG.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(createDAG);
        stramLocalCluster.setPerContainerBufferServer(true);
        stramLocalCluster.run(new ExitCondition(30, FailableOperator.isFailureSimulated));
        Assert.assertFalse(ExitCondition.message, ExitCondition.failed);
        Assert.assertTrue(FibonacciOperator.results.size() >= 30);
        FibonacciOperator.assertFibonacci();
    }

    @Test
    public void testCheckpointUpdate() {
        LogicalPlan createDAG = StramTestSupport.createDAG(this.testMeta);
        TestGeneratorInputOperator addOperator = createDAG.addOperator("A", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = createDAG.addOperator("B", GenericTestOperator.class);
        GenericTestOperator addOperator3 = createDAG.addOperator("C", GenericTestOperator.class);
        GenericTestOperator addOperator4 = createDAG.addOperator("D", GenericTestOperator.class);
        DefaultDelayOperator addOperator5 = createDAG.addOperator("opDelay", new DefaultDelayOperator());
        createDAG.addStream("AtoB", addOperator.outport, addOperator2.inport1);
        createDAG.addStream("BtoC", addOperator2.outport1, addOperator3.inport1);
        createDAG.addStream("CtoD", addOperator3.outport1, addOperator4.inport1);
        createDAG.addStream("CtoDelay", addOperator3.outport2, addOperator5.input);
        createDAG.addStream("DelayToB", addOperator5.output, addOperator2.inport2);
        createDAG.validate();
        createDAG.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        StreamingContainerManager streamingContainerManager = new StreamingContainerManager(createDAG);
        PhysicalPlan physicalPlan = streamingContainerManager.getPhysicalPlan();
        Iterator it = physicalPlan.getAllOperators().values().iterator();
        while (it.hasNext()) {
            ((PTOperator) it.next()).setState(PTOperator.State.ACTIVE);
        }
        SystemClock systemClock = new SystemClock();
        PTOperator pTOperator = (PTOperator) physicalPlan.getOperators(createDAG.getMeta(addOperator)).get(0);
        PTOperator pTOperator2 = (PTOperator) physicalPlan.getOperators(createDAG.getMeta(addOperator2)).get(0);
        PTOperator pTOperator3 = (PTOperator) physicalPlan.getOperators(createDAG.getMeta(addOperator3)).get(0);
        PTOperator pTOperator4 = (PTOperator) physicalPlan.getOperators(createDAG.getMeta(addOperator5)).get(0);
        PTOperator pTOperator5 = (PTOperator) physicalPlan.getOperators(createDAG.getMeta(addOperator4)).get(0);
        Checkpoint checkpoint = new Checkpoint(3L, 0, 0);
        Checkpoint checkpoint2 = new Checkpoint(5L, 0, 0);
        Checkpoint checkpoint3 = new Checkpoint(4L, 0, 0);
        pTOperator2.checkpoints.add(checkpoint);
        pTOperator3.checkpoints.add(checkpoint);
        pTOperator3.checkpoints.add(checkpoint3);
        pTOperator4.checkpoints.add(checkpoint);
        pTOperator4.checkpoints.add(checkpoint2);
        pTOperator5.checkpoints.add(checkpoint2);
        HashSet newHashSet = Sets.newHashSet(new LogicalPlan.OperatorMeta[]{createDAG.getMeta(addOperator2), createDAG.getMeta(addOperator3), createDAG.getMeta(addOperator5)});
        HashMap hashMap = new HashMap();
        Iterator it2 = newHashSet.iterator();
        while (it2.hasNext()) {
            hashMap.put((LogicalPlan.OperatorMeta) it2.next(), newHashSet);
        }
        streamingContainerManager.updateRecoveryCheckpoints(pTOperator2, new StreamingContainerManager.UpdateCheckpointsContext(systemClock, false, hashMap), false);
        Assert.assertEquals("checkpoint " + pTOperator, Checkpoint.INITIAL_CHECKPOINT, pTOperator.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + pTOperator2, checkpoint, pTOperator3.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + pTOperator3, checkpoint, pTOperator3.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + pTOperator5, checkpoint2, pTOperator5.getRecoveryCheckpoint());
    }

    @Test
    public void testValidationWithMultipleStreamLoops() {
        LogicalPlan createDAG = StramTestSupport.createDAG(this.testMeta);
        TestGeneratorInputOperator addOperator = createDAG.addOperator("A", TestGeneratorInputOperator.class);
        GenericTestOperator addOperator2 = createDAG.addOperator("Op1", GenericTestOperator.class);
        GenericTestOperator addOperator3 = createDAG.addOperator("Op2", GenericTestOperator.class);
        DefaultDelayOperator addOperator4 = createDAG.addOperator("Delay", DefaultDelayOperator.class);
        createDAG.addStream("Source", addOperator.outport, addOperator2.inport1);
        createDAG.addStream("Stream1", addOperator2.outport1, addOperator3.inport1);
        createDAG.addStream("Stream2", addOperator2.outport2, addOperator3.inport2);
        createDAG.addStream("Op to Delay", addOperator3.outport1, addOperator4.input);
        createDAG.addStream("Delay to Op", addOperator4.output, addOperator2.inport2);
        createDAG.validate();
    }
}
