package org.apache.apex.malhar.lib.window.impl;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import java.util.Random;
import java.util.concurrent.Callable;
import javax.validation.constraints.Min;
import org.apache.apex.malhar.lib.window.ControlTuple;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowState;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.apex.malhar.lib.window.accumulation.PojoInnerJoin;
import org.apache.hadoop.conf.Configuration;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication.class */
public class PojoInnerJoinTestApplication implements StreamingApplication {
    private static int records = 0;
    private static int SalesCount = 0;
    private static int ProductCount = 0;

    /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication$POJOGenerator.class */
    public static class POJOGenerator implements InputOperator {

        @Min(1)
        private int maxProductId;

        @Min(1)
        private int maxCustomerId;

        @Min(1)
        int totalTuples;
        private int maxProductCategories;
        private double maxAmount;
        private long tuplesCounter;
        private long time;
        private long timeIncrement;
        private boolean isSalesEvent;

        @Min(0)
        private long maxTuplesPerWindow;
        private final Random random;
        public final transient DefaultOutputPort<Tuple.WindowedTuple<SalesEvent>> outputsales;
        public final transient DefaultOutputPort<Tuple.WindowedTuple<ProductEvent>> outputproduct;
        private static final long windowDuration = 1000;
        private static WindowedStorage.WindowedPlainStorage<WindowState> windowStateMap = new InMemoryWindowedStorage();
        public final transient DefaultOutputPort<ControlTuple> watermarkDefaultOutputPort;
        private long watermarkTime;
        private long startingTime;

        /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication$POJOGenerator$OutputEvent.class */
        public static class OutputEvent {
            public int customerId;
            public int productId;
            public int productCategory;
            public long timestamp;
            public double amount;
            public long timestamps;

            public int getCustomerId() {
                return this.customerId;
            }

            public void setCustomerId(int i) {
                this.customerId = i;
            }

            public int getProductId() {
                return this.productId;
            }

            public void setProductId(int i) {
                this.productId = i;
            }

            public int getProductCategory() {
                return this.productCategory;
            }

            public void setProductCategory(int i) {
                this.productCategory = i;
            }

            public long getTimestamp() {
                return this.timestamp;
            }

            public void setTimestamp(long j) {
                this.timestamp = j;
            }

            public double getAmount() {
                return this.amount;
            }

            public void setAmount(double d) {
                this.amount = d;
            }

            public long getTimestamps() {
                return this.timestamps;
            }

            public void setTimestamps(long j) {
                this.timestamps = j;
            }
        }

        /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication$POJOGenerator$ProductEvent.class */
        public static class ProductEvent {
            public int productId;
            public int productCategory;
            public long timestamp;

            public int getProductId() {
                return this.productId;
            }

            public void setProductId(int i) {
                this.productId = i;
            }

            public int getProductCategory() {
                return this.productCategory;
            }

            public void setProductCategory(int i) {
                this.productCategory = i;
            }

            public long getTimestamp() {
                return this.timestamp;
            }

            public void setTimestamp(long j) {
                this.timestamp = j;
            }
        }

        /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication$POJOGenerator$SalesEvent.class */
        public static class SalesEvent {
            public int customerId;
            public int productId;
            public int productCategory;
            public double amount;
            public long timestamps;

            public int getCustomerId() {
                return this.customerId;
            }

            public void setCustomerId(int i) {
                this.customerId = i;
            }

            public int getProductId() {
                return this.productId;
            }

            public void setProductId(int i) {
                this.productId = i;
            }

            public int getProductCategory() {
                return this.productCategory;
            }

            public void setProductCategory(int i) {
                this.productCategory = i;
            }

            public double getAmount() {
                return this.amount;
            }

            public void setAmount(double d) {
                this.amount = d;
            }

            public long getTimestamps() {
                return this.timestamps;
            }

            public void setTimestamps(long j) {
                this.timestamps = j;
            }
        }

        public static Window.TimeWindow assignTestWindow(long j) {
            Window.TimeWindow timeWindow = new Window.TimeWindow(j - (j % windowDuration), windowDuration);
            if (!windowStateMap.containsWindow(timeWindow)) {
                windowStateMap.put(timeWindow, new WindowState());
            }
            return timeWindow;
        }

        public POJOGenerator(int i, int i2) {
            this.maxProductId = 1;
            this.maxCustomerId = 100000;
            this.maxProductCategories = 100;
            this.maxAmount = 100.0d;
            this.isSalesEvent = true;
            this.maxTuplesPerWindow = 100L;
            this.random = new Random();
            this.outputsales = new DefaultOutputPort<>();
            this.outputproduct = new DefaultOutputPort<>();
            this.watermarkDefaultOutputPort = new DefaultOutputPort<>();
            this.maxProductId = i;
            this.totalTuples = i2;
        }

        public POJOGenerator() {
            this.maxProductId = 1;
            this.maxCustomerId = 100000;
            this.maxProductCategories = 100;
            this.maxAmount = 100.0d;
            this.isSalesEvent = true;
            this.maxTuplesPerWindow = 100L;
            this.random = new Random();
            this.outputsales = new DefaultOutputPort<>();
            this.outputproduct = new DefaultOutputPort<>();
            this.watermarkDefaultOutputPort = new DefaultOutputPort<>();
        }

        public void beginWindow(long j) {
        }

        public void endWindow() {
            if (this.tuplesCounter < this.totalTuples) {
                this.watermarkDefaultOutputPort.emit(new WatermarkImpl(this.watermarkTime));
            }
            this.time += this.timeIncrement;
        }

