package com.datatorrent.stram.engine;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.packet.PayloadTuple;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.stram.CustomControlTupleTest;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.stream.BufferServerPublisher;
import com.datatorrent.stram.stream.BufferServerSubscriber;
import com.datatorrent.stram.stream.OiOStream;
import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/GenericNodeTest.class */
public class GenericNodeTest {

    /**
     * JUnit rule that computes a per-test directory path under {@code target/} when a test
     * starts and deletes that directory when the test finishes.
     */
    @Rule
    public FSTestWatcher testMeta = new FSTestWatcher();
    /** Logger for this test class. */
    private static final Logger LOG = LoggerFactory.getLogger(GenericNodeTest.class);

    /* loaded from: input_file:com/datatorrent/stram/engine/GenericNodeTest$CheckpointDistanceOperator.class */
    /**
     * Operator that records, for each of its first {@link #maxWindows} windows, the value
     * returned by {@code context.getWindowsFromCheckpoint()} at the time {@code beginWindow}
     * is called.
     */
    public static class CheckpointDistanceOperator extends GenericOperator {
        /** Values captured at the start of each recorded window, in window order. */
        List<Integer> distances = new ArrayList<>();
        /** Number of windows begun so far; keeps counting past {@link #maxWindows}. */
        int numWindows = 0;
        /** Number of leading windows whose distance is recorded. */
        int maxWindows = 0;

