package org.apache.apex.malhar.lib.window.impl;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import java.util.ArrayList;
import java.util.List;
import org.apache.apex.malhar.lib.window.ControlTuple;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.apex.malhar.lib.window.accumulation.CoGroup;
import org.apache.hadoop.conf.Configuration;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication.class */
public class WindowedMergeOperatorTestApplication implements StreamingApplication {
    private static WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage();
    private static final long windowDuration = 1000;

    /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication$Collector.class */
    public static class Collector extends BaseOperator {
        public static List<List<List<Integer>>> result = new ArrayList();
        public final transient DefaultOutputPort<Tuple<List<List<Integer>>>> output = new DefaultOutputPort<>();
        public final transient DefaultInputPort<Tuple<List<List<Integer>>>> input = new DefaultInputPort<Tuple<List<List<Integer>>>>() { // from class: org.apache.apex.malhar.lib.window.impl.WindowedMergeOperatorTestApplication.Collector.1
            /* JADX WARN: Multi-variable type inference failed */
            public void process(Tuple<List<List<Integer>>> tuple) {
                Collector.result.add(tuple.getValue());
                Collector.this.output.emit(tuple);
            }
        };
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication$NumGen1.class */
    public static class NumGen1 extends BaseOperator implements InputOperator {
        private int i;
        private long watermarkTime;
        private long startingTime;
        public final transient DefaultOutputPort<Tuple.WindowedTuple<Integer>> output = new DefaultOutputPort<>();
        public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();

        public void setup(Context.OperatorContext operatorContext) {
            super.setup(operatorContext);
            this.startingTime = System.currentTimeMillis();
            this.watermarkTime = System.currentTimeMillis() + 10000;
            this.i = 1;
        }

        public void emitTuples() {
            while (this.i <= 20) {
                if (System.currentTimeMillis() - this.startingTime >= (this.i + 1) * 400) {
                    this.output.emit(new Tuple.WindowedTuple(WindowedMergeOperatorTestApplication.assignTestWindow(System.currentTimeMillis()), Integer.valueOf(this.i)));
                    this.i++;
                }
            }
        }

        public void endWindow() {
            if (this.i <= 20) {
                this.watermarkDefaultOutputPort.emit(new WatermarkImpl(this.watermarkTime));
            }
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTestApplication$NumGen2.class */
    public static class NumGen2 extends BaseOperator implements InputOperator {
        private int i;
        private long watermarkTime;
        private long startingTime;
        public final transient DefaultOutputPort<Tuple.WindowedTuple<Integer>> output = new DefaultOutputPort<>();
        public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort = new DefaultOutputPort<>();

        public void setup(Context.OperatorContext operatorContext) {
            super.setup(operatorContext);
            this.startingTime = System.currentTimeMillis();
            this.watermarkTime = System.currentTimeMillis() + 10000;
            this.i = 1;
        }

        public void emitTuples() {
            while (this.i <= 20) {
                if (System.currentTimeMillis() - this.startingTime >= (this.i + 1) * 400) {
                    this.output.emit(new Tuple.WindowedTuple(WindowedMergeOperatorTestApplication.assignTestWindow(System.currentTimeMillis()), Integer.valueOf(10 * this.i)));
                    this.i++;
                }
            }
        }

        public void endWindow() {
            if (this.i <= 20) {
                this.watermarkDefaultOutputPort.emit(new WatermarkImpl(this.watermarkTime));
            }
        }
    }

    public static Window.TimeWindow assignTestWindow(long j) {
        Window.TimeWindow timeWindow = new Window.TimeWindow(j - (j % windowDuration), windowDuration);
        if (!windowStateMap.containsWindow(timeWindow)) {
            windowStateMap.put(timeWindow, new WindowState());
        }
        return timeWindow;
    }

    public void populateDAG(DAG dag, Configuration configuration) {
        WindowedMergeOperatorImpl addOperator = dag.addOperator("Merge", new WindowedMergeOperatorImpl());
        addOperator.setAccumulation(new CoGroup());
        addOperator.setDataStorage(new InMemoryWindowedStorage());
        addOperator.setRetractionStorage(new InMemoryWindowedStorage());
        addOperator.setWindowStateStorage(windowStateMap);
        addOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(2000L)));
        addOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1L).accumulatingFiredPanes());
        addOperator.setAllowedLateness(Duration.millis(500L));
        NumGen1 addOperator2 = dag.addOperator("numGen1", new NumGen1());
        NumGen2 addOperator3 = dag.addOperator("numGen2", new NumGen2());
        Collector addOperator4 = dag.addOperator("collector", new Collector());
        ConsoleOutputOperator addOperator5 = dag.addOperator("console", new ConsoleOutputOperator());
        dag.addStream("num1", addOperator2.output, addOperator.input);
        dag.addStream("num2", addOperator3.output, addOperator.input2);
        dag.addStream("wm1", addOperator2.watermarkDefaultOutputPort, addOperator.controlInput);
        dag.addStream("wm2", addOperator3.watermarkDefaultOutputPort, addOperator.controlInput2);
        dag.addStream("MergedResult", addOperator.output, addOperator4.input);
        dag.addStream("output", addOperator4.output, addOperator5.input);
    }

    public static void main(String[] strArr) throws Exception {
        LocalMode newInstance = LocalMode.newInstance();
        newInstance.prepareDAG(new WindowedMergeOperatorTestApplication(), new Configuration(false));
        newInstance.getController().run(20000L);
    }
}
