package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.db.jdbc.JdbcOperatorTest;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.KeyValPair;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.class */
public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest {
    public String dir = null;

    @Mock
    private ScheduledExecutorService mockscheduler;

    @Mock
    private ScheduledFuture futureTaskMock;

    @Mock
    private WindowDataManager windowDataManagerMock;

    @Before
    public void beforeTest() {
        this.dir = "target/" + APP_ID + "/";
        MockitoAnnotations.initMocks(this);
        Mockito.when(this.mockscheduler.scheduleWithFixedDelay((Runnable) Matchers.any(Runnable.class), Matchers.anyLong(), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class))).thenReturn(this.futureTaskMock);
    }

    @After
    public void afterTest() throws IOException {
        cleanTable();
        FileUtils.deleteDirectory(new File(this.dir));
    }

    @Test
    public void testDBPoller() throws InterruptedException {
        insertEvents(10, true, 0);
        JdbcStore jdbcStore = new JdbcStore();
        jdbcStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        jdbcStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
        List<FieldInfo> fieldInfos = getFieldInfos();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        TestPortContext testPortContext = new TestPortContext(defaultAttributeMap);
        JdbcPOJOPollInputOperator jdbcPOJOPollInputOperator = new JdbcPOJOPollInputOperator();
        jdbcPOJOPollInputOperator.setStore(jdbcStore);
        jdbcPOJOPollInputOperator.setTableName("test_pojo_event_table");
        jdbcPOJOPollInputOperator.setKey("id");
        jdbcPOJOPollInputOperator.setFieldInfos(fieldInfos);
        jdbcPOJOPollInputOperator.setFetchSize(100);
        jdbcPOJOPollInputOperator.setBatchSize(100);
        jdbcPOJOPollInputOperator.setPartitionCount(2);
        Collection<Partitioner.Partition> definePartitions = jdbcPOJOPollInputOperator.definePartitions(new ArrayList(), (Partitioner.PartitioningContext) null);
        int i = 0;
        for (Partitioner.Partition partition : definePartitions) {
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap2 = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap2.put(DAG.APPLICATION_ID, APP_ID);
            defaultAttributeMap2.put(Context.DAGContext.APPLICATION_PATH, this.dir);
            int i2 = i;
            i++;
            OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(i2, defaultAttributeMap2);
            JdbcPOJOPollInputOperator jdbcPOJOPollInputOperator2 = (JdbcPOJOPollInputOperator) partition.getPartitionedInstance();
            jdbcPOJOPollInputOperator2.outputPort.setup(testPortContext);
            jdbcPOJOPollInputOperator2.setScheduledExecutorService(this.mockscheduler);
            jdbcPOJOPollInputOperator2.setup(testIdOperatorContext);
            jdbcPOJOPollInputOperator2.activate(testIdOperatorContext);
        }
        Iterator it = definePartitions.iterator();
        JdbcPOJOPollInputOperator jdbcPOJOPollInputOperator3 = (JdbcPOJOPollInputOperator) ((Partitioner.Partition) it.next()).getPartitionedInstance();
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        jdbcPOJOPollInputOperator3.outputPort.setSink(collectorTestSink);
        jdbcPOJOPollInputOperator3.beginWindow(0L);
        jdbcPOJOPollInputOperator3.pollRecords();
        jdbcPOJOPollInputOperator3.pollRecords();
        jdbcPOJOPollInputOperator3.emitTuples();
        jdbcPOJOPollInputOperator3.endWindow();
        Assert.assertEquals("rows from db", 5L, collectorTestSink.collectedTuples.size());
        for (JdbcOperatorTest.TestPOJOEvent testPOJOEvent : collectorTestSink.collectedTuples) {
            Assert.assertTrue("date", testPOJOEvent.getStartDate() instanceof Date);
            Assert.assertTrue("date", testPOJOEvent.getId() < 5);
        }
        JdbcPOJOPollInputOperator jdbcPOJOPollInputOperator4 = (JdbcPOJOPollInputOperator) ((Partitioner.Partition) it.next()).getPartitionedInstance();
        CollectorTestSink collectorTestSink2 = new CollectorTestSink();
        jdbcPOJOPollInputOperator4.outputPort.setSink(collectorTestSink2);
        jdbcPOJOPollInputOperator4.beginWindow(0L);
        jdbcPOJOPollInputOperator4.pollRecords();
        jdbcPOJOPollInputOperator4.emitTuples();
        jdbcPOJOPollInputOperator4.endWindow();
        Assert.assertEquals("rows from db", 5L, collectorTestSink2.collectedTuples.size());
        for (JdbcOperatorTest.TestPOJOEvent testPOJOEvent2 : collectorTestSink2.collectedTuples) {
            Assert.assertTrue("date", testPOJOEvent2.getId() < 10 && testPOJOEvent2.getId() >= 5);
        }
        insertEvents(4, false, 10);
        JdbcPOJOPollInputOperator jdbcPOJOPollInputOperator5 = (JdbcPOJOPollInputOperator) ((Partitioner.Partition) it.next()).getPartitionedInstance();
        jdbcPOJOPollInputOperator5.outputPort.setSink(new CollectorTestSink());
        jdbcPOJOPollInputOperator5.beginWindow(0L);
        jdbcPOJOPollInputOperator5.pollRecords();
        jdbcPOJOPollInputOperator5.emitTuples();
        jdbcPOJOPollInputOperator5.endWindow();
        Assert.assertEquals("rows from db", 4L, r0.collectedTuples.size());
    }

    @Test
    public void testRecovery() throws IOException {
        Mockito.when(Long.valueOf(this.windowDataManagerMock.getLargestCompletedWindow())).thenReturn(1L);
        Mockito.when(this.windowDataManagerMock.retrieve(1L)).thenReturn(new MutablePair(0, 4));
        insertEvents(10, true, 0);
        JdbcStore jdbcStore = new JdbcStore();
        jdbcStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        jdbcStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
        List<FieldInfo> fieldInfos = getFieldInfos();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        TestPortContext testPortContext = new TestPortContext(defaultAttributeMap);
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap2 = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap2.put(DAG.APPLICATION_ID, APP_ID);
        defaultAttributeMap2.put(Context.DAGContext.APPLICATION_PATH, this.dir);
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, defaultAttributeMap2);
        JdbcPOJOPollInputOperator jdbcPOJOPollInputOperator = new JdbcPOJOPollInputOperator();
        jdbcPOJOPollInputOperator.setStore(jdbcStore);
        jdbcPOJOPollInputOperator.setTableName("test_pojo_event_table");
        jdbcPOJOPollInputOperator.setKey("id");
        jdbcPOJOPollInputOperator.setFieldInfos(fieldInfos);
        jdbcPOJOPollInputOperator.setFetchSize(100);
        jdbcPOJOPollInputOperator.setBatchSize(100);
        jdbcPOJOPollInputOperator.lastEmittedRow = 0;
        jdbcPOJOPollInputOperator.rangeQueryPair = new KeyValPair(0, 8);
        jdbcPOJOPollInputOperator.outputPort.setup(testPortContext);
        jdbcPOJOPollInputOperator.setScheduledExecutorService(this.mockscheduler);
        jdbcPOJOPollInputOperator.setup(testIdOperatorContext);
        jdbcPOJOPollInputOperator.setWindowManager(this.windowDataManagerMock);
        jdbcPOJOPollInputOperator.activate(testIdOperatorContext);
        jdbcPOJOPollInputOperator.outputPort.setSink(new CollectorTestSink());
        jdbcPOJOPollInputOperator.beginWindow(0L);
        ((ScheduledExecutorService) Mockito.verify(this.mockscheduler, Mockito.times(0))).scheduleAtFixedRate((Runnable) Matchers.any(Runnable.class), Matchers.anyLong(), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
        jdbcPOJOPollInputOperator.emitTuples();
        jdbcPOJOPollInputOperator.endWindow();
        jdbcPOJOPollInputOperator.beginWindow(1L);
        ((ScheduledExecutorService) Mockito.verify(this.mockscheduler, Mockito.times(1))).scheduleAtFixedRate((Runnable) Matchers.any(Runnable.class), Matchers.anyLong(), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
    }

    private List<FieldInfo> getFieldInfos() {
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(new FieldInfo("ID", "id", (FieldInfo.SupportType) null));
        newArrayList.add(new FieldInfo("STARTDATE", "startDate", (FieldInfo.SupportType) null));
        newArrayList.add(new FieldInfo("STARTTIME", "startTime", (FieldInfo.SupportType) null));
        newArrayList.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", (FieldInfo.SupportType) null));
        newArrayList.add(new FieldInfo("NAME", "name", (FieldInfo.SupportType) null));
        return newArrayList;
    }
}
