package com.datatorrent.stram.engine;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.stram.engine.GenericNodeTest;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/InputNodeTest.class */
public class InputNodeTest {

    // JUnit rule supplied by GenericNodeTest; its getDir() value is passed to the
    // double-checkpoint tests in this class.
    @Rule
    public GenericNodeTest.FSTestWatcher testMeta = new GenericNodeTest.FSTestWatcher();
    // SLF4J logger for this test class.
    private static final Logger LOG = LoggerFactory.getLogger(InputNodeTest.class);

    /* loaded from: input_file:com/datatorrent/stram/engine/InputNodeTest$InputCheckpointOperator.class */
    /**
     * Input-operator variant of {@link GenericNodeTest.GenericCheckpointOperator}.
     *
     * <p>Every window and checkpoint callback simply defers to the superclass, and
     * {@link #emitTuples()} is a no-op, so the operator never produces data on its own.
     */
    public static class InputCheckpointOperator extends GenericNodeTest.GenericCheckpointOperator implements InputOperator {
        // NOTE(review): if GenericCheckpointOperator declares fields with these same names,
        // the declarations below hide them rather than share them — confirm against the superclass.
        public Set<Long> checkpointedWindows = Sets.newHashSet();
        public volatile boolean checkpointTwice = false;
        public volatile int numWindows = 0;

        /** Forwards the begin-window callback to the superclass unchanged. */
        @Override
        public void beginWindow(long windowId) {
            super.beginWindow(windowId);
        }

        /** Forwards the end-window callback to the superclass unchanged. */
        @Override
        public void endWindow() {
            super.endWindow();
        }

        /** Forwards the checkpointed callback to the superclass unchanged. */
        @Override
        public void checkpointed(long windowId) {
            super.checkpointed(windowId);
        }

        /** Forwards the committed callback to the superclass unchanged. */
        @Override
        public void committed(long windowId) {
            super.committed(windowId);
        }

