package com.datatorrent.stram.stream;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Sink;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.engine.AbstractReservoir;
import com.datatorrent.stram.engine.GenericNode;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.tuple.Tuple;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/stream/InlineStreamTest.class */
public class InlineStreamTest {
    private Object prev;
    final AtomicInteger counter = new AtomicInteger(0);
    private static final Logger logger = LoggerFactory.getLogger(InlineStreamTest.class);

    /* loaded from: input_file:com/datatorrent/stram/stream/InlineStreamTest$PassThroughNode.class */
    public static class PassThroughNode<T> extends BaseOperator {
        public final DefaultInputPort<T> input = new DefaultInputPort<T>() { // from class: com.datatorrent.stram.stream.InlineStreamTest.PassThroughNode.1
            public void process(T t) {
                PassThroughNode.this.output.emit(t);
            }
        };
        public final DefaultOutputPort<T> output = new DefaultOutputPort<>();
        private boolean logMessages = false;

        public boolean isLogMessages() {
            return this.logMessages;
        }

        public void setLogMessages(boolean z) {
            this.logMessages = z;
        }
    }

    @Test
    public void test() throws Exception {
        PassThroughNode passThroughNode = new PassThroughNode();
        GenericNode genericNode = new GenericNode(passThroughNode, new OperatorContext(1, "operator1", new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
        genericNode.setId(1);
        passThroughNode.setup(genericNode.context);
        PassThroughNode passThroughNode2 = new PassThroughNode();
        GenericNode genericNode2 = new GenericNode(passThroughNode2, new OperatorContext(2, "operator2", new Attribute.AttributeMap.DefaultAttributeMap(), (Context) null));
        genericNode2.setId(2);
        passThroughNode2.setup(genericNode2.context);
        StreamContext streamContext = new StreamContext("node1->node2");
        final InlineStream inlineStream = new InlineStream(1024);
        inlineStream.setup(streamContext);
        genericNode.connectOutputPort("output", inlineStream);
        genericNode2.connectInputPort("input", inlineStream.getReservoir());
        this.prev = null;
        genericNode2.connectOutputPort("output", new Sink<Object>() { // from class: com.datatorrent.stram.stream.InlineStreamTest.1
            public void put(Object obj) {
                if (obj instanceof Tuple) {
                    return;
                }
                if (InlineStreamTest.this.prev == null) {
                    InlineStreamTest.this.prev = obj;
                } else {
                    if (Integer.valueOf(obj.toString()).intValue() - Integer.valueOf(InlineStreamTest.this.prev.toString()).intValue() != 1) {
                        synchronized (InlineStreamTest.this) {
                            InlineStreamTest.this.notify();
                        }
                    }
                    InlineStreamTest.this.prev = obj;
                }
                if (Integer.valueOf(InlineStreamTest.this.prev.toString()).intValue() == 4999) {
                    synchronized (InlineStreamTest.this) {
                        InlineStreamTest.this.notify();
                    }
                }
            }

            public int getCount(boolean z) {
                return 0;
            }
        });
        AbstractReservoir newReservoir = AbstractReservoir.newReservoir("input", 5120);
        genericNode.connectInputPort("input", newReservoir);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        launchNodeThread(genericNode, concurrentHashMap);
        launchNodeThread(genericNode2, concurrentHashMap);
        inlineStream.activate(streamContext);
        newReservoir.put(StramTestSupport.generateBeginWindowTuple("irrelevant", 0));
        for (int i = 0; i < 5000; i++) {
            newReservoir.put(Integer.valueOf(i));
        }
        newReservoir.put(StramTestSupport.generateEndWindowTuple("irrelevant", 0));
        synchronized (this) {
            wait(200L);
        }
        Assert.assertNotNull(this.prev);
        Assert.assertEquals("processing complete", 5000L, Integer.valueOf(this.prev.toString()).intValue() + 1);
        Assert.assertEquals("active operators", 2L, concurrentHashMap.size());
        Assert.assertTrue("operator should finish processing all events within 1 second", StramTestSupport.awaitCompletion(new StramTestSupport.WaitCondition() { // from class: com.datatorrent.stram.stream.InlineStreamTest.2
            @Override // com.datatorrent.stram.support.StramTestSupport.WaitCondition
            public boolean isComplete() {
                SweepableReservoir reservoir = inlineStream.getReservoir();
                InlineStreamTest.logger.debug("stream {} empty {}, size {}", new Object[]{inlineStream, Boolean.valueOf(reservoir.isEmpty()), Integer.valueOf(reservoir.size(false))});
                return reservoir.isEmpty();
            }
        }, 1000L));
        inlineStream.deactivate();
        Iterator<Node<?>> it = concurrentHashMap.values().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        for (int i2 = 0; i2 < 10; i2++) {
            Thread.sleep(20L);
            if (concurrentHashMap.isEmpty()) {
                break;
            }
        }
        inlineStream.teardown();
        passThroughNode2.teardown();
        passThroughNode.teardown();
        Assert.assertEquals("active operators", 0L, concurrentHashMap.size());
    }

    private void launchNodeThread(final Node<?> node, final Map<Integer, Node<?>> map) {
        new Thread(new Runnable() { // from class: com.datatorrent.stram.stream.InlineStreamTest.3
            @Override // java.lang.Runnable
            public void run() {
                int incrementAndGet = InlineStreamTest.this.counter.incrementAndGet();
                map.put(Integer.valueOf(incrementAndGet), node);
                node.activate();
                node.run();
                node.deactivate();
                map.remove(Integer.valueOf(incrementAndGet));
            }
        }).start();
    }
}
