package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.db.Connectable;
import com.datatorrent.lib.db.TransactionableStore;
import com.datatorrent.lib.db.jdbc.JdbcOperatorTest;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.helper.TestPortContext;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.TestUtils;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Test;

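/**
 * Tests for the JDBC input and output operators, including the POJO-based variants.
 * All tests connect to the in-memory HSQLDB database {@code jdbc:hsqldb:mem:test}.
 * (Source recovered from com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.class.)
 */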
public class JdbcPojoOperatorTest extends JdbcOperatorTest {

    /* loaded from: input_file:com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest$TestInputOperator.class */
    private static class TestInputOperator extends AbstractJdbcInputOperator<JdbcOperatorTest.TestEvent> {
        private static final String retrieveQuery = "SELECT * FROM test_event_table";

        TestInputOperator() {
            JdbcOperatorTest.cleanTable();
        }

        /* renamed from: getTuple, reason: merged with bridge method [inline-methods] */
        public JdbcOperatorTest.TestEvent m27getTuple(ResultSet resultSet) {
            try {
                return new JdbcOperatorTest.TestEvent(resultSet.getInt(1));
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }

        public String queryToRetrieveData() {
            return retrieveQuery;
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest$TestOutputOperator.class */
    private static class TestOutputOperator extends AbstractJdbcTransactionableOutputOperator<JdbcOperatorTest.TestEvent> {
        private static final String INSERT_STMT = "INSERT INTO test_event_table values (?)";

        TestOutputOperator() {
            JdbcOperatorTest.cleanTable();
        }

        @Nonnull
        protected String getUpdateCommand() {
            return "INSERT INTO test_event_table values (?)";
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void setStatementParameters(PreparedStatement preparedStatement, JdbcOperatorTest.TestEvent testEvent) throws SQLException {
            preparedStatement.setInt(1, testEvent.id);
        }

        public int getNumOfEventsInStore() {
            try {
                ResultSet executeQuery = DriverManager.getConnection("jdbc:hsqldb:mem:test;sql.syntax_mys=true").createStatement().executeQuery("SELECT count(*) from test_event_table");
                executeQuery.next();
                return executeQuery.getInt(1);
            } catch (SQLException e) {
                throw new RuntimeException("fetching count", e);
            }
        }
    }

    /**
     * POJO insert operator extended with helpers that count rows in the test database.
     */
    private static class TestPOJOOutputOperator extends JdbcPOJOInsertOutputOperator {

        /**
         * POJO non-insert operator (used with a MERGE statement by the tests) extended with
         * helpers that inspect {@code test_pojo_event_table}.
         */
        private static class TestPOJONonInsertOutputOperator extends JdbcPOJONonInsertOutputOperator {
            /** Invokes {@link JdbcOperatorTest#cleanTable()} before the operator is used. */
            public TestPOJONonInsertOutputOperator() {
                JdbcOperatorTest.cleanTable();
            }

            /**
             * @return the number of rows in {@code test_pojo_event_table}
             */
            public int getNumOfEventsInStore() {
                return TestPOJOOutputOperator.queryForCount("SELECT count(*) from test_pojo_event_table");
            }

            /**
             * @return the number of distinct {@code name} values in {@code test_pojo_event_table}
             */
            public int getDistinctNonUnique() {
                return TestPOJOOutputOperator.queryForCount("SELECT count(distinct(name)) from test_pojo_event_table");
            }
        }

        /** Invokes {@link JdbcOperatorTest#cleanTable()} before the operator is used. */
        TestPOJOOutputOperator() {
            JdbcOperatorTest.cleanTable();
        }

        /**
         * Counts all rows of the given table.
         *
         * @param str table name; it is concatenated into the SQL text (identifiers cannot be
         *            bound as parameters), so only fixed test table names should be passed
         * @return the row count reported by the database
         */
        public int getNumOfEventsInStore(String str) {
            return queryForCount("SELECT count(*) from " + str);
        }

        /**
         * Counts the rows of the given table whose {@code name1} column is null.
         *
         * @param str table name; concatenated into the SQL text, see {@link #getNumOfEventsInStore(String)}
         * @return the number of rows with a null {@code name1}
         */
        public int getNumOfNullEventsInStore(String str) {
            return queryForCount("SELECT count(*) from " + str + " where name1 is null");
        }

        /**
         * Runs a single-value count query on a fresh connection and returns the first column
         * of the first row. Connection, statement and result set are closed before returning.
         *
         * @param countQuery complete SQL query producing one integer
         * @return the integer produced by the query
         * @throws RuntimeException wrapping any {@link SQLException}
         */
        private static int queryForCount(String countQuery) {
            try (Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
                 Statement statement = connection.createStatement();
                 ResultSet resultSet = statement.executeQuery(countQuery)) {
                resultSet.next();
                return resultSet.getInt(1);
            } catch (SQLException e) {
                throw new RuntimeException("fetching count", e);
            }
        }
    }

    /**
     * Sends ten events through {@link TestOutputOperator} in a single window (batch size 3)
     * and expects ten rows in {@code test_event_table} afterwards.
     */
    @Test
    public void testJdbcOutputOperator() {
        // Declared with the concrete type: the driver/URL setters are not part of TransactionableStore.
        JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
        transactionalStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        transactionalStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        TestOutputOperator outputOperator = new TestOutputOperator();
        outputOperator.setBatchSize(3);
        outputOperator.setStore(transactionalStore);
        outputOperator.setup(context);
        outputOperator.activate(context);

        List<JdbcOperatorTest.TestEvent> events = Lists.newArrayList();
        for (int id = 0; id < 10; id++) {
            events.add(new JdbcOperatorTest.TestEvent(id));
        }

        outputOperator.beginWindow(0L);
        for (JdbcOperatorTest.TestEvent event : events) {
            outputOperator.input.process(event);
        }
        outputOperator.endWindow();

        Assert.assertEquals("rows in db", 10L, outputOperator.getNumOfEventsInStore());
        cleanTable();
    }

    /**
     * Configures the POJO insert operator with explicit field mappings (column {@code ID} to
     * field {@code id}, column {@code NAME} to field {@code name}), sends ten POJOs in one
     * window and expects ten rows in {@code test_pojo_event_table}.
     */
    @Test
    public void testJdbcPojoOutputOperator() {
        // Declared with the concrete type: the driver/URL setters are not part of TransactionableStore.
        JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
        transactionalStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        transactionalStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
        outputOperator.setBatchSize(3);
        outputOperator.setTablename("test_pojo_event_table");

        List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
        fieldInfos.add(new JdbcFieldInfo("ID", "id", (FieldInfo.SupportType) null, Types.INTEGER));
        fieldInfos.add(new JdbcFieldInfo("NAME", "name", (FieldInfo.SupportType) null, Types.VARCHAR));
        outputOperator.setFieldInfos(fieldInfos);

        outputOperator.setStore(transactionalStore);
        outputOperator.setup(context);

        Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        portAttributes.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        outputOperator.input.setup(new TestPortContext(portAttributes));
        outputOperator.activate(context);

        List<JdbcOperatorTest.TestPOJOEvent> events = Lists.newArrayList();
        for (int id = 0; id < 10; id++) {
            events.add(new JdbcOperatorTest.TestPOJOEvent(id, "test" + id));
        }

        outputOperator.beginWindow(0L);
        for (JdbcOperatorTest.TestPOJOEvent event : events) {
            outputOperator.input.process(event);
        }
        outputOperator.endWindow();

        Assert.assertEquals("rows in db", 10L, outputOperator.getNumOfEventsInStore("test_pojo_event_table"));
    }

    /**
     * Sends sixteen POJOs to the insert operator without explicit field mappings. Ids 0-9 are
     * followed by six more tuples, three of which (ids 0, 2 and 3) repeat an id already sent.
     * The test expects 13 stored rows and 3 tuples on the error port
     * (presumably the repeated ids are rejected by the table; verify against the schema).
     */
    @Test
    public void testJdbcPojoInsertOutputOperator() {
        // Declared with the concrete type: the driver/URL setters are not part of TransactionableStore.
        JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
        transactionalStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        transactionalStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
        outputOperator.setBatchSize(3);
        outputOperator.setTablename("test_pojo_event_table");
        outputOperator.setStore(transactionalStore);
        outputOperator.setup(context);

        Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        portAttributes.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        outputOperator.input.setup(new TestPortContext(portAttributes));

        // Keep a reference to the sink so the error tuples can be counted at the end.
        CollectorTestSink<Object> errorSink = new CollectorTestSink<Object>();
        TestUtils.setSink(outputOperator.error, errorSink);
        outputOperator.activate(context);

        List<JdbcOperatorTest.TestPOJOEvent> events = Lists.newArrayList();
        for (int id = 0; id < 10; id++) {
            events.add(new JdbcOperatorTest.TestPOJOEvent(id, "test" + id));
        }
        events.add(new JdbcOperatorTest.TestPOJOEvent(0, "test0"));
        events.add(new JdbcOperatorTest.TestPOJOEvent(2, "test2"));
        events.add(new JdbcOperatorTest.TestPOJOEvent(10, "test10"));
        events.add(new JdbcOperatorTest.TestPOJOEvent(11, "test11"));
        events.add(new JdbcOperatorTest.TestPOJOEvent(3, "test3"));
        events.add(new JdbcOperatorTest.TestPOJOEvent(12, "test12"));

        outputOperator.beginWindow(0L);
        for (JdbcOperatorTest.TestPOJOEvent event : events) {
            outputOperator.input.process(event);
        }
        outputOperator.endWindow();

        Assert.assertEquals("rows in db", 13L, outputOperator.getNumOfEventsInStore("test_pojo_event_table"));
        Assert.assertEquals("Error tuples", 3L, errorSink.collectedTuples.size());
    }

    /**
     * Processes windows 0-2 with ten tuples each, then calls setup/activate again and replays
     * windows 0-2 with different tuples before processing a new window 3. The test expects
     * 40 rows: the 30 from the first run plus the 10 from window 3, i.e. the replayed
     * windows must not add rows.
     */
    @Test
    public void testJdbcPojoInsertOutputOperatorExactlyOnce() {
        // Declared with the concrete type: the driver/URL setters are not part of TransactionableStore.
        JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
        transactionalStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        transactionalStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
        outputOperator.setBatchSize(3);
        outputOperator.setTablename("test_pojo_event_table");
        outputOperator.setStore(transactionalStore);
        outputOperator.setup(context);

        Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        portAttributes.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        TestPortContext portContext = new TestPortContext(portAttributes);
        outputOperator.input.setup(portContext);
        TestUtils.setSink(outputOperator.error, new CollectorTestSink<Object>());
        outputOperator.activate(context);

        List<JdbcOperatorTest.TestPOJOEvent> events = Lists.newArrayList();
        for (int id = 0; id < 70; id++) {
            events.add(new JdbcOperatorTest.TestPOJOEvent(id, "test" + id));
        }

        // First run: three complete windows.
        processWindowOfEvents(outputOperator, 0L, events, 0, 10);
        processWindowOfEvents(outputOperator, 1L, events, 10, 20);
        processWindowOfEvents(outputOperator, 2L, events, 20, 30);

        // Set the operator up again and replay the same window ids with new tuples.
        outputOperator.setup(context);
        outputOperator.input.setup(portContext);
        outputOperator.activate(context);

        processWindowOfEvents(outputOperator, 0L, events, 30, 40);
        processWindowOfEvents(outputOperator, 1L, events, 40, 50);
        // Window 2 is now closed with endWindow() before window 3 begins; previously
        // beginWindow(3) was called while window 2 was still open.
        processWindowOfEvents(outputOperator, 2L, events, 50, 60);
        processWindowOfEvents(outputOperator, 3L, events, 60, 70);

        outputOperator.deactivate();
        outputOperator.teardown();

        Assert.assertEquals("rows in db", 40L, outputOperator.getNumOfEventsInStore("test_pojo_event_table"));
    }

    /**
     * Runs one complete window: beginWindow, process {@code events[fromIndex, toIndex)}, endWindow.
     */
    private static void processWindowOfEvents(TestPOJOOutputOperator operator, long windowId,
            List<JdbcOperatorTest.TestPOJOEvent> events, int fromIndex, int toIndex) {
        operator.beginWindow(windowId);
        for (JdbcOperatorTest.TestPOJOEvent event : events.subList(fromIndex, toIndex)) {
            operator.input.process(event);
        }
        operator.endWindow();
    }

    /**
     * Inserts ten POJOs into {@code test_pojo_event_table_name_diff} and expects ten rows, all
     * with a null {@code name1} column (presumably because the table's {@code name1} column
     * has no matching POJO field; verify against the table definition).
     */
    @Test
    public void testJdbcPojoInsertOutputOperatorNullName() {
        // Declared with the concrete type: the driver/URL setters are not part of TransactionableStore.
        JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
        transactionalStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        transactionalStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
        outputOperator.setBatchSize(3);
        outputOperator.setTablename("test_pojo_event_table_name_diff");
        outputOperator.setStore(transactionalStore);
        outputOperator.setup(context);

        Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        portAttributes.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        outputOperator.input.setup(new TestPortContext(portAttributes));
        outputOperator.activate(context);

        List<JdbcOperatorTest.TestPOJOEvent> events = Lists.newArrayList();
        for (int id = 0; id < 10; id++) {
            events.add(new JdbcOperatorTest.TestPOJOEvent(id, "test" + id));
        }

        outputOperator.beginWindow(0L);
        for (JdbcOperatorTest.TestPOJOEvent event : events) {
            outputOperator.input.process(event);
        }
        outputOperator.endWindow();

        Assert.assertEquals("rows in db", 10L, outputOperator.getNumOfEventsInStore("test_pojo_event_table_name_diff"));
        Assert.assertEquals("null name rows in db", 10L, outputOperator.getNumOfNullEventsInStore("test_pojo_event_table_name_diff"));
    }

    /**
     * Points the insert operator at {@code test_pojo_event_table_id_diff} and expects
     * {@code activate} to fail with a {@link RuntimeException} whose message contains
     * "id1 not found in pojo" (compared case-insensitively).
     */
    @Test
    public void testJdbcPojoInsertOutputOperatorNullId() {
        // Declared with the concrete type: the driver/URL setters are not part of TransactionableStore.
        JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
        transactionalStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        transactionalStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
        outputOperator.setBatchSize(3);
        outputOperator.setTablename("test_pojo_event_table_id_diff");
        outputOperator.setStore(transactionalStore);
        outputOperator.setup(context);

        Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        portAttributes.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        outputOperator.input.setup(new TestPortContext(portAttributes));

        boolean exceptionThrown = false;
        try {
            outputOperator.activate(context);
        } catch (Exception e) {
            exceptionThrown = true;
            Assert.assertTrue(e instanceof RuntimeException);
            // Fail with an assertion rather than a NullPointerException if there is no message.
            Assert.assertNotNull(e.getMessage());
            // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
            Assert.assertTrue(e.getMessage().toLowerCase(Locale.ROOT).contains("id1 not found in pojo"));
        }
        Assert.assertTrue(exceptionThrown);
    }

    /**
     * Runs the non-insert operator with a MERGE statement keyed on {@code id}. Ten tuples with
     * ids 0-9 are followed by five tuples that reuse ids 0-4 with the name {@code test100}.
     * The test expects 10 rows and 6 distinct names (test5-test9 plus test100).
     */
    @Test
    public void testJdbcPojoOutputOperatorMerge() {
        // Declared with the concrete type: the driver/URL setters are not part of TransactionableStore.
        JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
        transactionalStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        transactionalStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        TestPOJOOutputOperator.TestPOJONonInsertOutputOperator outputOperator = new TestPOJOOutputOperator.TestPOJONonInsertOutputOperator();
        outputOperator.setBatchSize(3);
        outputOperator.setStore(transactionalStore);
        outputOperator.setSqlStatement("MERGE INTO test_pojo_event_table AS T USING (VALUES (?, ?)) AS FOO(id, name) ON T.id = FOO.id WHEN MATCHED THEN UPDATE SET name = FOO.name WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, FOO.name);");

        List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
        fieldInfos.add(new JdbcFieldInfo("id", "id", (FieldInfo.SupportType) null, Types.INTEGER));
        fieldInfos.add(new JdbcFieldInfo("name", "name", (FieldInfo.SupportType) null, Types.VARCHAR));
        outputOperator.setFieldInfos(fieldInfos);
        outputOperator.setup(context);

        Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        portAttributes.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        outputOperator.input.setup(new TestPortContext(portAttributes));
        outputOperator.activate(context);

        List<JdbcOperatorTest.TestPOJOEvent> events = Lists.newArrayList();
        for (int id = 0; id < 10; id++) {
            events.add(new JdbcOperatorTest.TestPOJOEvent(id, "test" + id));
        }
        for (int id = 0; id < 5; id++) {
            events.add(new JdbcOperatorTest.TestPOJOEvent(id, "test100"));
        }

        // Result intentionally unused; the call is kept from the original test
        // (it fails early if the table cannot be queried).
        outputOperator.getDistinctNonUnique();

        outputOperator.beginWindow(0L);
        for (JdbcOperatorTest.TestPOJOEvent event : events) {
            outputOperator.input.process(event);
        }
        outputOperator.endWindow();

        Assert.assertEquals("rows in db", 10L, outputOperator.getNumOfEventsInStore());
        Assert.assertEquals("rows in db", 6L, outputOperator.getDistinctNonUnique());
    }

    /**
     * Inserts ten events via {@code insertEventsInTable(10)}, lets {@link TestInputOperator}
     * emit once within a single window and expects ten tuples on the output port.
     */
    @Test
    public void testJdbcInputOperator() {
        // Declared with the concrete type: the driver/URL setters are not part of Connectable.
        JdbcStore store = new JdbcStore();
        store.setDatabaseDriver("org.hsqldb.jdbcDriver");
        store.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        TestInputOperator inputOperator = new TestInputOperator();
        inputOperator.setStore(store);
        insertEventsInTable(10);

        // Keep a reference to the sink so the emitted tuples can be counted at the end.
        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        inputOperator.outputPort.setSink(sink);

        inputOperator.setup(context);
        inputOperator.beginWindow(0L);
        inputOperator.emitTuples();
        inputOperator.endWindow();

        Assert.assertEquals("rows from db", 10L, sink.collectedTuples.size());
    }

    /**
     * Reads {@code test_pojo_event_table} through {@link JdbcPOJOInputOperator} with a fetch
     * size of 5. Ten rows are inserted up front, so windows 0 and 1 are expected to emit five
     * tuples each and window 2 none; after three more rows are inserted, window 3 is expected
     * to emit three. Ids must arrive in increasing order starting at 0.
     */
    @Test
    public void testJdbcPojoInputOperator() {
        JdbcStore store = new JdbcStore();
        store.setDatabaseDriver("org.hsqldb.jdbcDriver");
        store.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");

        Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        attributeMap.put(DAG.APPLICATION_ID, APP_ID);
        Context.OperatorContext context = OperatorContextTestHelper.mockOperatorContext(OPERATOR_ID, attributeMap);

        insertEvents(10, true, 0);

        JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator();
        inputOperator.setStore(store);
        inputOperator.setTableName("test_pojo_event_table");

        List<FieldInfo> fieldInfos = Lists.newArrayList();
        fieldInfos.add(new FieldInfo("ID", "id", (FieldInfo.SupportType) null));
        fieldInfos.add(new FieldInfo("STARTDATE", "startDate", (FieldInfo.SupportType) null));
        fieldInfos.add(new FieldInfo("STARTTIME", "startTime", (FieldInfo.SupportType) null));
        fieldInfos.add(new FieldInfo("STARTTIMESTAMP", "startTimestamp", (FieldInfo.SupportType) null));
        fieldInfos.add(new FieldInfo("SCORE", "score", FieldInfo.SupportType.DOUBLE));
        inputOperator.setFieldInfos(fieldInfos);
        inputOperator.setFetchSize(5);

        // Typed sink: a raw CollectorTestSink cannot be iterated with a typed for-each.
        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        inputOperator.outputPort.setSink(sink);

        Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        portAttributes.put(Context.PortContext.TUPLE_CLASS, JdbcOperatorTest.TestPOJOEvent.class);
        TestPortContext portContext = new TestPortContext(portAttributes);

        inputOperator.setup(context);
        inputOperator.outputPort.setup(portContext);
        inputOperator.activate(context);

        inputOperator.beginWindow(0L);
        inputOperator.emitTuples();
        inputOperator.endWindow();
        Assert.assertEquals("rows from db", 5L, sink.collectedTuples.size());
        // The first window does not check the score, matching the original test.
        int nextId = assertCollectedPojoEvents(sink, 0, false);

        sink.collectedTuples.clear();
        inputOperator.beginWindow(1L);
        inputOperator.emitTuples();
        inputOperator.endWindow();
        Assert.assertEquals("rows from db", 5L, sink.collectedTuples.size());
        nextId = assertCollectedPojoEvents(sink, nextId, true);

        sink.collectedTuples.clear();
        inputOperator.beginWindow(2L);
        inputOperator.emitTuples();
        inputOperator.endWindow();
        Assert.assertEquals("rows from db", 0L, sink.collectedTuples.size());

        insertEvents(3, false, 10);
        inputOperator.beginWindow(3L);
        inputOperator.emitTuples();
        inputOperator.endWindow();
        Assert.assertEquals("rows from db", 3L, sink.collectedTuples.size());
        assertCollectedPojoEvents(sink, nextId, true);
    }

    /**
     * Checks every tuple currently held by the sink: ids must continue from
     * {@code firstExpectedId} in steps of one, the date/time/timestamp fields must have the
     * matching {@code java.sql} types and, when {@code checkScore} is set, the score must be 55.4.
     *
     * @return the id expected for the next tuple after those already checked
     */
    private static int assertCollectedPojoEvents(CollectorTestSink<Object> sink, int firstExpectedId, boolean checkScore) {
        int expectedId = firstExpectedId;
        for (Object tuple : sink.collectedTuples) {
            JdbcOperatorTest.TestPOJOEvent event = (JdbcOperatorTest.TestPOJOEvent) tuple;
            Assert.assertTrue("i=" + expectedId, event.getId() == expectedId);
            Assert.assertTrue("date", event.getStartDate() instanceof Date);
            Assert.assertTrue("time", event.getStartTime() instanceof Time);
            Assert.assertTrue("timestamp", event.getStartTimestamp() instanceof Timestamp);
            if (checkScore) {
                Assert.assertTrue("score", event.getScore() == 55.4d);
            }
            expectedId++;
        }
        return expectedId;
    }
}
