package org.apache.apex.malhar.lib.dedup;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.fileaccess.TFileImpl;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

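/**
 * Runs a small local application (generator -> dedup -> verifier) with {@code preserveTupleOrder} enabled on the
 * dedup operator and checks that the sequence numbers observed across the dedup operator's unique, duplicate and
 * expired outputs never decrease.
 */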
public class DeduperOrderingTest {
    /**
     * Shared failure flag for the test. The {@link Verifier} input ports set it to {@code true} when a tuple
     * arrives whose sequence number is lower than the previously observed one; {@link #testApplication()}
     * asserts that it is still {@code false} after the verifier's latch has been released.
     *
     * NOTE(review): written from operator code and read from the test method; if those run on different
     * threads this field probably should be {@code volatile} — confirm.
     */
    public static boolean testFailed = false;

    /**
     * Application under test. Wires a {@link RandomDedupDataGenerator} into a {@code TimeBasedDedupOperator}
     * (with tuple-order preservation switched on) and connects all three dedup outputs to a {@link Verifier}.
     */
    public static class DeduperOrderingTestApp implements StreamingApplication {
        /** Verifier instance registered with the DAG; populated by {@link #populateDAG}. */
        Verifier verifier;

        /**
         * Builds the DAG: "Input" -> "Dedup" -> "Verifier".
         *
         * @param dag           the DAG to populate
         * @param configuration unused by this application
         */
        public void populateDAG(DAG dag, Configuration configuration) {
            RandomDedupDataGenerator generator = dag.addOperator("Input", RandomDedupDataGenerator.class);

            TimeBasedDedupOperator dedup = dag.addOperator("Dedup", TimeBasedDedupOperator.class);
            dedup.setKeyExpression("key");
            dedup.setTimeExpression("date.getTime()");
            dedup.setBucketSpan(10L);
            dedup.setExpireBefore(60L);
            dedup.setPreserveTupleOrder(true);
            dedup.managedState.setFileAccess(bucketFileAccess(dag));
            dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestPojo.class);

            verifier = dag.addOperator("Verifier", Verifier.class);

            dag.addStream("Input to Dedup", generator.output, dedup.input);
            dag.addStream("Dedup to Unique", dedup.unique, verifier.unique);
            dag.addStream("Dedup to Duplicate", dedup.duplicate, verifier.duplicate);
            dag.addStream("Dedup to Expired", dedup.expired, verifier.expired);
        }

        /** Creates the file access used for the dedup operator's managed state, rooted under the application path. */
        private static TFileImpl.DTFileImpl bucketFileAccess(DAG dag) {
            String applicationPath = (String) dag.getAttributes().get(DAG.APPLICATION_PATH);
            TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl();
            fileAccess.setBasePath(applicationPath + "/bucket_data");
            return fileAccess;
        }
    }

    /**
     * Input operator that emits {@code TestPojo} tuples carrying a monotonically increasing sequence id.
     * At most {@code count} tuples are emitted per streaming window, one per {@link #emitTuples()} call.
     */
    public static class RandomDedupDataGenerator extends BaseOperator implements InputOperator {
        /** Maximum number of tuples emitted within a single window. */
        private final long count = 500;
        /** Tuples emitted so far in the current window; reset in {@link #beginWindow(long)}. */
        private long windowCount = 0;
        /** Next sequence id to hand out; never reset, so ids keep growing across windows. */
        private long sequenceId = 0;
        public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();

        /** Starts a new window by clearing the per-window emit counter. */
        public void beginWindow(long windowId) {
            windowCount = 0L;
        }

        /** Emits a single tuple unless the per-window budget is already used up. */
        public void emitTuples() {
            if (windowCount >= count) {
                return;
            }
            // The same id is passed as both the first and the third constructor argument.
            output.emit(new TestPojo(sequenceId, new Date(), sequenceId));
            sequenceId++;
            windowCount++;
        }
    }

    /**
     * Sink operator that checks tuple ordering. Every tuple received on any of the three input ports must
     * carry a sequence number that is not lower than the previously received one; otherwise
     * {@link DeduperOrderingTest#testFailed} is set. The number of tuples seen is exposed through the
     * {@code count} auto-metric, and {@link #processStats} releases {@link #latch} once the reported count
     * reaches {@link #EXPECTED_TUPLE_COUNT}.
     */
    public static class Verifier extends BaseOperator implements StatsListener {
        /** Reported tuple count at which the latch is released. */
        private static final int EXPECTED_TUPLE_COUNT = 1000;

        /** Sequence number of the most recently received tuple (across all three ports). */
        long prevSequence = 0;
        /** Released by {@link #processStats} once enough tuples have been reported. */
        public transient CountDownLatch latch = new CountDownLatch(1);

        /** Number of tuples received so far; also overwritten from reported stats in {@link #processStats}. */
        @AutoMetric
        int count = 0;

        public final transient DefaultInputPort<Object> unique = new DefaultInputPort<Object>() {
            public void process(Object obj) {
                Verifier.this.verifyOrder(obj);
            }
        };
        public final transient DefaultInputPort<Object> duplicate = new DefaultInputPort<Object>() {
            public void process(Object obj) {
                Verifier.this.verifyOrder(obj);
            }
        };
        public final transient DefaultInputPort<Object> expired = new DefaultInputPort<Object>() {
            public void process(Object obj) {
                Verifier.this.verifyOrder(obj);
            }
        };

        /**
         * Shared tuple handler for all three ports: flags the test as failed when the sequence goes
         * backwards, then records the tuple.
         *
         * @param tuple the incoming tuple; must be a {@code TestPojo}
         */
        private void verifyOrder(Object tuple) {
            long sequence = ((TestPojo) tuple).getSequence();
            if (sequence < prevSequence) {
                DeduperOrderingTest.testFailed = true;
            }
            count++;
            prevSequence = sequence;
        }

        /**
         * Reads the {@code "count"} metric from the most recent windowed stats entry and releases the latch
         * once it reaches {@link #EXPECTED_TUPLE_COUNT}. Stats batches that carry no entries or no
         * {@code "count"} metric yet are ignored instead of failing with an exception.
         *
         * @param batchedOperatorStats the stats batch delivered for this operator
         * @return always {@code null}
         */
        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            List<Stats.OperatorStats> windowedStats = batchedOperatorStats.getLastWindowedStats();
            if (windowedStats == null || windowedStats.isEmpty()) {
                return null;
            }
            Stats.OperatorStats latest = windowedStats.get(windowedStats.size() - 1);
            if (latest == null || latest.metrics == null) {
                return null;
            }
            Object reportedCount = latest.metrics.get("count");
            if (reportedCount == null) {
                return null;
            }
            // A non-Integer value still fails loudly with ClassCastException, as before.
            count = ((Integer) reportedCount).intValue();
            if (count >= EXPECTED_TUPLE_COUNT) {
                latch.countDown();
            }
            return null;
        }
    }

    /**
     * Runs the application in local mode, waits until the verifier's latch is released and asserts that no
     * out-of-order tuple was observed. The local cluster is shut down even when the assertion fails or the
     * wait is interrupted.
     *
     * NOTE(review): the latch wait has no timeout, so a pipeline that never reports enough tuples hangs
     * the test — consider a bounded wait.
     */
    @Test
    public void testApplication() throws IOException, Exception {
        // The flag is static: clear any value left behind by an earlier run in the same JVM.
        testFailed = false;

        LocalMode localMode = LocalMode.newInstance();
        Configuration configuration = new Configuration(false);
        DeduperOrderingTestApp app = new DeduperOrderingTestApp();
        localMode.prepareDAG(app, configuration);
        LocalMode.Controller controller = localMode.getController();
        controller.runAsync();
        try {
            app.verifier.latch.await();
            Assert.assertFalse(testFailed);
        } finally {
            controller.shutdown();
        }
    }
}
