package org.apache.apex.malhar.lib.dedup;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.fileaccess.TFileImpl;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.stram.engine.PortContext;
import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.class */
public class DeduperTimeBasedPOJOImplTest {
    private static String applicationPath;
    private static final String APPLICATION_PATH_PREFIX = "target/DeduperPOJOImplTest";
    private static final String APP_ID = "DeduperPOJOImplTest";
    private static final int OPERATOR_ID = 0;
    private static TimeBasedDedupOperator deduper;

    @Before
    public void setup() {
        applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
        deduper = new TimeBasedDedupOperator();
        deduper.setKeyExpression("key");
        deduper.setTimeExpression("date.getTime()");
        deduper.setBucketSpan(10L);
        deduper.setExpireBefore(60L);
        TFileImpl.DTFileImpl dTFileImpl = new TFileImpl.DTFileImpl();
        dTFileImpl.setBasePath(applicationPath + "/bucket_data");
        deduper.managedState.setFileAccess(dTFileImpl);
    }

    @Test
    public void testDedup() {
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(DAG.APPLICATION_ID, APP_ID);
        defaultAttributeMap.put(DAG.APPLICATION_PATH, applicationPath);
        defaultAttributeMap.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class);
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap);
        deduper.setup(testIdOperatorContext);
        deduper.input.setup(new PortContext(defaultAttributeMap, testIdOperatorContext));
        deduper.activate(testIdOperatorContext);
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        TestUtils.setSink(deduper.unique, collectorTestSink);
        CollectorTestSink collectorTestSink2 = new CollectorTestSink();
        TestUtils.setSink(deduper.duplicate, collectorTestSink2);
        CollectorTestSink collectorTestSink3 = new CollectorTestSink();
        TestUtils.setSink(deduper.expired, collectorTestSink3);
        deduper.beginWindow(0L);
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            deduper.input.process(new TestPojo(i, new Date(currentTimeMillis + i)));
        }
        deduper.input.process(new TestPojo(100L, new Date(currentTimeMillis - 60000)));
        for (int i2 = 90; i2 < 200; i2++) {
            deduper.input.process(new TestPojo(i2, new Date(currentTimeMillis + i2)));
        }
        deduper.handleIdleTime();
        deduper.endWindow();
        Assert.assertTrue(collectorTestSink.collectedTuples.size() == 200);
        Assert.assertTrue(collectorTestSink2.collectedTuples.size() == 10);
        Assert.assertTrue(collectorTestSink3.collectedTuples.size() == 1);
        deduper.teardown();
    }

    @After
    public void teardown() {
        Path path = new Path(applicationPath);
        try {
            FileSystem.newInstance(path.toUri(), new Configuration()).delete(path, true);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
