package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import java.util.concurrent.Callable;
import javax.validation.ConstraintViolationException;
import org.apache.apex.api.ControlAwareDefaultInputPort;
import org.apache.apex.api.ControlAwareDefaultOutputPort;
import org.apache.apex.api.operator.ControlTuple;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest.class */
public class CustomControlTupleTest {
    public static final Logger LOG = LoggerFactory.getLogger(CustomControlTupleTest.class);

    // ---------------------------------------------------------------------------------------
    // State shared between the test methods and the operators of the application under test.
    // Everything except `immediate` is reset by starting() before each test.
    // ---------------------------------------------------------------------------------------

    /** Payload of the next control tuple to emit; therefore also the number emitted so far. */
    private static long controlIndex = 0;

    /** Number of times the receiver's control-tuple callback has been invoked. */
    private static int numControlTuples = 0;

    /**
     * Set once the generator has finished emitting control tuples.
     * Volatile because it is written by the generator and read by the receiver.
     * NOTE(review): assumes the engine may run those operators on different threads - confirm.
     * The write also publishes the preceding plain writes to controlIndex and endingWindowId.
     */
    private static volatile boolean done = false;

    /**
     * Set by the receiver when the application may stop; polled by the exit condition
     * installed in testApp. Volatile so the polling thread is guaranteed to observe the write
     * (and, through it, the final value of numControlTuples).
     */
    private static volatile boolean endApp = false;

    /** Id of the window in which the generator emitted its last control tuple. */
    private static long endingWindowId = 0;

    /** Delivery mode given to generated control tuples: true = IMMEDIATE, false = END_WINDOW. */
    private static boolean immediate = false;

