package com.datatorrent.contrib.cassandra;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Row;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.helper.TestPortContext;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/contrib/cassandra/AbstractUpsertOutputOperatorCountersTest.class */
/**
 * Exercises {@link CounterColumnUpdatesOperator} against the
 * {@code unittests.userupdates} table, checking that successive upserts
 * accumulate into the {@code updatecount} column rather than overwrite it.
 *
 * <p>The test queries through the operator's own {@code session}, so it needs
 * whatever Cassandra instance the operator connects to during setup.
 *
 * <p>NOTE(review): nothing here deactivates or tears down the operator after a
 * test; confirm whether the session should be closed in an {@code @After} hook.
 */
public class AbstractUpsertOutputOperatorCountersTest {
    public static final String APP_ID = "TestCassandraUpsertOperator";
    public static final int OPERATOR_ID_FOR_COUNTER_COLUMNS = 1;

    /** Column read back to verify the accumulated counter value. */
    private static final String COUNT_COLUMN = "updatecount";

    CounterColumnUpdatesOperator counterUpdatesOperator = null;
    OperatorContextTestHelper.TestIdOperatorContext contextForCountersOperator;
    TestPortContext testPortContextForCounters;

    /**
     * Builds the operator and port contexts, then sets up and activates a fresh
     * operator and wires its input port before every test.
     */
    @Before
    public void setupApexContexts() throws Exception {
        Attribute.AttributeMap.DefaultAttributeMap operatorAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        operatorAttributes.put(DAG.APPLICATION_ID, APP_ID);
        this.contextForCountersOperator =
                new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID_FOR_COUNTER_COLUMNS, operatorAttributes);

        Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
        portAttributes.put(Context.PortContext.TUPLE_CLASS, CounterColumnTableEntry.class);
        this.testPortContextForCounters = new TestPortContext(portAttributes);

        this.counterUpdatesOperator = new CounterColumnUpdatesOperator();
        this.counterUpdatesOperator.setup(this.contextForCountersOperator);
        this.counterUpdatesOperator.activate(this.contextForCountersOperator);
        this.counterUpdatesOperator.input.setup(this.testPortContextForCounters);
    }

    /**
     * Sends three updates (+3, +2, -1) for one user in consecutive windows and
     * checks the stored counter after each: 3, then 5, then 4.
     */
    @Test
    public void testForSingleRowInsertForCounterTables() throws Exception {
        // Timestamp suffix keeps the row key distinct from earlier runs.
        String userId = "user1" + System.currentTimeMillis();

        applyCounterDelta(userId, 3L, 9L);
        List<Row> rows = fetchRows(userId);
        Assert.assertEquals(1, rows.size());
        Assert.assertEquals(3L, rows.get(0).getLong(COUNT_COLUMN));

        applyCounterDelta(userId, 2L, 10L);
        Assert.assertEquals(5L, fetchRows(userId).get(0).getLong(COUNT_COLUMN));

        applyCounterDelta(userId, -1L, 11L);
        Assert.assertEquals(4L, fetchRows(userId).get(0).getLong(COUNT_COLUMN));
    }

    /**
     * Pushes a single counter update through the operator inside its own window.
     *
     * @param userId   row key to update
     * @param delta    value placed in the entry's update count
     * @param windowId id passed to {@code beginWindow}
     */
    private void applyCounterDelta(String userId, long delta, long windowId) {
        CounterColumnTableEntry entry = new CounterColumnTableEntry();
        entry.setUserId(userId);
        entry.setUpdatecount(delta);

        UpsertExecutionContext executionContext = new UpsertExecutionContext();
        executionContext.setOverridingConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
        executionContext.setPayload(entry);

        this.counterUpdatesOperator.beginWindow(windowId);
        this.counterUpdatesOperator.input.process(executionContext);
        this.counterUpdatesOperator.endWindow();
    }

    /**
     * Reads every row stored for the given user through the operator's session.
     *
     * @param userId row key to look up
     * @return all rows returned by the query
     */
    private List<Row> fetchRows(String userId) {
        String query = "SELECT * FROM unittests.userupdates WHERE userid = '" + userId + "'";
        return this.counterUpdatesOperator.session.execute(query).all();
    }
}
