package com.datatorrent.contrib.memsql;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.class */
public class AbstractMemsqlInputOperatorTest {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractMemsqlInputOperatorTest.class);
    public static final String HOST_PREFIX = "jdbc:mysql://";
    public static final String HOST = "localhost";
    public static final String USER = "root";
    public static final String PORT = "3307";
    public static final String DATABASE = "bench";
    public static final String TABLE = "testtable";
    public static final String FQ_TABLE = "bench.testtable";
    public static final String INDEX_COLUMN = "data_index";
    public static final String DATA_COLUMN2 = "data2";
    public static final int BLAST_SIZE = 10;
    public static final int NUM_WINDOWS = 10;
    public static final int DATABASE_SIZE = 100;
    public static final int OPERATOR_ID = 0;

    public static void populateDatabase(MemsqlStore memsqlStore) {
        memsqlStore.connect();
        try {
            PreparedStatement prepareStatement = memsqlStore.getConnection().prepareStatement("insert into bench.testtable (data_index,data2) VALUES (?,?)");
            for (int i = 0; i < 100; i++) {
                prepareStatement.setInt(1, i);
                prepareStatement.setString(2, "Testname" + i);
                prepareStatement.executeUpdate();
            }
            prepareStatement.close();
        } catch (SQLException e) {
            LOG.error((String) null, e);
        }
        memsqlStore.disconnect();
    }

    public static void memsqlInitializeDatabase(MemsqlStore memsqlStore) throws SQLException {
        memsqlStore.connect();
        Statement createStatement = memsqlStore.getConnection().createStatement();
        createStatement.executeUpdate("drop database if exists bench");
        createStatement.executeUpdate("create database bench");
        memsqlStore.disconnect();
        memsqlStore.connect();
        Statement createStatement2 = memsqlStore.getConnection().createStatement();
        createStatement2.executeUpdate("create table bench.testtable(data_index INTEGER PRIMARY KEY, data2 VARCHAR(256))");
        createStatement2.executeUpdate("CREATE TABLE IF NOT EXISTS bench." + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, PRIMARY KEY (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ") )");
        createStatement2.close();
        memsqlStore.disconnect();
    }

    public static MemsqlStore createStore(MemsqlStore memsqlStore, boolean z) {
        if (memsqlStore == null) {
            memsqlStore = new MemsqlStore();
        }
        StringBuilder sb = new StringBuilder();
        String str = "jdbc:mysql://localhost:3307";
        if (z) {
            str = str + "/bench";
        }
        LOG.debug("Host name: {}", str);
        LOG.debug("User name: {}", "root");
        LOG.debug("Port: {}", "3307");
        memsqlStore.setDatabaseUrl(str);
        sb.append("user:").append("root").append(",");
        sb.append("port:").append("3307");
        String sb2 = sb.toString();
        LOG.debug(sb2);
        memsqlStore.setConnectionProperties(sb2);
        return memsqlStore;
    }

    public static void cleanDatabase() throws SQLException {
        memsqlInitializeDatabase(createStore(null, false));
    }

    @Test
    public void TestMemsqlInputOperator() throws SQLException {
        cleanDatabase();
        populateDatabase(createStore(null, true));
        MemsqlInputOperator memsqlInputOperator = new MemsqlInputOperator();
        createStore(memsqlInputOperator.getStore(), true);
        memsqlInputOperator.setBlastSize(10);
        memsqlInputOperator.setTablename(FQ_TABLE);
        memsqlInputOperator.setPrimaryKeyCol("data_index");
        memsqlInputOperator.setTablename(FQ_TABLE);
        memsqlInputOperator.outputPort.setSink(new CollectorTestSink());
        memsqlInputOperator.setup((Context.OperatorContext) null);
        for (int i = 0; i < 11; i++) {
            memsqlInputOperator.beginWindow(i);
            memsqlInputOperator.emitTuples();
            memsqlInputOperator.endWindow();
        }
        Assert.assertEquals("Number of tuples in database", 100L, r0.collectedTuples.size());
    }

    @Test
    public void TestMemsqlPOJOInputOperator() throws SQLException {
        cleanDatabase();
        populateDatabase(createStore(null, true));
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap);
        MemsqlPOJOInputOperator memsqlPOJOInputOperator = new MemsqlPOJOInputOperator();
        createStore(memsqlPOJOInputOperator.getStore(), true);
        memsqlPOJOInputOperator.setBatchSize(10);
        memsqlPOJOInputOperator.setTablename(FQ_TABLE);
        memsqlPOJOInputOperator.setPrimaryKeyColumn("data_index");
        ArrayList arrayList = new ArrayList();
        arrayList.add("id");
        arrayList.add("name");
        memsqlPOJOInputOperator.setExpressions(arrayList);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add("data_index");
        arrayList2.add("data2");
        memsqlPOJOInputOperator.setColumns(arrayList2);
        memsqlPOJOInputOperator.setQuery("select * from bench.testtable;");
        memsqlPOJOInputOperator.setOutputClass("com.datatorrent.contrib.memsql.TestInputPojo");
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        memsqlPOJOInputOperator.outputPort.setSink(collectorTestSink);
        memsqlPOJOInputOperator.setup(testIdOperatorContext);
        memsqlPOJOInputOperator.beginWindow(0L);
        memsqlPOJOInputOperator.emitTuples();
        memsqlPOJOInputOperator.endWindow();
        Assert.assertEquals("rows from db", 100L, collectorTestSink.collectedTuples.size());
        for (int i = 0; i < 10; i++) {
            TestInputPojo testInputPojo = (TestInputPojo) collectorTestSink.collectedTuples.get(i);
            Assert.assertEquals("id set in testpojo", i, testInputPojo.getId());
            Assert.assertEquals("name set in testpojo", "Testname" + i, testInputPojo.getName());
        }
        collectorTestSink.clear();
        memsqlPOJOInputOperator.setQuery("select * from bench.testtable where %p >= %s;");
        memsqlPOJOInputOperator.setStartRow(10);
        memsqlPOJOInputOperator.setup(testIdOperatorContext);
        memsqlPOJOInputOperator.beginWindow(0L);
        memsqlPOJOInputOperator.emitTuples();
        memsqlPOJOInputOperator.endWindow();
        Assert.assertEquals("rows from db", 90L, collectorTestSink.collectedTuples.size());
        collectorTestSink.clear();
        memsqlPOJOInputOperator.setQuery("select * from bench.testtable where %p >= %s LIMIT %l;");
        memsqlPOJOInputOperator.setStartRow(1);
        memsqlPOJOInputOperator.setBatchSize(10);
        memsqlPOJOInputOperator.setup(testIdOperatorContext);
        memsqlPOJOInputOperator.beginWindow(0L);
        memsqlPOJOInputOperator.emitTuples();
        memsqlPOJOInputOperator.endWindow();
        Assert.assertEquals("rows from db", 10L, collectorTestSink.collectedTuples.size());
    }
}
