package com.datatorrent.stram.engine;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/engine/GenericNodeTest.class */
public class GenericNodeTest {

    /* loaded from: input_file:com/datatorrent/stram/engine/GenericNodeTest$GenericOperator.class */
    public static class GenericOperator implements Operator {
        long beginWindowId;
        long endWindowId;
        public final transient DefaultInputPort<Object> ip1 = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.engine.GenericNodeTest.GenericOperator.1
            public void process(Object obj) {
                GenericOperator.this.op.emit(obj);
            }
        };

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Object> ip2 = new DefaultInputPort<Object>() { // from class: com.datatorrent.stram.engine.GenericNodeTest.GenericOperator.2
            public void process(Object obj) {
                GenericOperator.this.op.emit(obj);
            }
        };

        @OutputPortFieldAnnotation(optional = true)
        DefaultOutputPort<Object> op = new DefaultOutputPort<>();

        public void beginWindow(long j) {
            this.beginWindowId = j;
        }

        public void endWindow() {
            this.endWindowId = this.beginWindowId;
        }

        public void setup(Context.OperatorContext operatorContext) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void teardown() {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }

    @Test
    public void testSynchingLogic() throws InterruptedException {
        final ArrayList arrayList = new ArrayList();
        final GenericNode genericNode = new GenericNode(new GenericOperator(), new OperatorContext(0, new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
        genericNode.setId(1);
        DefaultReservoir defaultReservoir = new DefaultReservoir("ip1Res", 1024);
        DefaultReservoir defaultReservoir2 = new DefaultReservoir("ip2Res", 1024);
        Sink<Object> sink = new Sink<Object>() { // from class: com.datatorrent.stram.engine.GenericNodeTest.1
            public void put(Object obj) {
                arrayList.add(obj);
            }

            public int getCount(boolean z) {
                return 0;
            }
        };
        genericNode.connectInputPort("ip1", defaultReservoir);
        genericNode.connectInputPort("ip2", defaultReservoir2);
        genericNode.connectOutputPort("op", sink);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: com.datatorrent.stram.engine.GenericNodeTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                atomicBoolean.set(true);
                genericNode.activate();
                genericNode.run();
                genericNode.deactivate();
            }
        };
        thread.start();
        do {
            Thread.sleep(25L);
        } while (!atomicBoolean.get());
        Tuple tuple = new Tuple(MessageType.BEGIN_WINDOW, 1L);
        defaultReservoir.add(tuple);
        Thread.sleep(25L);
        Assert.assertEquals(1L, arrayList.size());
        defaultReservoir2.add(tuple);
        Thread.sleep(25L);
        Assert.assertEquals(1L, arrayList.size());
        EndWindowTuple endWindowTuple = new EndWindowTuple(1L);
        defaultReservoir.add(endWindowTuple);
        Thread.sleep(25L);
        Assert.assertEquals(1L, arrayList.size());
        Tuple tuple2 = new Tuple(MessageType.BEGIN_WINDOW, 2L);
        defaultReservoir.add(tuple2);
        Thread.sleep(25L);
        Assert.assertEquals(1L, arrayList.size());
        defaultReservoir2.add(endWindowTuple);
        Thread.sleep(25L);
        Assert.assertEquals(3L, arrayList.size());
        defaultReservoir2.add(tuple2);
        Thread.sleep(25L);
        Assert.assertEquals(3L, arrayList.size());
        EndWindowTuple endWindowTuple2 = new EndWindowTuple(2L);
        defaultReservoir2.add(endWindowTuple2);
        Thread.sleep(25L);
        Assert.assertEquals(3L, arrayList.size());
        defaultReservoir.add(endWindowTuple2);
        Thread.sleep(25L);
        Assert.assertEquals(4L, arrayList.size());
        EndStreamTuple endStreamTuple = new EndStreamTuple(0L);
        defaultReservoir.add(endStreamTuple);
        Thread.sleep(25L);
        Assert.assertEquals(4L, arrayList.size());
        defaultReservoir2.add(new Tuple(MessageType.BEGIN_WINDOW, 3L));
        Thread.sleep(25L);
        Assert.assertEquals(5L, arrayList.size());
        defaultReservoir2.add(new EndWindowTuple(3L));
        Thread.sleep(25L);
        Assert.assertEquals(6L, arrayList.size());
        Assert.assertNotSame(Thread.State.TERMINATED, thread.getState());
        defaultReservoir2.add(endStreamTuple);
        Thread.sleep(25L);
        Assert.assertEquals(7L, arrayList.size());
        Thread.sleep(25L);
        Assert.assertEquals(Thread.State.TERMINATED, thread.getState());
    }
}
