package com.datatorrent.stram.engine;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.engine.GenericNodeTest;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/engine/NodeTest.class */
public class NodeTest {

    /* loaded from: input_file:com/datatorrent/stram/engine/NodeTest$CheckpointTestOperator.class */
    /**
     * Test operator that remembers every window id passed to {@link #checkpointed(long)} and
     * raises a flag as soon as the same id is reported a second time. It also counts how many
     * times {@link #endWindow()} has been invoked.
     */
    private static class CheckpointTestOperator extends GenericNodeTest.GenericOperator implements Operator.CheckpointNotificationListener {
        /** Window ids seen by {@link #checkpointed(long)} so far. */
        public Set<Long> checkpointedWindows = Sets.newHashSet();
        /** Set to true once a window id is reported to {@link #checkpointed(long)} more than once. */
        public volatile boolean checkpointTwice = false;
        /** Number of completed {@link #endWindow()} calls. */
        public volatile int numWindows = 0;

        private CheckpointTestOperator() {
            // all state is set up by the field initializers above
        }

        @Override // com.datatorrent.stram.engine.GenericNodeTest.GenericOperator
        public void endWindow() {
            super.endWindow();
            numWindows += 1;
        }

        /**
         * Records the checkpointed window id; a repeated id latches {@link #checkpointTwice}.
         * Once the flag is set, further ids are no longer added to the set.
         */
        @Override
        public void checkpointed(long j) {
            boolean duplicateSeen = checkpointTwice;
            if (!duplicateSeen) {
                // Set.add returns false when the id was already present.
                duplicateSeen = !checkpointedWindows.add(j);
            }
            checkpointTwice = duplicateSeen;
        }

        /** No-op. */
        @Override
        public void committed(long j) {
        }

