package com.datatorrent.contrib.aerospike;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Statement;
import com.datatorrent.api.Context;
import com.datatorrent.contrib.aerospike.AerospikeTestUtils;
import com.datatorrent.lib.db.Connectable;
import com.datatorrent.lib.db.TransactionableStore;
import com.datatorrent.lib.testbench.CollectorTestSink;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/contrib/aerospike/AerospikeOperatorTest.class */
public class AerospikeOperatorTest {
    private static String APP_ID = "AerospikeOperatorTest";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/contrib/aerospike/AerospikeOperatorTest$TestEvent.class */
    public static class TestEvent {
        int id;

        TestEvent(int i) {
            this.id = i;
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/aerospike/AerospikeOperatorTest$TestInputOperator.class */
    private static class TestInputOperator extends AbstractAerospikeGetOperator<TestEvent> {
        TestInputOperator() {
            AerospikeTestUtils.cleanTable();
        }

        /* renamed from: getTuple, reason: merged with bridge method [inline-methods] */
        public TestEvent m5getTuple(Record record) {
            return new TestEvent(record.getInt(AerospikeTestUtils.TestPOJO.ID));
        }

        public Statement queryToRetrieveData() {
            Statement statement = new Statement();
            statement.setNamespace(AerospikeTestUtils.NAMESPACE);
            statement.setSetName(AerospikeTestUtils.SET_NAME);
            return statement;
        }

        public void insertEventsInTable(int i) {
            AerospikeClient aerospikeClient = null;
            try {
                try {
                    aerospikeClient = new AerospikeClient("127.0.0.1", AerospikeTestUtils.PORT);
                    for (int i2 = 0; i2 < i; i2++) {
                        aerospikeClient.put((WritePolicy) null, new Key(AerospikeTestUtils.NAMESPACE, AerospikeTestUtils.SET_NAME, String.valueOf(i2)), new Bin[]{new Bin(AerospikeTestUtils.TestPOJO.ID, i2)});
                    }
                    if (null != aerospikeClient) {
                        aerospikeClient.close();
                    }
                } catch (AerospikeException e) {
                    throw e;
                }
            } catch (Throwable th) {
                if (null != aerospikeClient) {
                    aerospikeClient.close();
                }
                throw th;
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/aerospike/AerospikeOperatorTest$TestOutputOperator.class */
    private static class TestOutputOperator extends AbstractAerospikeTransactionalPutOperator<TestEvent> {
        TestOutputOperator() {
            AerospikeTestUtils.cleanTable();
            AerospikeTestUtils.cleanMetaTable();
        }

        protected Key getUpdatedBins(TestEvent testEvent, List<Bin> list) throws AerospikeException {
            Key key = new Key(AerospikeTestUtils.NAMESPACE, AerospikeTestUtils.SET_NAME, String.valueOf(testEvent.id));
            list.add(new Bin(AerospikeTestUtils.TestPOJO.ID, testEvent.id));
            return key;
        }

        protected /* bridge */ /* synthetic */ Key getUpdatedBins(Object obj, List list) throws AerospikeException {
            return getUpdatedBins((TestEvent) obj, (List<Bin>) list);
        }
    }

    @Test
    public void TestAerospikeOutputOperator() {
        TransactionableStore transactionalStore = AerospikeTestUtils.getTransactionalStore();
        Context.OperatorContext operatorContext = AerospikeTestUtils.getOperatorContext(APP_ID);
        TestOutputOperator testOutputOperator = new TestOutputOperator();
        testOutputOperator.setStore(transactionalStore);
        testOutputOperator.setup(operatorContext);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(new TestEvent(i));
        }
        testOutputOperator.beginWindow(0L);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            testOutputOperator.input.process((TestEvent) it.next());
        }
        testOutputOperator.endWindow();
        Assert.assertEquals("rows in db", 10L, AerospikeTestUtils.getNumOfEventsInStore());
    }

    @Test
    public void TestAerospikeInputOperator() {
        Connectable store = AerospikeTestUtils.getStore();
        Context.OperatorContext operatorContext = AerospikeTestUtils.getOperatorContext(APP_ID);
        TestInputOperator testInputOperator = new TestInputOperator();
        testInputOperator.setStore(store);
        testInputOperator.insertEventsInTable(10);
        testInputOperator.outputPort.setSink(new CollectorTestSink());
        testInputOperator.setup(operatorContext);
        testInputOperator.beginWindow(0L);
        testInputOperator.emitTuples();
        testInputOperator.endWindow();
        Assert.assertEquals("rows from db", 10L, r0.collectedTuples.size());
    }
}
