package org.apache.apex.malhar.lib.dedup;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.google.common.collect.Maps;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.class */
public class DeduperPartitioningTest {
    /** Number of deduper partitions; {@link TestDedupApp} configures its partitioner with the same value (5). */
    public static final int NUM_DEDUP_PARTITIONS = 5;

    /**
     * Failure flag shared between the operator code and the test method.
     *
     * <p>{@code TestDeduper.processTuple} sets it to {@code true} when it sees a key that was
     * previously recorded for a different operator id; the test method asserts that it is still
     * {@code false} once the asynchronous run has been stopped.
     *
     * <p>Declared {@code volatile} because the write and the read presumably happen on different
     * threads (the application is started with {@code runAsync()}), so the write must be
     * guaranteed visible to the asserting thread.
     */
    private static volatile boolean testFailed = false;

    /* loaded from: input_file:org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest$TestDedupApp.class */
    /**
     * Application under test: a three-operator pipeline
     * {@code Generator -> Deduper -> Console} in which the deduper is partitioned.
     */
    public static class TestDedupApp implements StreamingApplication {
        /** The deduper added to the DAG; kept so the test method can reach its latch. */
        TestDeduper dedup;

        /**
         * Builds the DAG: adds the three operators, configures the deduper's key/time
         * expressions and bucket settings, wires the two streams, declares {@link TestEvent}
         * as the tuple class on the deduper's ports and attaches a 5-way partitioner to it.
         *
         * @param dag           the DAG to populate
         * @param configuration not used by this application
         */
        public void populateDAG(DAG dag, Configuration configuration) {
            TestGenerator generator = dag.addOperator("Generator", new TestGenerator());

            this.dedup = dag.addOperator("Deduper", new TestDeduper());
            this.dedup.setKeyExpression("id");
            this.dedup.setTimeExpression("eventTime.getTime()");
            this.dedup.setBucketSpan(60L);
            this.dedup.setExpireBefore(600L);

            ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

            dag.addStream("Generator to Dedup", generator.output, this.dedup.input);
            dag.addStream("Dedup to Console", this.dedup.unique, console.input);

            dag.setInputPortAttribute(this.dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
            dag.setOutputPortAttribute(this.dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class);

            // Parameterized instead of a raw type; 5 mirrors NUM_DEDUP_PARTITIONS on the enclosing test class.
            dag.setAttribute(this.dedup, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestDeduper>(5));
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest$TestDeduper.class */
    /**
     * Deduper subclass used as a probe for tuple routing.
     *
     * <p>{@link #processTuple(Object)} does not delegate to the superclass; it only records
     * which operator id first saw each event id and flags the test as failed if the same event
     * id later shows up under a different operator id. The class also implements
     * {@link StatsListener} so that the test can be released once enough tuples were reported.
     */
    public static class TestDeduper extends TimeBasedDedupOperator implements StatsListener {
        /** Key under which the processed-tuple count is looked up in the reported metrics. */
        private static final String COMPLETED_METRIC = "tuplesProcessedCompletely";

        /** Reported tuple count at which the latch is released. */
        private static final int TUPLES_BEFORE_RELEASE = 1000;

        /** Id taken from the operator context in {@link #setup}. */
        int operatorId;

        /** Event id -> operator id that first processed an event with that id. */
        HashMap<Integer, Integer> partitionMap = Maps.newHashMap();

        /** Released by {@link #processStats} once enough tuples have been reported. */
        transient CountDownLatch latch = new CountDownLatch(1);

        /** Running count of tuples accepted by {@link #processTuple(Object)}. */
        int tuplesProcessed = 0;

        /** Snapshot of {@link #tuplesProcessed} taken at each {@link #endWindow()}. */
        @AutoMetric
        int tuplesProcessedCompletely = 0;

        /**
         * Runs the superclass setup and remembers this operator's id.
         *
         * @param operatorContext context supplying the operator id
         */
        public void setup(Context.OperatorContext operatorContext) {
            super.setup(operatorContext);
            this.operatorId = operatorContext.getId();
        }

        /**
         * Records the operator id for a first-seen event id, or verifies that a known event id
         * is still being handled under the same operator id.
         *
         * @param obj the incoming tuple; must be a {@link TestEvent}
         * @throws RuntimeException if the event id was previously recorded for another operator id
         */
        protected void processTuple(Object obj) {
            TestEvent event = (TestEvent) obj;
            // Single lookup: the map never stores null values, so null means "not seen yet".
            Integer recordedOwner = this.partitionMap.get(event.id);
            if (recordedOwner == null) {
                this.partitionMap.put(event.id, this.operatorId);
            } else if (recordedOwner.intValue() != this.operatorId) {
                DeduperPartitioningTest.testFailed = true;
                throw new RuntimeException("Wrong tuple assignment");
            }
            this.tuplesProcessed++;
        }

        /** Ends the window in the superclass, then publishes the running count to the metric field. */
        public void endWindow() {
            super.endWindow();
            this.tuplesProcessedCompletely = this.tuplesProcessed;
        }

        /**
         * Reads the processed-tuple metric from the most recent windowed stats entry and
         * releases {@link #latch} once it reaches {@link #TUPLES_BEFORE_RELEASE}.
         *
         * <p>A batch with no windowed stats, no metrics map, or no entry for the metric is
         * ignored instead of failing with an index or null-pointer error.
         *
         * @param batchedOperatorStats stats batch handed to this listener
         * @return always {@code null}
         */
        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            List<Stats.OperatorStats> windowedStats = batchedOperatorStats.getLastWindowedStats();
            if (windowedStats == null || windowedStats.isEmpty()) {
                return null;
            }
            Stats.OperatorStats latest = (Stats.OperatorStats) windowedStats.get(windowedStats.size() - 1);
            if (latest.metrics == null) {
                return null;
            }
            Integer reported = (Integer) latest.metrics.get(COMPLETED_METRIC);
            if (reported == null) {
                return null;
            }
            this.tuplesProcessedCompletely = reported.intValue();
            if (this.tuplesProcessedCompletely >= TUPLES_BEFORE_RELEASE) {
                this.latch.countDown();
            }
            return null;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest$TestEvent.class */
    /**
     * Mutable bean carried through the test pipeline: an integer id plus an event timestamp.
     * Both properties start at their Java defaults ({@code 0} and {@code null}).
     */
    public static class TestEvent {
        private int id;
        private Date eventTime;

        /** @return the current id */
        public int getId() {
            return id;
        }

        /** @return the current event time, or {@code null} if none was set */
        public Date getEventTime() {
            return eventTime;
        }

        /** @param id the id to store */
        public void setId(int id) {
            this.id = id;
        }

        /** @param eventTime the event time to store; the reference is kept as-is, not copied */
        public void setEventTime(Date eventTime) {
            this.eventTime = eventTime;
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest$TestGenerator.class */
    /**
     * Input operator that feeds the pipeline with {@link TestEvent}s carrying random ids.
     */
    public static class TestGenerator extends BaseOperator implements InputOperator {
        /** Port on which the generated events are emitted. */
        public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();

        /** Source of the random event ids. */
        private final transient Random random = new Random();

        /**
         * Emits a single new event per call. Its id is drawn from {@code [0, 100)};
         * its event time is left unset.
         */
        public void emitTuples() {
            final TestEvent event = new TestEvent();
            event.setId(random.nextInt(100));
            output.emit(event);
        }
    }

    /**
     * Runs {@link TestDedupApp} asynchronously in local mode, blocks until the deduper's latch
     * is released, shuts the application down and then checks that no tuple was flagged as
     * wrongly assigned.
     *
     * @throws Exception if preparing the DAG fails or the wait on the latch is interrupted
     */
    @Test
    public void testDeduperStreamCodec() throws Exception {
        LocalMode localMode = LocalMode.newInstance();
        Configuration configuration = new Configuration(false);
        TestDedupApp app = new TestDedupApp();
        localMode.prepareDAG(app, configuration);

        LocalMode.Controller controller = localMode.getController();
        controller.runAsync();
        try {
            app.dedup.latch.await();
        } finally {
            // Stop the asynchronous run even when the wait is interrupted.
            controller.shutdown();
        }
        Assert.assertFalse("A key was processed under more than one operator id", testFailed);
    }
}
