package com.datatorrent.stram.engine;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/ProcessingModeTests.class */
public class ProcessingModeTests {
    private LogicalPlan dag;
    Operator.ProcessingMode processingMode;
    private static final Logger logger = LoggerFactory.getLogger(ProcessingModeTests.class);

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
    int maxTuples = 30;

    /* loaded from: input_file:com/datatorrent/stram/engine/ProcessingModeTests$CollectorOperator.class */
    public static class CollectorOperator extends BaseOperator implements Operator.CheckpointListener {
        public static HashSet<Long> collection = new HashSet<>(20);
        public static ArrayList<Long> duplicates = new ArrayList<>();
        private boolean simulateFailure;
        private long checkPointWindowId;
        public final transient DefaultInputPort<Long> input = new DefaultInputPort<Long>() { // from class: com.datatorrent.stram.engine.ProcessingModeTests.CollectorOperator.1
            public void process(Long l) {
                ProcessingModeTests.logger.debug("adding the tuple {}", Codec.getStringWindowId(l.longValue()));
                if (CollectorOperator.collection.contains(l)) {
                    CollectorOperator.duplicates.add(l);
                } else {
                    CollectorOperator.collection.add(l);
                }
            }
        };

        public void setSimulateFailure(boolean z) {
            this.simulateFailure = z;
        }

        public void setup(Context.OperatorContext operatorContext) {
            this.simulateFailure &= this.checkPointWindowId == 0;
            ProcessingModeTests.logger.debug("simulateFailure = {}", Boolean.valueOf(this.simulateFailure));
        }

        public void checkpointed(long j) {
            if (this.checkPointWindowId == 0) {
                this.checkPointWindowId = j;
            }
            ProcessingModeTests.logger.debug("{} checkpointed at {}", this, Codec.getStringWindowId(j));
        }

