package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/stram/engine/SliderTest.class */
public class SliderTest {

    /* loaded from: input_file:com/datatorrent/stram/engine/SliderTest$Input.class */
    /**
     * Input operator that emits a running integer sequence, at most one value per window.
     *
     * <p>{@link #beginWindow(long)} arms a flag; the first {@link #emitTuples()} call after that
     * emits the current value, advances it by one and disarms the flag, so further calls within
     * the same window emit nothing. The sequence starts at 1.
     */
    public static class Input extends BaseOperator implements InputOperator {
        /** True from the start of a window until the window's single tuple has been emitted. */
        private boolean emit;

        /** Next value to emit; incremented after every emission. */
        private int val = 1;

        /** Port on which the generated integers are emitted. */
        public final transient DefaultOutputPort<Integer> defaultOutputPort = new DefaultOutputPort<>();

        /**
         * Arms the operator so that the next {@link #emitTuples()} call emits a value.
         *
         * @param j window identifier (unused)
         */
        public void beginWindow(long j) {
            emit = true;
        }

        /** Emits the next integer if nothing has been emitted yet in the current window. */
        public void emitTuples() {
            if (!emit) {
                return;
            }
            emit = false;
            defaultOutputPort.emit(val);
            val++;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/SliderTest$Sum.class */
    /**
     * Operator that adds up the integers received between {@link #beginWindow(long)} and
     * {@link #endWindow()} and emits the total when it is positive.
     *
     * <p>The class also implements {@link Operator.Unifier}: the output port hands out a fresh
     * {@code Sum} from {@code getUnifier()}, and the input port simply forwards each tuple to
     * {@link #process(Integer)}.
     */
    public static class Sum extends BaseOperator implements Operator.Unifier<Integer> {
        /** Total accumulated since the last {@link #beginWindow(long)}. */
        int sum;

        /** Input port; every tuple is delegated to {@link Sum#process(Integer)}. */
        public final transient DefaultInputPort<Integer> inputPort = new DefaultInputPort<Integer>() {
            public void process(Integer tuple) {
                Sum.this.process(tuple);
            }
        };

        /** Output port for the per-window total; supplies a new {@code Sum} as its unifier. */
        public final transient DefaultOutputPort<Integer> outputPort = new DefaultOutputPort<Integer>() {
            public Operator.Unifier<Integer> getUnifier() {
                return new Sum();
            }
        };

        /**
         * Resets the accumulated total.
         *
         * @param j window identifier (unused)
         */
        public void beginWindow(long j) {
            sum = 0;
        }

        /**
         * Adds one value to the running total.
         *
         * @param num value to add; must not be {@code null}
         */
        public void process(Integer num) {
            sum += num;
        }

        /** Emits the accumulated total, unless it is zero or negative. */
        public void endWindow() {
            if (sum <= 0) {
                return;
            }
            outputPort.emit(sum);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/SliderTest$Validator.class */
    /**
     * Operator that checks each incoming sum against the value expected for a sliding window over
     * consecutive integers.
     *
     * <p>For {@code numberOfIntegers = n} consecutive integers starting at {@code s}, the sum is
     * {@code n * s + n * (n - 1) / 2}. The constant second term is precomputed in
     * {@link #setup(Context.OperatorContext)}; after every successful check {@code s} is advanced
     * by {@link #slideByNumbers}.
     */
    public static class Validator extends BaseOperator {
        /**
         * Number of tuples validated so far.
         *
         * <p>Declared {@code volatile} because the test method polls this counter while the
         * operator increments it. NOTE(review): assumes a single validator instance is the only
         * writer, so the non-atomic {@code ++} is safe; confirm if the operator is ever partitioned.
         */
        public static volatile int numbersValidated;

        /** Count of consecutive integers expected to be covered by each incoming sum. */
        public int numberOfIntegers;

        /** Amount by which the expected first integer advances after each validated tuple. */
        public int slideByNumbers;

        /** Precomputed {@code n * (n - 1) / 2} for {@code n = numberOfIntegers}. */
        private int staticSum;

        /** First integer expected to be part of the next incoming sum. */
        private int startingInteger = 1;

        /** Input port receiving the sums to validate. */
        public final transient DefaultInputPort<Integer> validate = new DefaultInputPort<Integer>() {
            public void process(Integer num) {
                int expected = Validator.this.staticSum
                        + (Validator.this.numberOfIntegers * Validator.this.startingInteger);
                if (expected != num.intValue()) {
                    throw new RuntimeException("numbers not matching " + expected + " " + num + " " + Validator.this.startingInteger + " " + Validator.this.numberOfIntegers);
                }
                Validator.numbersValidated++;
                // Slide the expected window forward for the next tuple.
                Validator.this.startingInteger += Validator.this.slideByNumbers;
            }
        };

        /**
         * Precomputes the part of the expected sum that does not depend on the starting integer.
         *
         * @param operatorContext operator context (unused)
         */
        public void setup(Context.OperatorContext operatorContext) {
            this.staticSum = (this.numberOfIntegers * (this.numberOfIntegers - 1)) / 2;
        }

        /**
         * Intentionally empty; no per-window state is kept.
         *
         * @param j window identifier (unused)
         */
        public void beginWindow(long j) {
        }
    }

    /**
     * Builds an Input -&gt; Sum -&gt; Validator pipeline, runs it on a local cluster and checks
     * that the validator accepted at least one tuple.
     *
     * <p>The cluster is polled every 100 ms until more than five tuples have been validated or
     * {@code StramTestSupport.DEFAULT_TIMEOUT_MILLIS} has elapsed. The cluster is shut down in a
     * {@code finally} block so that an interrupted or failing wait does not leave it running.
     *
     * @param i  value for the {@code APPLICATION_WINDOW_COUNT} attribute of the Sum operator; also
     *           the number of consecutive integers the validator expects in each sum
     * @param i2 value for the {@code SLIDE_BY_WINDOW_COUNT} attribute of the Sum operator; also the
     *           step by which the validator advances its expected starting integer
     * @throws Exception if the wait is interrupted or the cluster fails to start
     */
    private void test(int i, int i2) throws Exception {
        LogicalPlan dag = new LogicalPlan();
        String checkpointDir = new File("target/sliderTest").getAbsolutePath();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir, (Configuration) null));
        dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100);

        Input input = dag.addOperator("Input", new Input());

        Sum sum = dag.addOperator("Sum", new Sum());
        // Use the imported API interface for the attribute keys, as for STORAGE_AGENT above.
        dag.setOperatorAttribute(sum, Context.OperatorContext.APPLICATION_WINDOW_COUNT, Integer.valueOf(i));
        dag.setOperatorAttribute(sum, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, Integer.valueOf(i2));

        Validator validator = dag.addOperator("validator", new Validator());
        // The counter is static, so it must be reset between test methods.
        Validator.numbersValidated = 0;
        validator.numberOfIntegers = i;
        validator.slideByNumbers = i2;

        dag.addStream("input-sum", input.defaultOutputPort, sum.inputPort);
        dag.addStream("sum-validator", sum.outputPort, validator.validate);

        StramLocalCluster cluster = new StramLocalCluster(dag);
        cluster.runAsync();
        try {
            long startMillis = System.currentTimeMillis();
            while (StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - startMillis
                    && Validator.numbersValidated <= 5) {
                Thread.sleep(100L);
            }
        } finally {
            cluster.shutdown();
        }
        Assert.assertTrue("numbers validated more than zero ", Validator.numbersValidated > 0);
    }

    /** Runs the pipeline with an application window of 5 that slides by 1. */
    @Test
    public void testSlider() throws Exception {
        final int applicationWindowCount = 5;
        final int slideByWindowCount = 1;
        test(applicationWindowCount, slideByWindowCount);
    }

    /** Runs the pipeline with an application window of 5 that slides by 2 (slide does not divide the window). */
    @Test
    public void testSliderWithPrimeNumbers() throws Exception {
        final int applicationWindowCount = 5;
        final int slideByWindowCount = 2;
        test(applicationWindowCount, slideByWindowCount);
    }

    /** Runs the pipeline with an application window of 4 that slides by 2 (slide divides the window evenly). */
    @Test
    public void testSliderWithProperDivisor() throws Exception {
        final int applicationWindowCount = 4;
        final int slideByWindowCount = 2;
        test(applicationWindowCount, slideByWindowCount);
    }
}