    @ApplicationAnnotation(name = "TestDefaultPropagation")
    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$Application1.class */
    public static class Application1 implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            Generator addOperator = dag.addOperator("randomGenerator", Generator.class);
            DefaultProcessor addOperator2 = dag.addOperator("process", DefaultProcessor.class);
            ControlAwareReceiver addOperator3 = dag.addOperator("receiver", ControlAwareReceiver.class);
            dag.addStream("genToProcessor", addOperator.out, addOperator2.input);
            dag.addStream("ProcessorToReceiver", addOperator2.output, addOperator3.input);
        }
    }

    @ApplicationAnnotation(name = "TestExplicitPropagation")
    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$Application2.class */
    public static class Application2 implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            Generator addOperator = dag.addOperator("randomGenerator", Generator.class);
            ControlAwareProcessor addOperator2 = dag.addOperator("process", ControlAwareProcessor.class);
            ControlAwareReceiver addOperator3 = dag.addOperator("receiver", ControlAwareReceiver.class);
            dag.addStream("genToProcessor", addOperator.out, addOperator2.input);
            dag.addStream("ProcessorToReceiver", addOperator2.output, addOperator3.input);
        }
    }

    @ApplicationAnnotation(name = "TestDuplicateControlTuples")
    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$Application3.class */
    public static class Application3 implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            Generator addOperator = dag.addOperator("randomGenerator", Generator.class);
            DefaultProcessor addOperator2 = dag.addOperator("process", DefaultProcessor.class);
            ControlAwareReceiver addOperator3 = dag.addOperator("receiver", ControlAwareReceiver.class);
            dag.addStream("genToProcessor", addOperator.out, addOperator2.input);
            dag.addStream("ProcessorToReceiver", addOperator2.output, addOperator3.input);
            dag.setOperatorAttribute(addOperator2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2));
        }
    }

    @ApplicationAnnotation(name = "TestThreadLocal")
    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$Application4.class */
    public static class Application4 implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            Generator addOperator = dag.addOperator("randomGenerator", Generator.class);
            DefaultProcessor addOperator2 = dag.addOperator("process", DefaultProcessor.class);
            ControlAwareReceiver addOperator3 = dag.addOperator("receiver", ControlAwareReceiver.class);
            dag.addStream("genToProcessor", addOperator.out, addOperator2.input).setLocality(DAG.Locality.THREAD_LOCAL);
            dag.addStream("ProcessorToReceiver", addOperator2.output, addOperator3.input).setLocality(DAG.Locality.THREAD_LOCAL);
        }
    }

    @ApplicationAnnotation(name = "TestContainerLocal")
    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$Application5.class */
    public static class Application5 implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            Generator addOperator = dag.addOperator("randomGenerator", Generator.class);
            DefaultProcessor addOperator2 = dag.addOperator("process", DefaultProcessor.class);
            ControlAwareReceiver addOperator3 = dag.addOperator("receiver", ControlAwareReceiver.class);
            dag.addStream("genToProcessor", addOperator.out, addOperator2.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
            dag.addStream("ProcessorToReceiver", addOperator2.output, addOperator3.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$ControlAwareProcessor.class */
    public static class ControlAwareProcessor extends BaseOperator {
        public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>() { // from class: com.datatorrent.stram.CustomControlTupleTest.ControlAwareProcessor.1
            public void process(Double d) {
                ControlAwareProcessor.this.output.emit(d);
            }

            public boolean processControl(ControlTuple controlTuple) {
                ControlAwareProcessor.this.output.emitControl(controlTuple);
                return true;
            }
        };
        public final transient ControlAwareDefaultOutputPort<Double> output = new ControlAwareDefaultOutputPort<>();
    }

    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$ControlAwareReceiver.class */
    public static class ControlAwareReceiver extends BaseOperator {
        private long currentWindowId;
        public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>() { // from class: com.datatorrent.stram.CustomControlTupleTest.ControlAwareReceiver.1
            public boolean processControl(ControlTuple controlTuple) {
                CustomControlTupleTest.access$408();
                return false;
            }

            public void process(Double d) {
            }
        };

        public void beginWindow(long j) {
            this.currentWindowId = j;
        }

        public void endWindow() {
            if (!CustomControlTupleTest.done || this.currentWindowId <= CustomControlTupleTest.endingWindowId) {
                return;
            }
            boolean unused = CustomControlTupleTest.endApp = true;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$DefaultProcessor.class */
    public static class DefaultProcessor extends BaseOperator {
        public final transient DefaultInputPort<Double> input = new DefaultInputPort<Double>() { // from class: com.datatorrent.stram.CustomControlTupleTest.DefaultProcessor.1
            public void process(Double d) {
                DefaultProcessor.this.output.emit(d);
            }
        };
        public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<>();
    }

    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$Generator.class */
    public static class Generator extends BaseOperator implements InputOperator {
        private long currentWindowId;
        public final transient ControlAwareDefaultOutputPort<Double> out = new ControlAwareDefaultOutputPort<>();

        public void beginWindow(long j) {
            if (CustomControlTupleTest.done) {
                return;
            }
            this.currentWindowId = j;
            this.out.emitControl(new TestControlTuple(CustomControlTupleTest.access$108(), CustomControlTupleTest.immediate));
        }

        public void emitTuples() {
            if (CustomControlTupleTest.done) {
                return;
            }
            this.out.emitControl(new TestControlTuple(CustomControlTupleTest.access$108(), CustomControlTupleTest.immediate));
        }

        public void endWindow() {
            if (CustomControlTupleTest.done) {
                return;
            }
            this.out.emitControl(new TestControlTuple(CustomControlTupleTest.access$108(), CustomControlTupleTest.immediate));
            long unused = CustomControlTupleTest.endingWindowId = this.currentWindowId;
            boolean unused2 = CustomControlTupleTest.done = true;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/CustomControlTupleTest$TestControlTuple.class */
    public static class TestControlTuple implements ControlTuple {
        public long data;
        public boolean immediate;

        public TestControlTuple() {
            this.data = 0L;
        }

        public TestControlTuple(long j, boolean z) {
            this.data = j;
            this.immediate = z;
        }

        public boolean equals(Object obj) {
            return (obj instanceof TestControlTuple) && ((TestControlTuple) obj).data == this.data;
        }

        public String toString() {
            return this.data + "";
        }

        public ControlTuple.DeliveryType getDeliveryType() {
            return this.immediate ? ControlTuple.DeliveryType.IMMEDIATE : ControlTuple.DeliveryType.END_WINDOW;
        }
    }

    /**
     * Restores the shared static counters and flags to their initial values before each test.
     * The {@code immediate} flag is not reset here; each test sets it before launching its
     * application.
     */
    @Before
    public void starting() {
        // Completion flags and the window id they refer to.
        endApp = false;
        done = false;
        endingWindowId = 0L;

        // Emitted / received counters compared at the end of each test.
        numControlTuples = 0;
        controlIndex = 0L;
    }

    /**
     * Runs the given application in local mode until the receiver raises {@code endApp} (or the
     * run timeout passed to the controller expires), then checks that the number of control
     * tuples received equals the number emitted.
     *
     * @param streamingApplication application whose DAG is to be built and run
     * @throws Exception if preparing or running the DAG fails
     */
    public void testApp(StreamingApplication streamingApplication) throws Exception {
        try {
            LocalMode localMode = LocalMode.newInstance();
            localMode.prepareDAG(streamingApplication, new Configuration(false));

            // setExitCondition is called on StramLocalCluster, so the controller is narrowed
            // explicitly; the cast is harmless if getController() already returns that type.
            StramLocalCluster controller = (StramLocalCluster) localMode.getController();
            controller.setExitCondition(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    return CustomControlTupleTest.endApp;
                }
            });
            controller.run(200000L);

            LOG.info("Control Tuples received {} expected {}", numControlTuples, controlIndex);
            // assertEquals reports both values on failure, unlike assertTrue(a == b).
            Assert.assertEquals("Incorrect Control Tuples", controlIndex, (long) numControlTuples);
        } catch (ConstraintViolationException e) {
            Assert.fail("constraint violations: " + e.getConstraintViolations());
        }
    }

    /** Application1 with control tuples reporting END_WINDOW delivery. */
    @Test
    public void testDefaultPropagation() throws Exception {
        runWithEndWindowDelivery(new Application1());
    }

    /** Application2 with control tuples reporting END_WINDOW delivery. */
    @Test
    public void testExplicitPropagation() throws Exception {
        runWithEndWindowDelivery(new Application2());
    }

    /** Application3 with control tuples reporting END_WINDOW delivery. */
    @Test
    public void testDuplicateControlTuples() throws Exception {
        runWithEndWindowDelivery(new Application3());
    }

    /** Application4 with control tuples reporting END_WINDOW delivery. */
    @Test
    public void testThreadLocal() throws Exception {
        runWithEndWindowDelivery(new Application4());
    }

    /** Application5 with control tuples reporting END_WINDOW delivery. */
    @Test
    public void testContainerLocal() throws Exception {
        runWithEndWindowDelivery(new Application5());
    }

    /**
     * Clears the {@code immediate} flag, so generated tuples report END_WINDOW delivery, and
     * runs the application through {@link #testApp(StreamingApplication)}.
     */
    private void runWithEndWindowDelivery(StreamingApplication application) throws Exception {
        immediate = false;
        testApp(application);
    }

    /** Application1 with control tuples reporting IMMEDIATE delivery. */
    @Test
    public void testDefaultPropagationImmediate() throws Exception {
        runWithImmediateDelivery(new Application1());
    }

    /** Application2 with control tuples reporting IMMEDIATE delivery. */
    @Test
    public void testExplicitPropagationImmediate() throws Exception {
        runWithImmediateDelivery(new Application2());
    }

    /** Application3 with control tuples reporting IMMEDIATE delivery. */
    @Test
    public void testDuplicateControlTuplesImmediate() throws Exception {
        runWithImmediateDelivery(new Application3());
    }

    /** Application4 with control tuples reporting IMMEDIATE delivery. */
    @Test
    public void testThreadLocalImmediate() throws Exception {
        runWithImmediateDelivery(new Application4());
    }

    /** Application5 with control tuples reporting IMMEDIATE delivery. */
    @Test
    public void testContainerLocalImmediate() throws Exception {
        runWithImmediateDelivery(new Application5());
    }

    /**
     * Sets the {@code immediate} flag, so generated tuples report IMMEDIATE delivery, and runs
     * the application through {@link #testApp(StreamingApplication)}.
     */
    private void runWithImmediateDelivery(StreamingApplication application) throws Exception {
        immediate = true;
        testApp(application);
    }

    /**
     * Post-increments {@code controlIndex}.
     * NOTE(review): the name follows the pattern of compiler-generated accessors for private
     * members of the enclosing class - presumably a decompilation leftover.
     *
     * @return the value {@code controlIndex} held before the increment
     */
    static /* synthetic */ long access$108() {
        return controlIndex++;
    }

    /**
     * Post-increments {@code numControlTuples}.
     * NOTE(review): the name follows the pattern of compiler-generated accessors for private
     * members of the enclosing class - presumably a decompilation leftover.
     *
     * @return the value {@code numControlTuples} held before the increment
     */
    static /* synthetic */ int access$408() {
        return numControlTuples++;
    }
}