        public void committed(long j) {
            ProcessingModeTests.logger.debug("{} committed at {}", this, Codec.getStringWindowId(j));
            if (this.simulateFailure && j > this.checkPointWindowId && this.checkPointWindowId > 0) {
                throw new RuntimeException("Failure Simulation from " + this + " checkpointWindowId=" + Codec.getStringWindowId(this.checkPointWindowId));
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/ProcessingModeTests$MultiInputOperator.class */
    public static class MultiInputOperator implements Operator {
        public final transient MyInputPort input1 = new MyInputPort(100);
        public final transient MyInputPort input2 = new MyInputPort(200);
        public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();

        /* loaded from: input_file:com/datatorrent/stram/engine/ProcessingModeTests$MultiInputOperator$MyInputPort.class */
        public class MyInputPort extends DefaultInputPort<Integer> {
            private final int id;

            public MyInputPort(int i) {
                this.id = i;
            }

            public void process(Integer num) {
                MultiInputOperator.this.output.emit(Integer.valueOf(this.id + num.intValue()));
            }
        }

        public void beginWindow(long j) {
        }

        public void endWindow() {
        }

        public void setup(Context.OperatorContext operatorContext) {
        }

        public void teardown() {
        }
    }

    public ProcessingModeTests(Operator.ProcessingMode processingMode) {
        this.processingMode = processingMode;
    }

    @Before
    public void setup() throws IOException {
        this.dag = StramTestSupport.createDAG(this.testMeta);
        StreamingContainer.eventloop.start();
    }

    @After
    public void teardown() {
        StreamingContainer.eventloop.stop();
    }

    public void testLinearInputOperatorRecovery() throws Exception {
        RecoverableInputOperator.initGenTuples();
        CollectorOperator.collection.clear();
        CollectorOperator.duplicates.clear();
        this.dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        this.dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        this.dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
        RecoverableInputOperator addOperator = this.dag.addOperator("LongGenerator", RecoverableInputOperator.class);
        addOperator.setMaximumTuples(this.maxTuples);
        addOperator.setSimulateFailure(true);
        this.dag.getMeta(addOperator).getAttributes().put(Context.OperatorContext.PROCESSING_MODE, this.processingMode);
        CollectorOperator addOperator2 = this.dag.addOperator("LongCollector", CollectorOperator.class);
        if (Operator.ProcessingMode.EXACTLY_ONCE.equals(this.processingMode)) {
            this.dag.getMeta(addOperator2).getAttributes().put(Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
        }
        this.dag.addStream("connection", addOperator.output, addOperator2.input);
        new StramLocalCluster(this.dag).run();
    }

    public void testLinearOperatorRecovery() throws Exception {
        RecoverableInputOperator.initGenTuples();
        CollectorOperator.collection.clear();
        CollectorOperator.duplicates.clear();
        this.dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        this.dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        this.dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
        RecoverableInputOperator addOperator = this.dag.addOperator("LongGenerator", RecoverableInputOperator.class);
        addOperator.setMaximumTuples(this.maxTuples);
        addOperator.setSimulateFailure(true);
        CollectorOperator addOperator2 = this.dag.addOperator("LongCollector", CollectorOperator.class);
        addOperator2.setSimulateFailure(true);
        this.dag.getMeta(addOperator2).getAttributes().put(Context.OperatorContext.PROCESSING_MODE, this.processingMode);
        this.dag.addStream("connection", addOperator.output, addOperator2.input);
        new StramLocalCluster(this.dag).run();
    }

    public void testLinearInlineOperatorsRecovery() throws Exception {
        RecoverableInputOperator.initGenTuples();
        CollectorOperator.collection.clear();
        CollectorOperator.duplicates.clear();
        this.dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
        this.dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        this.dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
        RecoverableInputOperator addOperator = this.dag.addOperator("LongGenerator", RecoverableInputOperator.class);
        addOperator.setMaximumTuples(this.maxTuples);
        addOperator.setSimulateFailure(true);
        CollectorOperator addOperator2 = this.dag.addOperator("LongCollector", CollectorOperator.class);
        addOperator2.setSimulateFailure(true);
        this.dag.getMeta(addOperator2).getAttributes().put(Context.OperatorContext.PROCESSING_MODE, this.processingMode);
        this.dag.addStream("connection", addOperator.output, addOperator2.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
        new StramLocalCluster(this.dag).run();
    }

    public void testNonLinearOperatorRecovery() throws InterruptedException {
        final HashSet hashSet = new HashSet();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
        defaultAttributeMap.put(Context.OperatorContext.PROCESSING_MODE, this.processingMode);
        final GenericNode genericNode = new GenericNode(new MultiInputOperator(), new OperatorContext(1, "operator", defaultAttributeMap, (Context) null));
        AbstractReservoir newReservoir = AbstractReservoir.newReservoir("input1", 1024);
        AbstractReservoir newReservoir2 = AbstractReservoir.newReservoir("input1", 1024);
        genericNode.connectInputPort("input1", newReservoir);
        genericNode.connectInputPort("input2", newReservoir2);
        genericNode.connectOutputPort("output", new Sink<Object>() { // from class: com.datatorrent.stram.engine.ProcessingModeTests.1
            public void put(Object obj) {
                if (hashSet.contains(obj)) {
                    throw new RuntimeException("Duplicate Found!");
                }
                hashSet.add(obj);
            }

            public int getCount(boolean z) {
                return 0;
            }
        });
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: com.datatorrent.stram.engine.ProcessingModeTests.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                atomicBoolean.set(true);
                genericNode.activate();
                genericNode.run();
                genericNode.deactivate();
            }
        };
        thread.start();
        for (int i = 0; i < 100 && !atomicBoolean.get(); i++) {
            Thread.sleep(5L);
        }
        newReservoir.add(new Tuple(MessageType.BEGIN_WINDOW, 1L));
        newReservoir.add(1);
        newReservoir2.add(new Tuple(MessageType.BEGIN_WINDOW, 1L));
        newReservoir.add(new EndWindowTuple(1L));
        newReservoir2.add(1);
        newReservoir2.add(new EndWindowTuple(1L));
        for (int i2 = 0; i2 < 100 && hashSet.size() < 4; i2++) {
            Thread.sleep(5L);
        }
        newReservoir.add(new Tuple(MessageType.BEGIN_WINDOW, 2L));
        newReservoir.add(2);
        newReservoir.add(new EndWindowTuple(2L));
        for (int i3 = 0; i3 < 100 && hashSet.size() < 6; i3++) {
            Thread.sleep(5L);
        }
        newReservoir2.add(new Tuple(MessageType.BEGIN_WINDOW, 4L));
        newReservoir2.add(4);
        newReservoir2.add(new EndWindowTuple(4L));
        for (int i4 = 0; i4 < 100 && hashSet.size() < 9; i4++) {
            Thread.sleep(5L);
        }
        newReservoir.add(new Tuple(MessageType.BEGIN_WINDOW, 3L));
        newReservoir.add(3);
        newReservoir.add(new EndWindowTuple(3L));
        Thread.sleep(500L);
        newReservoir.add(new Tuple(MessageType.BEGIN_WINDOW, 5L));
        newReservoir.add(5);
        newReservoir2.add(new Tuple(MessageType.BEGIN_WINDOW, 5L));
        newReservoir.add(new EndWindowTuple(5L));
        newReservoir2.add(5);
        newReservoir2.add(new EndWindowTuple(5L));
        for (int i5 = 0; i5 < 100 && hashSet.size() < 14; i5++) {
            Thread.sleep(5L);
        }
        thread.interrupt();
        thread.join();
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            Object next = it.next();
            if (!(next instanceof Tuple)) {
                switch (((Integer) next).intValue()) {
                    case 101:
                    case 102:
                    case 105:
                    case 201:
                    case 204:
                    case 205:
                        break;
                    default:
                        Assert.fail("Unexpected Data Tuple: " + next);
                        break;
                }
            } else {
                Tuple tuple = (Tuple) next;
                long windowId = tuple.getWindowId();
                Assert.assertTrue("Valid Window Id", windowId == 1 || windowId == 2 || windowId == 4 || windowId == 5);
                Assert.assertTrue("Valid Tuple Type", tuple.getType() == MessageType.BEGIN_WINDOW || tuple.getType() == MessageType.END_WINDOW);
            }
        }
    }
}