        /**
         * Counts the window and, while fewer than {@link #maxWindows} windows have been seen,
         * stores the current windows-from-checkpoint value.
         *
         * @param j id of the window being started
         */
        @Override // com.datatorrent.stram.engine.GenericNodeTest.GenericOperator
        public void beginWindow(long j) {
            super.beginWindow(j);
            // The comparison uses the pre-increment count, so exactly the first
            // maxWindows windows are recorded.
            if (this.numWindows++ < this.maxWindows) {
                this.distances.add(this.context.getWindowsFromCheckpoint());
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/GenericNodeTest$DelayAsyncFSStorageAgent.class */
    /**
     * Storage agent test double: {@code save} does nothing and {@code copyToHDFS} only sleeps
     * for a configurable delay (2000 ms unless changed via {@link #setDelayMS(long)}).
     */
    public static class DelayAsyncFSStorageAgent extends AsyncFSStorageAgent {
        private static final long serialVersionUID = 201511301205L;
        /** Time, in milliseconds, that {@code copyToHDFS} sleeps. */
        private long delayMS;

        /** Forwards all arguments to the superclass constructor and sets the default delay. */
        public DelayAsyncFSStorageAgent(String str, String str2, Configuration configuration) {
            super(str, str2, configuration);
            this.delayMS = 2000L;
        }

        /** Forwards all arguments to the superclass constructor and sets the default delay. */
        public DelayAsyncFSStorageAgent(String str, Configuration configuration) {
            super(str, configuration);
            this.delayMS = 2000L;
        }

        /** Intentionally a no-op: nothing is written. */
        public void save(Object obj, int i, long j) throws IOException {
        }

        /**
         * Sleeps for the configured delay instead of copying anything.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
         *         interrupted; the thread's interrupt flag is restored first
         */
        public void copyToHDFS(int i, long j) throws IOException {
            try {
                Thread.sleep(this.delayMS);
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /** @return the sleep duration of {@code copyToHDFS} in milliseconds */
        public long getDelayMS() {
            return this.delayMS;
        }

        /** @param j new sleep duration of {@code copyToHDFS} in milliseconds */
        public void setDelayMS(long j) {
            this.delayMS = j;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/GenericNodeTest$FSTestWatcher.class */
    /**
     * Test watcher that derives a directory path of the form
     * {@code target/<class name>/<method name>} when a test starts and deletes that
     * directory once the test has finished.
     */
    public static class FSTestWatcher extends TestWatcher {
        /** Path assigned in {@link #starting(Description)}; unset before the first test starts. */
        private String dir;

        /** Computes the directory path for the test that is about to run. */
        protected void starting(Description description) {
            final String className = description.getClassName();
            final String methodName = description.getMethodName();
            this.dir = "target/" + className + "/" + methodName;
        }

        /** Removes the test directory, ignoring any failure to delete it. */
        protected void finished(Description description) {
            super.finished(description);
            final File testDirectory = new File(this.dir);
            FileUtils.deleteQuietly(testDirectory);
        }

        /** @return the directory path computed for the current test */
        public String getDir() {
            return this.dir;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/GenericNodeTest$GenericOperator.class */
    /**
     * Minimal pass-through operator used by the tests in this class: every tuple received on
     * either input port is emitted unchanged on {@code op}, and the window callbacks record
     * window ids so tests can tell whether a window is currently open.
     * The tests in this file address the ports by their field names ("ip1", "ip2", "op").
     */
    public static class GenericOperator implements Operator {
        // Context handed to setup(); subclasses in this file read it during beginWindow.
        Context.OperatorContext context;
        // Id passed to the most recent beginWindow call.
        long beginWindowId;
        // Copy of beginWindowId taken by endWindow; the two differ while a window is open
        // (for a non-zero window id), which is what the tests assert on.
        long endWindowId;
        // First input port: forwards each tuple to the output port.
        public final transient DefaultInputPort<Object> ip1 = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.engine.GenericNodeTest.GenericOperator.1
            public void process(Object obj) {
                GenericOperator.this.op.emit(obj);
            }
        };

        // Second input port, declared optional: forwards each tuple to the output port.
        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Object> ip2 = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.engine.GenericNodeTest.GenericOperator.2
            public void process(Object obj) {
                GenericOperator.this.op.emit(obj);
            }
        };

        // Output port, declared optional, that receives everything from ip1 and ip2.
        @OutputPortFieldAnnotation(optional = true)
        DefaultOutputPort<Object> op = new DefaultOutputPort<>();

        /** Remembers the id of the window that is starting. */
        public void beginWindow(long j) {
            this.beginWindowId = j;
        }

        /** Marks the current window as closed by copying its id into {@code endWindowId}. */
        public void endWindow() {
            this.endWindowId = this.beginWindowId;
        }

        /** Stores the operator context for later use. */
        public void setup(Context.OperatorContext operatorContext) {
            this.context = operatorContext;
        }

        /** Nothing to release. */
        public void teardown() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/GenericNodeTest$TestStatsOperatorContext.class */
    /**
     * Operator context that additionally collects every non-null checkpoint seen in the
     * stats passed to {@link #report(Stats.OperatorStats, long)}.
     */
    public static class TestStatsOperatorContext extends OperatorContext {
        private static final long serialVersionUID = 201511301206L;
        /**
         * Checkpoints collected so far. Tests in this file poll this list from the test thread
         * while the node runs on another thread, so the list itself must be thread-safe;
         * {@code volatile} alone only covers the reference.
         */
        public volatile List<Checkpoint> checkpoints;

        /** Forwards all arguments to the superclass constructor and starts with no checkpoints. */
        public TestStatsOperatorContext(int i, String str, Attribute.AttributeMap attributeMap, Context context) {
            super(i, str, attributeMap, context);
            this.checkpoints = new CopyOnWriteArrayList<>();
        }

        /**
         * Delegates to the superclass and records the checkpoint carried by the stats, if any.
         *
         * @param operatorStats stats being reported; its {@code checkpoint} may be null
         * @param j passed through to the superclass unchanged
         */
        public void report(Stats.OperatorStats operatorStats, long j) {
            super.report(operatorStats, j);
            if (operatorStats.checkpoint != null) {
                this.checkpoints.add((Checkpoint) operatorStats.checkpoint);
            }
        }
    }

    /**
     * Feeds control tuples to a two-input node one port at a time and checks, after each step,
     * how many tuples have reached the output sink. The node runs on its own thread; the test
     * sleeps briefly after each tuple to give it time to react.
     */
    @Test
    public void testSynchingLogic() throws InterruptedException {
        // Written by the node thread (via the sink) and read by the test thread,
        // so a thread-safe list is required.
        final List<Object> emitted = new CopyOnWriteArrayList<>();
        final GenericNode node = new GenericNode(new GenericOperator(), new OperatorContext(0, "operator", new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
        node.setId(1);
        AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
        AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
        Sink<Object> output = new Sink<Object>() {
            public void put(Object obj) {
                emitted.add(obj);
            }

            public int getCount(boolean z) {
                return 0;
            }
        };
        node.connectInputPort("ip1", reservoir1);
        node.connectInputPort("ip2", reservoir2);
        node.connectOutputPort("op", output);
        node.firstWindowMillis = 0L;
        node.windowWidthMillis = 100L;
        final AtomicBoolean started = new AtomicBoolean(false);
        Thread nodeThread = new Thread() {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                started.set(true);
                node.activate();
                node.run();
                node.deactivate();
            }
        };
        nodeThread.start();
        // Wait until the node thread has at least begun executing.
        do {
            Thread.sleep(25L);
        } while (!started.get());

        // Window 1 begins on port 1: one tuple reaches the sink.
        Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 1L);
        reservoir1.add(beginWindow1);
        Thread.sleep(25L);
        Assert.assertEquals(1L, emitted.size());
        // The same begin-window on port 2 produces nothing new.
        reservoir2.add(beginWindow1);
        Thread.sleep(25L);
        Assert.assertEquals(1L, emitted.size());
        // End of window 1 on port 1 only: still nothing new.
        EndWindowTuple endWindow1 = new EndWindowTuple(1L);
        reservoir1.add(endWindow1);
        Thread.sleep(25L);
        Assert.assertEquals(1L, emitted.size());
        // Port 1 moves on to window 2 while port 2 is still in window 1: nothing new.
        Tuple beginWindow2 = new Tuple(MessageType.BEGIN_WINDOW, 2L);
        reservoir1.add(beginWindow2);
        Thread.sleep(25L);
        Assert.assertEquals(1L, emitted.size());
        // Port 2 ends window 1: two more tuples are emitted
        // (presumably END_WINDOW 1 followed by BEGIN_WINDOW 2).
        reservoir2.add(endWindow1);
        Thread.sleep(25L);
        Assert.assertEquals(3L, emitted.size());
        reservoir2.add(beginWindow2);
        Thread.sleep(25L);
        Assert.assertEquals(3L, emitted.size());
        // Window 2 ends on port 2 first, then on port 1; only the second one emits.
        EndWindowTuple endWindow2 = new EndWindowTuple(2L);
        reservoir2.add(endWindow2);
        Thread.sleep(25L);
        Assert.assertEquals(3L, emitted.size());
        reservoir1.add(endWindow2);
        Thread.sleep(25L);
        Assert.assertEquals(4L, emitted.size());
        // End-of-stream on port 1 alone emits nothing.
        EndStreamTuple endStream = new EndStreamTuple(0L);
        reservoir1.add(endStream);
        Thread.sleep(25L);
        Assert.assertEquals(4L, emitted.size());
        // With port 1 finished, window 3 on port 2 alone is forwarded tuple by tuple.
        reservoir2.add(new Tuple(MessageType.BEGIN_WINDOW, 3L));
        Thread.sleep(25L);
        Assert.assertEquals(5L, emitted.size());
        reservoir2.add(new EndWindowTuple(3L));
        Thread.sleep(25L);
        Assert.assertEquals(6L, emitted.size());
        Assert.assertNotSame(Thread.State.TERMINATED, nodeThread.getState());
        // End-of-stream on the last live port emits one more tuple and ends the node thread.
        reservoir2.add(endStream);
        Thread.sleep(25L);
        Assert.assertEquals(7L, emitted.size());
        Thread.sleep(25L);
        Assert.assertEquals(Thread.State.TERMINATED, nodeThread.getState());
    }

    /**
     * Publishes three complete windows plus an end-of-stream tuple to a buffer server,
     * activates a subscriber before the downstream node is started, and then checks that the
     * node delivers all ten tuples to its sink with the payload bytes in the expected
     * positions.
     *
     * <p>The buffer server and the event loop are stopped in {@code finally} blocks so a
     * failing assertion or setup step does not leave them running.
     */
    @Test
    public void testBufferServerSubscriberActivationBeforeOperator() throws InterruptedException, IOException {
        DefaultEventLoop eventLoop = DefaultEventLoop.createEventLoop("StreamTestEventLoop");
        eventLoop.start();
        try {
            // Port 0 is passed to the server; the actual port is read back from run().
            Server server = new Server(eventLoop, 0);
            int port = server.run().getPort();
            try {
                DefaultStatefulStreamCodec codec = new DefaultStatefulStreamCodec();
                final ArrayBlockingQueue<Object> received = new ArrayBlockingQueue<>(10);
                final GenericNode node = new GenericNode(new GenericTestOperator(), new OperatorContext(0, "operator", new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
                node.setId(1);
                Sink<Object> output = new Sink<Object>() {
                    public void put(Object obj) {
                        received.add(obj);
                    }

                    public int getCount(boolean z) {
                        return 0;
                    }
                };
                InetSocketAddress serverAddress = new InetSocketAddress("localhost", port);
                // Context used by the subscriber; it additionally sets the finished window id.
                StreamContext subscriberContext = new StreamContext("streamName");
                subscriberContext.setSourceId("upstreamNodeId");
                subscriberContext.setSinkId("downStreamNodeId");
                subscriberContext.setFinishedWindowId(-1L);
                subscriberContext.setBufferServerAddress(serverAddress);
                subscriberContext.put(StreamContext.CODEC, codec);
                subscriberContext.put(StreamContext.EVENT_LOOP, eventLoop);
                // Context used by the publisher.
                StreamContext publisherContext = new StreamContext("streamName");
                publisherContext.setSourceId("upstreamNodeId");
                publisherContext.setSinkId("downStreamNodeId");
                publisherContext.setBufferServerAddress(serverAddress);
                publisherContext.put(StreamContext.CODEC, codec);
                publisherContext.put(StreamContext.EVENT_LOOP, eventLoop);
                BufferServerPublisher publisher = new BufferServerPublisher("upstreamNodeId", 1024);
                publisher.setup(publisherContext);
                publisher.activate(publisherContext);
                // Windows 1..3, each carrying one payload whose last byte equals the window id.
                for (long windowId = 1L; windowId <= 3L; windowId++) {
                    publisher.put(new Tuple(MessageType.BEGIN_WINDOW, windowId));
                    byte[] payload = PayloadTuple.getSerializedTuple(0, 1);
                    payload[payload.length - 1] = (byte) windowId;
                    publisher.put(payload);
                    publisher.put(new EndWindowTuple(windowId));
                }
                publisher.put(new EndStreamTuple(0L));
                BufferServerSubscriber subscriber = new BufferServerSubscriber("downStreamNodeId", 1024);
                subscriber.setup(subscriberContext);
                node.connectInputPort(GenericTestOperator.IPORT1, subscriber.acquireReservoir("testReservoir", 10));
                node.connectOutputPort(GenericTestOperator.OPORT1, output);
                // A second reservoir on the same subscriber is polled to learn when data has arrived.
                SweepableReservoir probe = subscriber.acquireReservoir("testReservoir2", 10);
                subscriber.activate(subscriberContext);
                while (probe.sweep() == null) {
                    Thread.sleep(100L);
                }
                node.firstWindowMillis = 0L;
                node.windowWidthMillis = 100L;
                Thread nodeThread = new Thread() {
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        node.activate();
                        node.run();
                        node.deactivate();
                    }
                };
                nodeThread.start();
                nodeThread.join();
                Assert.assertEquals(10L, received.size());
                List<Object> tuples = new ArrayList<>(received);
                Assert.assertEquals("Payload Tuple 1", 1L, ((byte[]) tuples.get(1))[5]);
                Assert.assertEquals("Payload Tuple 2", 2L, ((byte[]) tuples.get(4))[5]);
                Assert.assertEquals("Payload Tuple 3", 3L, ((byte[]) tuples.get(7))[5]);
            } finally {
                server.stop();
            }
        } finally {
            eventLoop.stop();
        }
    }

    /**
     * Opens window 1 on both inputs, closes it, opens window 2 and then shuts the node down
     * while window 2 is still open. The final assertion checks that {@code endWindowId} still
     * differs from {@code beginWindowId}, i.e. the open window was not ended by the shutdown.
     * Each wait polls every 25 ms for up to 5000 ms.
     */
    @Test
    public void testPrematureTermination() throws InterruptedException {
        GenericOperator operator = new GenericOperator();
        final GenericNode node = new GenericNode(operator, new OperatorContext(0, "operator", new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
        node.setId(1);
        AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
        AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
        node.connectInputPort("ip1", reservoir1);
        node.connectInputPort("ip2", reservoir2);
        node.connectOutputPort("op", Sink.BLACKHOLE);
        node.firstWindowMillis = 0L;
        node.windowWidthMillis = 100L;
        final AtomicBoolean started = new AtomicBoolean(false);
        Thread nodeThread = new Thread() {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                started.set(true);
                node.activate();
                node.run();
                node.deactivate();
            }
        };
        nodeThread.start();

        // Wait for the node thread to start running.
        long waited = 0;
        do {
            Thread.sleep(25L);
            waited += 25;
        } while (!started.get() && waited < 5000);

        // Begin window 1 on both ports and wait for the control tuple counter to move.
        int countBefore = node.controlTupleCount;
        Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 1L);
        reservoir1.add(beginWindow1);
        reservoir2.add(beginWindow1);
        waited = 0;
        do {
            Thread.sleep(25L);
            waited += 25;
        } while (node.controlTupleCount == countBefore && waited < 5000);
        Assert.assertTrue("Begin window called", operator.endWindowId != operator.beginWindowId);

        // End window 1 on both ports.
        countBefore = node.controlTupleCount;
        EndWindowTuple endWindow1 = new EndWindowTuple(1L);
        reservoir1.add(endWindow1);
        reservoir2.add(endWindow1);
        waited = 0;
        do {
            Thread.sleep(25L);
            waited += 25;
        } while (node.controlTupleCount == countBefore && waited < 5000);
        Assert.assertTrue("End window called", operator.endWindowId == operator.beginWindowId);

        // Begin window 2 and shut down before it is closed.
        countBefore = node.controlTupleCount;
        Tuple beginWindow2 = new Tuple(MessageType.BEGIN_WINDOW, 2L);
        reservoir1.add(beginWindow2);
        reservoir2.add(beginWindow2);
        waited = 0;
        do {
            Thread.sleep(25L);
            waited += 25;
        } while (node.controlTupleCount == countBefore && waited < 5000);
        node.shutdown();
        nodeThread.join();
        Assert.assertTrue("End window not called", operator.endWindowId != operator.beginWindowId);
    }

    /**
     * Sends a begin-window, four custom control tuples and an end-window to a single-input
     * node and checks the sink's tuple counts: 3 after the custom tuples are sent, 6 in total
     * after the window ends, 4 of which are {@link CustomControlTuple}s.
     *
     * <p>Each wait polls every 25 ms and gives up after 5000 ms, matching the other polling
     * tests in this class, so a stuck node fails the test quickly instead of blocking the
     * build for over an hour per wait.
     */
    @Test
    public void testControlTuplesDeliveryGenericNode() throws InterruptedException {
        final long sleepTime = 25L;
        final long maxSleep = 5000L;
        final GenericNode node = new GenericNode(new GenericOperator(), new OperatorContext(0, "operator", new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
        node.setId(1);
        AbstractReservoir reservoir = AbstractReservoir.newReservoir("ip1Res", 1024);
        node.connectInputPort("ip1", reservoir);
        TestSink testSink = new TestSink();
        node.connectOutputPort("op", testSink);
        node.firstWindowMillis = 0L;
        node.windowWidthMillis = 100L;
        final AtomicBoolean started = new AtomicBoolean(false);
        Thread nodeThread = new Thread() {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                started.set(true);
                node.activate();
                node.run();
                node.deactivate();
            }
        };
        nodeThread.start();

        // Wait for the node thread to start running.
        long waited = 0;
        do {
            Thread.sleep(sleepTime);
            waited += sleepTime;
        } while (!started.get() && waited < maxSleep);

        // Open window 1 and wait for the node's control tuple counter to change.
        int countBefore = node.controlTupleCount;
        reservoir.add(new Tuple(MessageType.BEGIN_WINDOW, 1L));
        waited = 0;
        do {
            Thread.sleep(sleepTime);
            waited += sleepTime;
        } while (node.controlTupleCount == countBefore && waited < maxSleep);

        // Four custom control tuples; the boolean flag alternates false/true.
        countBefore = node.controlTupleCount;
        CustomControlTuple custom1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1L, false));
        CustomControlTuple custom2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2L, true));
        CustomControlTuple custom3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3L, false));
        CustomControlTuple custom4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4L, true));
        reservoir.add(custom1);
        reservoir.add(custom2);
        reservoir.add(custom3);
        reservoir.add(custom4);
        waited = 0;
        do {
            Thread.sleep(sleepTime);
            waited += sleepTime;
        } while (node.controlTupleCount == countBefore && waited < maxSleep);
        Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3);

        // Close window 1, then stop the node.
        countBefore = node.controlTupleCount;
        reservoir.add(new Tuple(MessageType.END_WINDOW, 1L));
        waited = 0;
        do {
            Thread.sleep(sleepTime);
            waited += sleepTime;
        } while (node.controlTupleCount == countBefore && waited < maxSleep);
        node.shutdown();
        nodeThread.join();
        Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6);

