package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.db.Connectable;
import com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.netlet.util.DTThrowable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.class */
public class JdbcNonTransactionalBatchOutputOperatorTest {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcNonTransactionalBatchOutputOperatorTest.class);
    public static final int HALF_BATCH_SIZE = 5;
    public static final int BATCH_SIZE = 10;
    private static Connection con;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest$TestOutputOperator.class */
    public static class TestOutputOperator extends AbstractJdbcNonTransactionableBatchOutputOperator<JdbcNonTransactionalOutputOperatorTest.TestEvent, JdbcNonTransactionalStore> {
        private TestOutputOperator() {
        }

        protected String getUpdateCommand() {
            return JdbcNonTransactionalOutputOperatorTest.TestOutputOperator.INSERT_STMT;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void setStatementParameters(PreparedStatement preparedStatement, JdbcNonTransactionalOutputOperatorTest.TestEvent testEvent) throws SQLException {
            preparedStatement.setInt(1, testEvent.id);
        }

        public int getNumOfEventsInStore(Connection connection) {
            try {
                Statement createStatement = connection.createStatement();
                ResultSet executeQuery = createStatement.executeQuery("SELECT count(*) FROM test_event_table");
                executeQuery.next();
                int i = executeQuery.getInt(1);
                createStatement.close();
                return i;
            } catch (SQLException e) {
                throw new RuntimeException("fetching count", e);
            }
        }
    }

    @BeforeClass
    public static void setup() {
        JdbcOperatorTest.setup();
        try {
            Class.forName("org.hsqldb.jdbcDriver").newInstance();
            con = DriverManager.getConnection("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
        } catch (ClassNotFoundException e) {
            DTThrowable.rethrow(e);
        } catch (IllegalAccessException e2) {
            DTThrowable.rethrow(e2);
        } catch (InstantiationException e3) {
            DTThrowable.rethrow(e3);
        } catch (SQLException e4) {
            DTThrowable.rethrow(e4);
        }
    }

    @AfterClass
    public static void teardown() {
        JdbcOperatorTest.setup();
        try {
            con.close();
        } catch (SQLException e) {
            DTThrowable.rethrow(e);
        }
    }

    private static TestOutputOperator createOperator(Operator.ProcessingMode processingMode) {
        Connectable jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
        jdbcNonTransactionalStore.setDatabaseDriver("org.hsqldb.jdbcDriver");
        jdbcNonTransactionalStore.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
        TestOutputOperator testOutputOperator = new TestOutputOperator();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.PROCESSING_MODE, processingMode);
        defaultAttributeMap.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, -1L);
        defaultAttributeMap.put(DAG.APPLICATION_ID, JdbcNonTransactionalOutputOperatorTest.APP_ID);
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap);
        testOutputOperator.setStore(jdbcNonTransactionalStore);
        testOutputOperator.setBatchSize(10);
        testOutputOperator.setup(testIdOperatorContext);
        return testOutputOperator;
    }

    @Test
    public void testBatch() {
        JdbcOperatorTest.cleanTable();
        Random random = new Random();
        TestOutputOperator createOperator = createOperator(Operator.ProcessingMode.AT_LEAST_ONCE);
        createOperator.beginWindow(0L);
        for (int i = 0; i < 10; i++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(1L);
        for (int i2 = 0; i2 < 5; i2++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 1L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should not be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(2L);
        for (int i3 = 0; i3 < 5; i3++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 2L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should not be written", 20L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.teardown();
    }

    @Test
    public void testAtLeastOnceFullBatch() {
        JdbcOperatorTest.cleanTable();
        Random random = new Random();
        TestOutputOperator createOperator = createOperator(Operator.ProcessingMode.AT_LEAST_ONCE);
        createOperator.beginWindow(0L);
        for (int i = 0; i < 10; i++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(1L);
        for (int i2 = 0; i2 < 10; i2++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 20L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.getStore().disconnect();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
        defaultAttributeMap.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 0L);
        defaultAttributeMap.put(DAG.APPLICATION_ID, JdbcNonTransactionalOutputOperatorTest.APP_ID);
        createOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap));
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 20L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(0L);
        for (int i3 = 0; i3 < 10; i3++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 20L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(1L);
        for (int i4 = 0; i4 < 10; i4++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 1L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 30L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
    }

    @Test
    public void testAtLeastOnceHalfBatch() {
        JdbcOperatorTest.cleanTable();
        Random random = new Random();
        TestOutputOperator createOperator = createOperator(Operator.ProcessingMode.AT_LEAST_ONCE);
        createOperator.beginWindow(0L);
        for (int i = 0; i < 10; i++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(1L);
        for (int i2 = 0; i2 < 5; i2++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.getStore().disconnect();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
        defaultAttributeMap.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 0L);
        defaultAttributeMap.put(DAG.APPLICATION_ID, JdbcNonTransactionalOutputOperatorTest.APP_ID);
        createOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap));
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(0L);
        for (int i3 = 0; i3 < 10; i3++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(1L);
        for (int i4 = 0; i4 < 5; i4++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 1L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 20L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
    }

    @Test
    public void testAtMostOnceFullBatch() {
        JdbcOperatorTest.cleanTable();
        Random random = new Random();
        TestOutputOperator createOperator = createOperator(Operator.ProcessingMode.AT_MOST_ONCE);
        createOperator.beginWindow(0L);
        for (int i = 0; i < 10; i++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(1L);
        for (int i2 = 0; i2 < 10; i2++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 20L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.getStore().disconnect();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
        defaultAttributeMap.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 0L);
        defaultAttributeMap.put(DAG.APPLICATION_ID, JdbcNonTransactionalOutputOperatorTest.APP_ID);
        createOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap));
        createOperator.beginWindow(2L);
        for (int i3 = 0; i3 < 10; i3++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 2L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 30L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
    }

    @Test
    public void testAtMostOnceHalfBatch() {
        JdbcOperatorTest.cleanTable();
        Random random = new Random();
        TestOutputOperator createOperator = createOperator(Operator.ProcessingMode.AT_MOST_ONCE);
        createOperator.beginWindow(0L);
        for (int i = 0; i < 10; i++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(1L);
        for (int i2 = 0; i2 < 5; i2++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.getStore().disconnect();
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
        defaultAttributeMap.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 0L);
        defaultAttributeMap.put(DAG.APPLICATION_ID, JdbcNonTransactionalOutputOperatorTest.APP_ID);
        createOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(0, defaultAttributeMap));
        Assert.assertEquals("Commit window id ", 0L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 10L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
        createOperator.beginWindow(2L);
        for (int i3 = 0; i3 < 10; i3++) {
            createOperator.input.put(new JdbcNonTransactionalOutputOperatorTest.TestEvent(random.nextInt()));
        }
        createOperator.endWindow();
        Assert.assertEquals("Commit window id ", 2L, createOperator.getStore().getCommittedWindowId(JdbcNonTransactionalOutputOperatorTest.APP_ID, 0));
        Assert.assertEquals("Batch should be written", 20L, createOperator.getNumOfEventsInStore(createOperator.getStore().connection));
    }
}