        public void setup(Context.OperatorContext operatorContext) {
            this.startingTime = System.currentTimeMillis();
            this.watermarkTime = System.currentTimeMillis() + 10000;
            this.tuplesCounter = 0L;
            this.time = System.currentTimeMillis();
            this.timeIncrement = ((Integer) operatorContext.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)).intValue() * ((Integer) operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)).intValue();
        }

        public void teardown() {
        }

        SalesEvent generateSalesEvent() throws Exception {
            SalesEvent salesEvent = new SalesEvent();
            salesEvent.productId = randomId(this.maxProductId);
            salesEvent.customerId = randomId(this.maxCustomerId);
            salesEvent.amount = randomAmount();
            salesEvent.timestamps = this.time;
            return salesEvent;
        }

        ProductEvent generateProductEvent() throws Exception {
            ProductEvent productEvent = new ProductEvent();
            productEvent.productId = randomId(this.maxProductId);
            productEvent.productCategory = randomId(this.maxProductCategories);
            productEvent.timestamp = this.time;
            return productEvent;
        }

        private int randomId(int i) {
            if (i < 1) {
                return 1;
            }
            return 1 + this.random.nextInt(i);
        }

        private double randomAmount() {
            return this.maxAmount * this.random.nextDouble();
        }

        public void emitTuples() {
            while (this.tuplesCounter < this.totalTuples) {
                try {
                    if (this.isSalesEvent) {
                        this.outputsales.emit(new Tuple.WindowedTuple(assignTestWindow(System.currentTimeMillis()), generateSalesEvent()));
                        PojoInnerJoinTestApplication.access$008();
                    } else {
                        this.outputproduct.emit(new Tuple.WindowedTuple(assignTestWindow(System.currentTimeMillis()), generateProductEvent()));
                        PojoInnerJoinTestApplication.access$108();
                    }
                    this.tuplesCounter++;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        public int getMaxProductId() {
            return this.maxProductId;
        }

        public void setMaxProductId(int i) {
            this.maxProductId = i;
        }

        public int getMaxCustomerId() {
            return this.maxCustomerId;
        }

        public void setMaxCustomerId(int i) {
            this.maxCustomerId = i;
        }

        public int getMaxProductCategories() {
            return this.maxProductCategories;
        }

        public void setMaxProductCategories(int i) {
            this.maxProductCategories = i;
        }

        public double getMaxAmount() {
            return this.maxAmount;
        }

        public void setMaxAmount(double d) {
            this.maxAmount = d;
        }

        public boolean isSalesEvent() {
            return this.isSalesEvent;
        }

        public void setSalesEvent(boolean z) {
            this.isSalesEvent = z;
        }

        public long getMaxTuplesPerWindow() {
            return this.maxTuplesPerWindow;
        }

        public void setMaxTuplesPerWindow(long j) {
            this.maxTuplesPerWindow = j;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/PojoInnerJoinTestApplication$ResultCollector.class */
    public static class ResultCollector extends BaseOperator {
        public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() { // from class: org.apache.apex.malhar.lib.window.impl.PojoInnerJoinTestApplication.ResultCollector.1
            public void process(Object obj) {
                PojoInnerJoinTestApplication.access$208();
            }
        };
    }

    public void populateDAG(DAG dag, Configuration configuration) {
        POJOGenerator addOperator = dag.addOperator("Input1", new POJOGenerator(1, 1));
        POJOGenerator addOperator2 = dag.addOperator("Input2", new POJOGenerator(1, 1));
        addOperator2.setSalesEvent(false);
        WindowedMergeOperatorImpl addOperator3 = dag.addOperator("Merge", new WindowedMergeOperatorImpl());
        addOperator3.setAccumulation(new PojoInnerJoin(2, POJOGenerator.OutputEvent.class, new String[]{"productId", "productId"}));
        addOperator3.setDataStorage(new InMemoryWindowedStorage());
        addOperator3.setWindowStateStorage(new InMemoryWindowedStorage());
        addOperator3.setWindowOption(new WindowOption.TimeWindows(Duration.millis(10L)));
        addOperator3.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1L).accumulatingFiredPanes());
        addOperator3.setAllowedLateness(Duration.millis(500L));
        ResultCollector addOperator4 = dag.addOperator("results", new ResultCollector());
        dag.addStream("SalesToJoin", addOperator.outputsales, addOperator3.input);
        dag.addStream("ProductToJoin", addOperator2.outputproduct, addOperator3.input2);
        dag.addStream("results", addOperator3.output, addOperator4.input);
        dag.addStream("wm1", addOperator.watermarkDefaultOutputPort, addOperator3.controlInput);
        dag.addStream("wm2", addOperator2.watermarkDefaultOutputPort, addOperator3.controlInput2);
    }

    @Test
    public void testApplication() throws Exception {
        LocalMode newInstance = LocalMode.newInstance();
        newInstance.prepareDAG(new PojoInnerJoinTestApplication(), new Configuration(false));
        StramLocalCluster controller = newInstance.getController();
        controller.setExitCondition(new Callable<Boolean>() { // from class: org.apache.apex.malhar.lib.window.impl.PojoInnerJoinTestApplication.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                return Boolean.valueOf(PojoInnerJoinTestApplication.SalesCount == 1 && PojoInnerJoinTestApplication.ProductCount == 1 && PojoInnerJoinTestApplication.records == 2);
            }
        });
        controller.run(20000L);
        Assert.assertEquals(2L, records);
    }

    static /* synthetic */ int access$008() {
        int i = SalesCount;
        SalesCount = i + 1;
        return i;
    }

    static /* synthetic */ int access$108() {
        int i = ProductCount;
        ProductCount = i + 1;
        return i;
    }

    static /* synthetic */ int access$208() {
        int i = records;
        records = i + 1;
        return i;
    }
}