        long customTupleCount = 0;
        for (Object collected : testSink.collectedTuples) {
            if (collected instanceof CustomControlTuple) {
                customTupleCount++;
            }
        }
        Assert.assertEquals("Number of Custom control tuples", 4L, customTupleCount);
    }

    /**
     * Same scenario as the generic-node control tuple test, but driven synchronously through
     * an {@code OiONode}'s control sink: begin-window, four custom control tuples, end-window,
     * with the sink's tuple count checked after each stage.
     */
    @Test
    public void testControlTuplesDeliveryOiONode() throws InterruptedException {
        OiONode node = new OiONode(new GenericOperator(), new OperatorContext(0, "operator", new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
        node.setId(1);
        OiOStream.OiOReservoir reservoir = new OiOStream().getReservoir();
        reservoir.setControlSink(node.getControlSink(reservoir));
        node.connectInputPort("ip1", reservoir);
        // Control tuples are pushed straight into this sink; no node thread is involved.
        Sink controlSink = node.getControlSink(reservoir);
        TestSink testSink = new TestSink();
        node.connectOutputPort("op", testSink);
        node.firstWindowMillis = 0L;
        node.windowWidthMillis = 100L;
        node.activate();

        controlSink.put(new Tuple(MessageType.BEGIN_WINDOW, 1L));
        Assert.assertTrue("Begin window", testSink.getResultCount() == 1);

        // All four tuples are created up front and then delivered in order.
        CustomControlTuple[] customTuples = {
            new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1L, false)),
            new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2L, true)),
            new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3L, false)),
            new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4L, true)),
        };
        for (CustomControlTuple customTuple : customTuples) {
            controlSink.put(customTuple);
        }
        Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3);

        controlSink.put(new Tuple(MessageType.END_WINDOW, 1L));
        node.deactivate();
        node.shutdown();
        Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6);

        long customTupleCount = 0;
        for (Object collected : testSink.collectedTuples) {
            if (collected instanceof CustomControlTuple) {
                customTupleCount++;
            }
        }
        Assert.assertTrue("Number of Custom control tuples", customTupleCount == 4);
    }

    /**
     * Connects two reservoirs to a node, asks the node to populate its reservoir-to-port map
     * and verifies that the map has two entries, each mapping a reservoir to an input port.
     */
    @Test
    public void testReservoirPortMapping() throws InterruptedException {
        final GenericNode node = new GenericNode(new GenericOperator(), new OperatorContext(0, "operator", new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
        node.setId(1);
        AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
        AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
        node.connectInputPort("ip1", reservoir1);
        node.connectInputPort("ip2", reservoir2);
        node.connectOutputPort("op", Sink.BLACKHOLE);
        node.firstWindowMillis = 0L;
        node.windowWidthMillis = 100L;
        final AtomicBoolean started = new AtomicBoolean(false);
        Thread nodeThread = new Thread() {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                started.set(true);
                node.activate();
                node.run();
                node.deactivate();
            }
        };
        nodeThread.start();

        // Poll every 25 ms, for at most 5000 ms, until the node thread has started.
        long waited = 0;
        do {
            Thread.sleep(25L);
            waited += 25;
        } while (!started.get() && waited < 5000);

        node.populateReservoirInputPortMap();
        node.shutdown();
        nodeThread.join();

        Assert.assertTrue("Port Mapping Size", node.reservoirPortMap.size() == 2);
        final boolean firstIsPort = node.reservoirPortMap.get(reservoir1) instanceof Operator.InputPort;
        Assert.assertTrue("Sink 1 is not a port", firstIsPort);
        final boolean secondIsPort = node.reservoirPortMap.get(reservoir2) instanceof Operator.InputPort;
        Assert.assertTrue("Sink 2 is not a port", secondIsPort);
    }

    /**
     * Delegates to {@code NodeTest.testDoubleCheckpointHandling} with
     * {@code AT_LEAST_ONCE} processing mode and this test's working directory.
     */
    @Test
    public void testDoubleCheckpointAtleastOnce() throws Exception {
        final String workDir = this.testMeta.getDir();
        NodeTest.testDoubleCheckpointHandling(Operator.ProcessingMode.AT_LEAST_ONCE, true, workDir);
    }

    /**
     * Delegates to {@code NodeTest.testDoubleCheckpointHandling} with
     * {@code AT_MOST_ONCE} processing mode and this test's working directory.
     */
    @Test
    public void testDoubleCheckpointAtMostOnce() throws Exception {
        final String workDir = this.testMeta.getDir();
        NodeTest.testDoubleCheckpointHandling(Operator.ProcessingMode.AT_MOST_ONCE, true, workDir);
    }

    /**
     * Delegates to {@code NodeTest.testDoubleCheckpointHandling} with
     * {@code EXACTLY_ONCE} processing mode and this test's working directory.
     */
    @Test
    public void testDoubleCheckpointExactlyOnce() throws Exception {
        final String workDir = this.testMeta.getDir();
        NodeTest.testDoubleCheckpointHandling(Operator.ProcessingMode.EXACTLY_ONCE, true, workDir);
    }

    /** Runs the checkpoint/application window count scenario in {@code AT_LEAST_ONCE} mode. */
    @Test
    public void testCheckpointApplicationWindowCountAtleastOnce() throws Exception {
        final Operator.ProcessingMode mode = Operator.ProcessingMode.AT_LEAST_ONCE;
        testCheckpointApplicationWindowCount(mode);
    }

    /** Runs the checkpoint/application window count scenario in {@code AT_MOST_ONCE} mode. */
    @Test
    public void testCheckpointApplicationWindowCountAtMostOnce() throws Exception {
        final Operator.ProcessingMode mode = Operator.ProcessingMode.AT_MOST_ONCE;
        testCheckpointApplicationWindowCount(mode);
    }

    /**
     * Runs a node fed by a window generator, with application and checkpoint window counts of
     * 5 and a storage agent whose copy step sleeps 200 ms, until 8 checkpoints have been
     * reported or 10 seconds have passed. Afterwards every reported checkpoint must carry an
     * application window count and a checkpoint window count of zero.
     *
     * @param processingMode processing mode stored in the operator's attributes
     */
    private void testCheckpointApplicationWindowCount(Operator.ProcessingMode processingMode) throws Exception {
        final long firstWindowMillis = 1448909287863L;
        WindowGenerator generator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
        generator.setResetWindow(0L);
        generator.setFirstWindow(firstWindowMillis);
        generator.setWindowWidth(100);
        generator.setCheckpointCount(1, 0);

        GenericOperator operator = new GenericOperator();
        Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
        attributes.put(OperatorContext.APPLICATION_WINDOW_COUNT, 5);
        attributes.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 5);
        attributes.put(OperatorContext.PROCESSING_MODE, processingMode);
        DelayAsyncFSStorageAgent storageAgent = new DelayAsyncFSStorageAgent(this.testMeta.getDir(), new Configuration());
        storageAgent.setDelayMS(200L);
        attributes.put(OperatorContext.STORAGE_AGENT, storageAgent);

        TestStatsOperatorContext statsContext = new TestStatsOperatorContext(0, "operator", attributes, null);
        final GenericNode node = new GenericNode(operator, statsContext);
        node.setId(1);
        TestSink testSink = new TestSink();
        node.connectInputPort("ip1", generator.acquireReservoir(String.valueOf(node.id), 1024));
        node.connectOutputPort("output", testSink);
        node.firstWindowMillis = firstWindowMillis;
        node.windowWidthMillis = 100;
        generator.activate((StreamContext) null);

        Thread nodeThread = new Thread() {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                node.activate();
                node.run();
                node.deactivate();
            }
        };
        nodeThread.start();

        // Poll until enough checkpoints were reported or the 10 s budget is used up.
        final long deadline = System.currentTimeMillis() + 10000;
        while (statsContext.checkpoints.size() < 8 && System.currentTimeMillis() < deadline) {
            Thread.sleep(25L);
        }
        node.shutdown();
        nodeThread.join();
        generator.deactivate();

        Assert.assertFalse(statsContext.checkpoints.isEmpty());
        for (Checkpoint checkpoint : statsContext.checkpoints) {
            if (checkpoint == null) {
                continue;
            }
            Assert.assertEquals(0L, checkpoint.applicationWindowCount);
            Assert.assertEquals(0L, checkpoint.checkpointWindowCount);
        }
    }

    /** Checks checkpoint distances using the default DAG-level and operator-level window counts. */
    @Test
    public void testDefaultCheckPointDistance() throws InterruptedException {
        final int dagDefault = ((Integer) Context.DAGContext.CHECKPOINT_WINDOW_COUNT.defaultValue).intValue();
        final int operatorDefault = ((Integer) Context.OperatorContext.CHECKPOINT_WINDOW_COUNT.defaultValue).intValue();
        testCheckpointDistance(dagDefault, operatorDefault);
    }

    /** Checks checkpoint distances when the DAG-level count (7) exceeds the operator-level count (5). */
    @Test
    public void testDAGGreaterCheckPointDistance() throws InterruptedException {
        final int dagCount = 7;
        final int operatorCount = 5;
        testCheckpointDistance(dagCount, operatorCount);
    }

    /** Checks checkpoint distances when the operator-level count (5) exceeds the DAG-level count (3). */
    @Test
    public void testOpGreaterCheckPointDistance() throws InterruptedException {
        final int dagCount = 3;
        final int operatorCount = 5;
        testCheckpointDistance(dagCount, operatorCount);
    }

    /**
     * Runs a {@link CheckpointDistanceOperator} for 60 windows of 50 ms each and verifies that
     * the windows-from-checkpoint value it observed at the start of every window matches the
     * distance to the next expected checkpoint window.
     *
     * <p>The node is shut down and its thread joined in a {@code finally} block so that a
     * failing assertion does not leave the node thread running.
     *
     * @param i  checkpoint window count configured on the window generator and stored as the
     *           DAG-level attribute
     * @param i2 checkpoint window count stored as the operator-level attribute
     */
    private void testCheckpointDistance(int i, int i2) throws InterruptedException {
        final int dagCheckpointCount = i;
        final int operatorCheckpointCount = i2;
        final int windowWidth = 50;
        final int maxWindows = 60;
        final long sleepTime = 25L;
        // Time for all windows to elapse plus 5 s of slack.
        final long maxSleep = (long) windowWidth * maxWindows + 5000;

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, "default");
        final WindowGenerator windowGenerator = new WindowGenerator(executor, 1024);
        windowGenerator.setWindowWidth(windowWidth);
        windowGenerator.setFirstWindow(executor.getCurrentTimeMillis());
        windowGenerator.setCheckpointCount(dagCheckpointCount, 0);
        CheckpointDistanceOperator operator = new CheckpointDistanceOperator();
        operator.maxWindows = maxWindows;
        List<Integer> expectedCheckpoints = expectedCheckpointWindows(dagCheckpointCount, operatorCheckpointCount, maxWindows);

        final StreamContext streamContext = new StreamContext("s1");
        Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
        attributes.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, Integer.valueOf(dagCheckpointCount));
        attributes.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, Integer.valueOf(operatorCheckpointCount));
        final OperatorContext operatorContext = new OperatorContext(0, "operator", attributes, (Context) null);
        final GenericNode node = new GenericNode(operator, operatorContext);
        node.setId(1);
        node.connectInputPort("ip1", windowGenerator.acquireReservoir("ip1", 1024));
        node.connectInputPort("ip2", windowGenerator.acquireReservoir("ip2", 1024));
        node.connectOutputPort("op", Sink.BLACKHOLE);
        Thread nodeThread = new Thread() {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                node.setup(operatorContext);
                windowGenerator.activate(streamContext);
                node.activate();
                node.run();
                windowGenerator.deactivate();
                node.deactivate();
                node.teardown();
            }
        };
        nodeThread.start();
        try {
            // Poll until the operator has seen all windows or the time budget is exhausted.
            long waited = 0;
            do {
                Thread.sleep(sleepTime);
                waited += sleepTime;
            } while (operator.numWindows < maxWindows && waited < maxSleep);
            Assert.assertEquals("Number distances", maxWindows, operator.numWindows);

            int nextIndex = 1;
            int checkpointWindow = expectedCheckpoints.get(0);
            for (int window = 0; window < maxWindows; window++) {
                // Once this window lies past the current checkpoint, move to the next one.
                if (window + 1 > checkpointWindow) {
                    checkpointWindow = expectedCheckpoints.get(nextIndex++);
                }
                Assert.assertEquals("Windows from checkpoint for " + window, checkpointWindow - window, operator.distances.get(window).intValue());
            }
        } finally {
            node.shutdown();
            nodeThread.join();
        }
    }

    /**
     * Lists the window counts at which a checkpoint is expected: starting after the previous
     * checkpoint, round up to the next multiple of the DAG count, then up to the next multiple
     * of the operator count. The list ends with the first value that reaches
     * {@code maxWindows}.
     */
    private static List<Integer> expectedCheckpointWindows(int dagCheckpointCount, int operatorCheckpointCount, int maxWindows) {
        List<Integer> windows = new ArrayList<>();
        int window = 0;
        while (window < maxWindows) {
            window = roundUpToMultiple(roundUpToMultiple(window + 1, dagCheckpointCount), operatorCheckpointCount);
            windows.add(window);
        }
        return windows;
    }

    /** Rounds a positive {@code value} up to the nearest multiple of a positive {@code multiple}. */
    private static int roundUpToMultiple(int value, int multiple) {
        // Integer arithmetic only: dividing two ints before calling Math.ceil would truncate
        // first and make the rounding a no-op.
        return ((value + multiple - 1) / multiple) * multiple;
    }
}