        /** Intentionally empty: this operator emits no tuples. */
        @Override
        public void emitTuples() {
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/InputNodeTest$TestInputOperator.class */
    public static class TestInputOperator implements InputOperator, Operator.IdleTimeHandler {
        public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
        public boolean trueEmitTuplesFalseHandleIdleTime = true;
        private long lastTimestamp;

        public void emitTuples() {
            if (this.trueEmitTuplesFalseHandleIdleTime) {
                emit(100L);
            }
        }

        public void beginWindow(long j) {
        }

        public void endWindow() {
        }

        public void setup(Context.OperatorContext operatorContext) {
        }

        public void teardown() {
        }

        public void handleIdleTime() {
            if (this.trueEmitTuplesFalseHandleIdleTime) {
                return;
            }
            emit(100L);
        }

        private void emit(long j) {
            if (System.currentTimeMillis() - this.lastTimestamp > j) {
                this.lastTimestamp = System.currentTimeMillis();
                this.output.emit(1L);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/InputNodeTest$TestWindowGenerator.class */
    /**
     * Minimal {@link SweepableReservoir} that holds at most one pending control tuple.
     *
     * <p>{@link #sweep()} walks a small state machine: it first produces a reset-window tuple, waits
     * until that tuple has been removed, and then alternates between begin-window and end-window
     * tuples. A new begin-window tuple is only produced once more than 1000 ms have passed since the
     * previous one. Window ids are the construction-time epoch seconds shifted left by 32 bits,
     * OR-ed with a counter.
     */
    public static class TestWindowGenerator implements SweepableReservoir {
        private static final Logger LOG = LoggerFactory.getLogger(TestWindowGenerator.class);

        /** Phases of the control-tuple state machine driven by {@link #sweep()}. */
        public enum State {
            RESET_WINDOW_NO_TUPLE,
            RESET_WINDOW_TUPLE,
            BEGIN_WINDOW,
            END_WINDOW
        }

        private final long baseSeconds = (System.currentTimeMillis() / 1000) << 32;
        private State currentState = State.RESET_WINDOW_NO_TUPLE;
        private Tuple currentTuple;
        private long lastTime;
        private long windowId = 0;
        private Sink<Object> oldSink = null;

        /** Stores the given sink and hands back whichever sink was stored before. */
        public Sink<Object> setSink(Sink<Object> sink) {
            final Sink<Object> previous = this.oldSink;
            this.oldSink = sink;
            return previous;
        }

        /**
         * Advances the state machine by one step and returns the currently pending tuple, which may
         * be {@code null} or a tuple left over from an earlier call.
         */
        public Tuple sweep() {
            final State state = this.currentState;
            if (state == State.RESET_WINDOW_NO_TUPLE) {
                this.currentTuple = new ResetWindowTuple(this.baseSeconds | 500);
                this.currentState = State.RESET_WINDOW_TUPLE;
            } else if (state == State.RESET_WINDOW_TUPLE) {
                // Stay here until the reset tuple has been taken via remove().
                if (this.currentTuple == null) {
                    this.currentState = State.BEGIN_WINDOW;
                }
            } else if (state == State.BEGIN_WINDOW) {
                final boolean intervalElapsed = System.currentTimeMillis() - this.lastTime > 1000;
                if (intervalElapsed) {
                    this.lastTime = System.currentTimeMillis();
                    this.windowId++;
                    this.currentTuple = new Tuple(MessageType.BEGIN_WINDOW, this.baseSeconds | this.windowId);
                    this.currentState = State.END_WINDOW;
                }
            } else if (state == State.END_WINDOW) {
                this.currentTuple = new EndWindowTuple(this.baseSeconds | this.windowId);
                this.currentState = State.BEGIN_WINDOW;
            }
            return this.currentTuple;
        }

        /** Always reports zero, regardless of the argument. */
        public int getCount(boolean z) {
            return 0;
        }

        /** Reports 1 while a tuple is pending and 0 otherwise; the argument is ignored. */
        public int size(boolean z) {
            if (this.currentTuple == null) {
                return 0;
            }
            return 1;
        }

        /** Tells whether no tuple is currently pending. */
        public boolean isEmpty() {
            return this.currentTuple == null;
        }

        /** Clears the pending tuple and returns it ({@code null} if nothing was pending). */
        public Object remove() {
            final Tuple pending = this.currentTuple;
            this.currentTuple = null;
            return pending;
        }
    }

    /** Runs the shared emit scenario with the operator emitting from {@code emitTuples()}. */
    @Test
    public void testEmitTuplesOutsideStreamingWindow() throws Exception {
        final boolean emitFromEmitTuples = true;
        emitTestHelper(emitFromEmitTuples);
    }

    /** Runs the shared emit scenario with the operator emitting from {@code handleIdleTime()}. */
    @Test
    public void testHandleIdleTimeOutsideStreamingWindow() throws Exception {
        final boolean emitFromEmitTuples = false;
        emitTestHelper(emitFromEmitTuples);
    }

    /**
     * Runs an {@link InputNode} around a {@link TestInputOperator} on a background thread for about
     * three seconds, then checks the ordering of everything collected by the sink.
     *
     * <p>The checks are: at least one tuple was collected; RESET_WINDOW and BEGIN_WINDOW only arrive
     * while no window is open; END_WINDOW only arrives while a window is open; and every non-control
     * object arrives while a window is open.
     *
     * @param trueEmitTuplesFalseHandleIdleTime copied onto the operator's field of the same name;
     *        {@code true} makes it emit from {@code emitTuples()}, {@code false} from
     *        {@code handleIdleTime()}
     * @throws Exception if the calling thread is interrupted while sleeping or joining, or if node
     *         setup fails
     */
    private void emitTestHelper(boolean trueEmitTuplesFalseHandleIdleTime) throws Exception {
        TestInputOperator operator = new TestInputOperator();
        operator.trueEmitTuplesFalseHandleIdleTime = trueEmitTuplesFalseHandleIdleTime;

        Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
        attributes.put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 10);
        attributes.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 10);

        final InputNode inputNode = new InputNode(operator, new OperatorContext(0, attributes, (Context) null));
        inputNode.setId(1);
        TestSink testSink = new TestSink();
        inputNode.connectInputPort("input", new TestWindowGenerator());
        inputNode.connectOutputPort("output", testSink);

        final AtomicBoolean started = new AtomicBoolean(false);
        Thread nodeThread = new Thread() {
            @Override
            public void run() {
                started.set(true);
                inputNode.activate();
                inputNode.run();
                inputNode.deactivate();
            }
        };
        nodeThread.start();
        Thread.sleep(3000L);

        // NOTE(review): Thread.stop() is deprecated and throws UnsupportedOperationException on
        // recent JDKs; if the node exposes a cooperative shutdown call, prefer that — confirm.
        nodeThread.stop();
        // stop() returns without waiting for the target thread to die, so wait (bounded) before
        // reading the sink; otherwise the node thread may still be appending to it.
        nodeThread.join(5000L);

        Assert.assertTrue("Node thread should have started", started.get());
        Assert.assertTrue("Should have emitted some tuples", testSink.collectedTuples.size() > 0);

        boolean insideWindow = false;
        for (Object collected : testSink.collectedTuples) {
            if (!(collected instanceof Tuple)) {
                // Anything that is not a control tuple is operator data.
                Assert.assertTrue("Data tuple emitted outside a streaming window", insideWindow);
                continue;
            }
            MessageType type = ((Tuple) collected).getType();
            if (type == MessageType.RESET_WINDOW) {
                Assert.assertFalse("RESET_WINDOW received while a window is open", insideWindow);
            } else if (type == MessageType.BEGIN_WINDOW) {
                Assert.assertFalse("BEGIN_WINDOW received while a window is already open", insideWindow);
                insideWindow = true;
            } else if (type == MessageType.END_WINDOW) {
                Assert.assertTrue("END_WINDOW received without an open window", insideWindow);
                insideWindow = false;
            }
        }
    }

    /**
     * Delegates to {@code NodeTest.testDoubleCheckpointHandling} with {@code AT_LEAST_ONCE},
     * the flag {@code false}, and the directory provided by {@code testMeta}.
     */
    @Test
    public void testDoubleCheckpointAtleastOnce() throws Exception {
        final Operator.ProcessingMode mode = Operator.ProcessingMode.AT_LEAST_ONCE;
        NodeTest.testDoubleCheckpointHandling(mode, false, this.testMeta.getDir());
    }

    /**
     * Delegates to {@code NodeTest.testDoubleCheckpointHandling} with {@code AT_MOST_ONCE},
     * the flag {@code false}, and the directory provided by {@code testMeta}.
     */
    @Test
    public void testDoubleCheckpointAtMostOnce() throws Exception {
        final Operator.ProcessingMode mode = Operator.ProcessingMode.AT_MOST_ONCE;
        NodeTest.testDoubleCheckpointHandling(mode, false, this.testMeta.getDir());
    }

    /**
     * Delegates to {@code NodeTest.testDoubleCheckpointHandling} with {@code EXACTLY_ONCE},
     * the flag {@code false}, and the directory provided by {@code testMeta}.
     */
    @Test
    public void testDoubleCheckpointExactlyOnce() throws Exception {
        final Operator.ProcessingMode mode = Operator.ProcessingMode.EXACTLY_ONCE;
        NodeTest.testDoubleCheckpointHandling(mode, false, this.testMeta.getDir());
    }
}