        /** No-op. */
        @Override
        public void beforeCheckpoint(long j) {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/NodeTest$InputCheckpointTestOperator.class */
    private static class InputCheckpointTestOperator extends CheckpointTestOperator implements InputOperator {
        private InputCheckpointTestOperator() {
            super();
        }

        public void emitTuples() {
        }
    }

    @Stateless
    /* loaded from: input_file:com/datatorrent/stram/engine/NodeTest$StatelessOperator.class */
    public static class StatelessOperator implements Operator {
        public void beginWindow(long j) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void endWindow() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void setup(Context.OperatorContext operatorContext) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void teardown() {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/NodeTest$StorageAgentImpl.class */
    public static class StorageAgentImpl implements StorageAgent {
        static final ArrayList<Call> calls = new ArrayList<>();

        /* loaded from: input_file:com/datatorrent/stram/engine/NodeTest$StorageAgentImpl$Call.class */
        static class Call {
            Call(String str, int i, long j) {
            }
        }

        public void save(Object obj, int i, long j) throws IOException {
            calls.add(new Call("getSaveStream", i, j));
        }

        public Object load(int i, long j) throws IOException {
            calls.add(new Call("getLoadStream", i, j));
            return null;
        }

        public void delete(int i, long j) throws IOException {
            calls.add(new Call("delete", i, j));
        }

        public long[] getWindowIds(int i) throws IOException {
            calls.add(new Call("getWindowsIds", i, 0L));
            return new long[0];
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/NodeTest$TestGenericOperator.class */
    static class TestGenericOperator implements Operator {
        static int beginWindows;
        static int endWindows;

        TestGenericOperator() {
        }

        public void beginWindow(long j) {
            beginWindows++;
        }

        public void endWindow() {
            endWindows++;
        }

        public void setup(Context.OperatorContext operatorContext) {
            beginWindows = 0;
            endWindows = 0;
        }

        public void teardown() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/NodeTest$TestInputOperator.class */
    static class TestInputOperator implements InputOperator {
        static int beginWindows;
        static int endWindows;

        TestInputOperator() {
        }

        public void emitTuples() {
        }

        public void beginWindow(long j) {
            beginWindows++;
        }

        public void endWindow() {
            endWindows++;
        }

        public void setup(Context.OperatorContext operatorContext) {
            beginWindows = 0;
            endWindows = 0;
        }

        public void teardown() {
        }
    }

    /**
     * Builds a plan containing a single {@link TestGenericOperator}, sets
     * {@code STREAMING_WINDOW_SIZE_MILLIS} to 10 and runs it in a {@link StramLocalCluster}
     * with a run argument of 2000. The test is disabled via {@code @Ignore} and makes no
     * assertions.
     */
    @Test
    @Ignore
    public void testStreamingWindowGenericNode() throws Exception {
        final LogicalPlan dag = new LogicalPlan();
        dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 10);
        final TestGenericOperator operator = new TestGenericOperator();
        dag.addOperator("GenericOperator", operator);
        final StramLocalCluster localCluster = new StramLocalCluster(dag);
        localCluster.run(2000L);
    }

    /**
     * Verifies that checkpointing a node whose context has {@code STATELESS} set to true
     * results in zero calls on the configured {@link StorageAgentImpl}.
     */
    @Test
    public void testStatelessOperatorCheckpointing() {
        final Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
        attributes.put(Context.OperatorContext.STORAGE_AGENT, new StorageAgentImpl());
        attributes.put(Context.OperatorContext.STATELESS, true);

        final StatelessOperator operator = new StatelessOperator();
        final OperatorContext operatorContext = new OperatorContext(0, "operator", attributes, (Context) null);
        // Only checkpointing is exercised, so the abstract methods are stubbed out.
        final Node<StatelessOperator> statelessNode = new Node<StatelessOperator>(operator, operatorContext) {
            @Override
            public void connectInputPort(String str, SweepableReservoir sweepableReservoir) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public void run() {
                throw new UnsupportedOperationException("Not supported yet.");
            }
        };

        statelessNode.activate();
        // The call list is static and shared, so hold its lock while clearing and counting.
        synchronized (StorageAgentImpl.calls) {
            StorageAgentImpl.calls.clear();
            statelessNode.checkpoint(0L);
            final int recordedCalls = StorageAgentImpl.calls.size();
            Assert.assertEquals("Calls to StorageAgent", 0L, recordedCalls);
        }
        statelessNode.deactivate();
    }

    /**
     * Verifies that checkpointing a node wrapping a {@link TestGenericOperator} (no
     * {@code STATELESS} attribute set) results in exactly one call on the configured
     * {@link StorageAgentImpl}.
     */
    @Test
    public void testOperatorCheckpointing() {
        final Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
        attributes.put(Context.OperatorContext.STORAGE_AGENT, new StorageAgentImpl());

        final TestGenericOperator operator = new TestGenericOperator();
        final OperatorContext operatorContext = new OperatorContext(0, "operator", attributes, (Context) null);
        // Only checkpointing is exercised, so the abstract methods are stubbed out.
        final Node<TestGenericOperator> statefulNode = new Node<TestGenericOperator>(operator, operatorContext) {
            @Override
            public void connectInputPort(String str, SweepableReservoir sweepableReservoir) {
                throw new UnsupportedOperationException("Not supported yet.");
            }

            @Override
            public void run() {
                throw new UnsupportedOperationException("Not supported yet.");
            }
        };

        statefulNode.activate();
        // The call list is static and shared, so hold its lock while clearing and counting.
        synchronized (StorageAgentImpl.calls) {
            StorageAgentImpl.calls.clear();
            statefulNode.checkpoint(0L);
            final int recordedCalls = StorageAgentImpl.calls.size();
            Assert.assertEquals("Calls to StorageAgent", 1L, recordedCalls);
        }
        statefulNode.deactivate();
    }

    /**
     * Drives a node wrapping a {@link CheckpointTestOperator} from a {@link WindowGenerator}
     * until the operator has ended three windows (or the wait gives up), then asserts that no
     * window id was reported as checkpointed more than once and that the wait did not time out.
     *
     * @param processingMode value stored under {@code PROCESSING_MODE} in the operator context
     * @param z {@code true} to run a plain {@code CheckpointTestOperator} inside a
     *          {@link GenericNode} (input port {@code "ip1"}); {@code false} to run an
     *          {@code InputCheckpointTestOperator} inside an {@link InputNode} (input port
     *          {@code "input"})
     * @param str path handed to the {@link FSStorageAgent} used as the storage agent
     * @throws Exception if the waiting thread is interrupted or any engine call fails
     */
    public static void testDoubleCheckpointHandling(Operator.ProcessingMode processingMode, boolean z, String str) throws Exception {
        final WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
        windowGenerator.setResetWindow(0L);
        windowGenerator.setFirstWindow(0L);
        windowGenerator.setWindowWidth(100);
        windowGenerator.setCheckpointCount(1, 0);

        // Declared as the common supertype: a plain CheckpointTestOperator is not an
        // InputCheckpointTestOperator, so the narrower declaration cannot hold both branches.
        final CheckpointTestOperator checkpointTestOperator = z ? new CheckpointTestOperator() : new InputCheckpointTestOperator();

        final Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
        attributes.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2);
        attributes.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 2);
        attributes.put(OperatorContext.PROCESSING_MODE, processingMode);
        attributes.put(OperatorContext.STORAGE_AGENT, new FSStorageAgent(str, new Configuration()));
        final OperatorContext operatorContext = new OperatorContext(0, "operator", attributes, (Context) null);

        // Node<?> is the common supertype of both node kinds; it removes the raw (Node) casts
        // that were previously needed to reach id / firstWindowMillis / windowWidthMillis.
        final Node<?> node;
        if (z) {
            node = new GenericNode(checkpointTestOperator, operatorContext);
        } else {
            // Safe: this branch is only taken when an InputCheckpointTestOperator was created above.
            node = new InputNode((InputCheckpointTestOperator) checkpointTestOperator, operatorContext);
        }
        node.setId(1);

        final TestSink testSink = new TestSink();
        node.connectInputPort(z ? "ip1" : "input", windowGenerator.acquireReservoir(String.valueOf(node.id), 1024));
        node.connectOutputPort("output", testSink);
        node.firstWindowMillis = 0L;
        node.windowWidthMillis = 100L;
        windowGenerator.activate((StreamContext) null);

        final Thread nodeThread = new Thread() {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                node.activate();
                node.run();
                node.deactivate();
            }
        };
        nodeThread.start();

        final long startTime = System.currentTimeMillis();
        // Stays 0 when three windows have already ended at the first check, which makes the
        // elapsed time below negative and therefore within the limit.
        long endTime = 0;
        try {
            while (checkpointTestOperator.numWindows < 3) {
                endTime = System.currentTimeMillis();
                if (endTime - startTime >= 6000) {
                    // Giving up here always trips the "Timed out" assertion (limit is 5000 ms).
                    break;
                }
                Thread.sleep(50L);
            }
        } finally {
            // Always stop the node thread and the window generator, even if the wait above
            // threw, so a failure does not leave background threads running.
            try {
                node.shutdown();
                nodeThread.join();
            } finally {
                windowGenerator.deactivate();
            }
        }

        Assert.assertFalse(checkpointTestOperator.checkpointTwice);
        Assert.assertTrue("Timed out", endTime - startTime < 5000);
    }
}
