package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/engine/InputOperatorTest.class */
public class InputOperatorTest {
    static HashMap<String, List<?>> collections = new HashMap<>();
    static AtomicInteger tupleCount = new AtomicInteger();

    /* loaded from: input_file:com/datatorrent/stram/engine/InputOperatorTest$CollectorInputPort.class */
    public static class CollectorInputPort<T> extends DefaultInputPort<T> {
        ArrayList<T> list;
        final String id;

        public CollectorInputPort(String str, Operator operator) {
            this.id = str;
        }

        public void process(T t) {
            this.list.add(t);
            InputOperatorTest.tupleCount.incrementAndGet();
        }

        public void setConnected(boolean z) {
            if (z) {
                HashMap<String, List<?>> hashMap = InputOperatorTest.collections;
                String str = this.id;
                ArrayList<T> arrayList = new ArrayList<>();
                this.list = arrayList;
                hashMap.put(str, arrayList);
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/InputOperatorTest$CollectorModule.class */
    public static class CollectorModule<T> extends BaseOperator {
        public final transient CollectorInputPort<T> even = new CollectorInputPort<>("even", this);
        public final transient CollectorInputPort<T> odd = new CollectorInputPort<>("odd", this);
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/InputOperatorTest$EvenOddIntegerGeneratorInputOperator.class */
    public static class EvenOddIntegerGeneratorInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext> {
        public final transient DefaultOutputPort<Integer> even = new DefaultOutputPort<>();
        public final transient DefaultOutputPort<Integer> odd = new DefaultOutputPort<>();
        private final transient CircularBuffer<Integer> evenBuffer = new CircularBuffer<>(1024);
        private final transient CircularBuffer<Integer> oddBuffer = new CircularBuffer<>(1024);
        private volatile Thread dataGeneratorThread;

        public void beginWindow(long j) {
        }

        public void endWindow() {
        }

        public void setup(Context.OperatorContext operatorContext) {
        }

        public void teardown() {
        }

        public void activate(Context.OperatorContext operatorContext) {
            this.dataGeneratorThread = new Thread("Integer Emitter") { // from class: com.datatorrent.stram.engine.InputOperatorTest.EvenOddIntegerGeneratorInputOperator.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    int i = 0;
                    while (EvenOddIntegerGeneratorInputOperator.this.dataGeneratorThread != null) {
                        try {
                            CircularBuffer circularBuffer = i % 2 == 0 ? EvenOddIntegerGeneratorInputOperator.this.evenBuffer : EvenOddIntegerGeneratorInputOperator.this.oddBuffer;
                            int i2 = i;
                            i++;
                            circularBuffer.put(Integer.valueOf(i2));
                            Thread.sleep(20L);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            };
            this.dataGeneratorThread.start();
        }

        public void deactivate() {
            this.dataGeneratorThread = null;
        }

        public void emitTuples() {
            int size = this.evenBuffer.size();
            while (true) {
                int i = size;
                size--;
                if (i <= 0) {
                    break;
                } else {
                    this.even.emit(this.evenBuffer.pollUnsafe());
                }
            }
            int size2 = this.oddBuffer.size();
            while (true) {
                int i2 = size2;
                size2--;
                if (i2 <= 0) {
                    return;
                } else {
                    this.odd.emit(this.oddBuffer.pollUnsafe());
                }
            }
        }
    }

    @Test
    public void testSomeMethod() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new File("target").getAbsolutePath(), (Configuration) null));
        EvenOddIntegerGeneratorInputOperator addOperator = logicalPlan.addOperator("NumberGenerator", EvenOddIntegerGeneratorInputOperator.class);
        CollectorModule addOperator2 = logicalPlan.addOperator("NumberCollector", new CollectorModule());
        logicalPlan.addStream("EvenIntegers", addOperator.even, addOperator2.even).setLocality(DAG.Locality.CONTAINER_LOCAL);
        logicalPlan.addStream("OddIntegers", addOperator.odd, addOperator2.odd).setLocality(DAG.Locality.CONTAINER_LOCAL);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        stramLocalCluster.setHeartbeatMonitoringEnabled(false);
        stramLocalCluster.runAsync();
        StramTestSupport.awaitCompletion(new StramTestSupport.WaitCondition() { // from class: com.datatorrent.stram.engine.InputOperatorTest.1
            @Override // com.datatorrent.stram.support.StramTestSupport.WaitCondition
            public boolean isComplete() {
                return InputOperatorTest.tupleCount.get() > 2;
            }
        }, 2000L);
        stramLocalCluster.shutdown();
        Assert.assertEquals("Collections size", 2L, collections.size());
        Assert.assertFalse("Zero tuple count", collections.get(addOperator2.even.id).isEmpty() && collections.get(addOperator2.odd.id).isEmpty());
        Assert.assertTrue("Tuple count", collections.get(addOperator2.even.id).size() - collections.get(addOperator2.odd.id).size() <= 1);
    }
}
