package com.datatorrent.contrib.couchbase;

import com.couchbase.client.CouchbaseClient;
import com.couchbase.client.CouchbaseConnectionFactory;
import com.couchbase.client.CouchbaseConnectionFactoryBuilder;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.db.Connectable;
import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.collect.Lists;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.couchbase.mock.Bucket;
import org.couchbase.mock.BucketConfiguration;
import org.couchbase.mock.CouchbaseMock;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises a partitioned {@link AbstractCouchBaseInputOperator} against two in-process
 * {@link CouchbaseMock} servers.
 *
 * <p>The test seeds a set of keys through a directly constructed {@link CouchbaseClient},
 * asks the operator to partition itself, drives every partition through a number of
 * streaming windows and finally checks how many tuples reached the shared sink.
 */
public class CouchBaseInputOperatorTest {
    private static final Logger logger = LoggerFactory.getLogger(CouchBaseInputOperatorTest.class);
    private static final String password = "";

    /** Number of key/value pairs seeded into the bucket, and the number of tuples expected back. */
    private static final int NUM_EVENTS = 10;
    /** Number of windows every partition is driven through. */
    private static final int NUM_WINDOWS = 10;
    /** Number of partitions the operator is expected to define. */
    private static final int EXPECTED_PARTITIONS = 2;

    /** Keys written by {@link TestInputOperator#insertEventsInTable(int)}; served by {@code getKeys()}. */
    protected static ArrayList<String> keyList;
    /** Client used only for seeding data; shut down and reset to {@code null} once seeding is done. */
    protected static CouchbaseClient client = null;

    private final int numNodes = 2;
    private final int numReplicas = 3;
    protected CouchbaseConnectionFactory connectionFactory;

    /**
     * Concrete operator under test: emits the string form of every fetched value and reads the
     * keys recorded in {@link CouchBaseInputOperatorTest#keyList}.
     */
    public static class TestInputOperator extends AbstractCouchBaseInputOperator<String> {
        /**
         * Converts a fetched value into the emitted tuple.
         *
         * @param obj value handed over by the operator; must not be {@code null}
         * @return {@code obj.toString()}
         */
        public String getTuple(Object obj) {
            String tuple = obj.toString();
            logger.debug("returned tuple is {}", tuple);
            return tuple;
        }

        /**
         * @return the shared list of keys populated by {@link #insertEventsInTable(int)}
         */
        public ArrayList<String> getKeys() {
            return keyList;
        }

        /**
         * Writes {@code numEvents} entries ({@code "Key0" -> 0}, {@code "Key10" -> 100}, ...) through
         * the shared static client, waiting for each write to complete, and records every key in
         * {@link CouchBaseInputOperatorTest#keyList}.
         *
         * <p>The shared client is shut down and cleared afterwards, also when a write fails.
         *
         * @param numEvents number of entries to write
         */
        public void insertEventsInTable(int numEvents) {
            logger.debug("number of events is {}", numEvents);
            try {
                for (int n = 0; n < numEvents; n++) {
                    String key = "Key" + (n * 10);
                    keyList.add(key);
                    try {
                        // Block on the returned future so the write is finished before reading starts.
                        client.set(key, Integer.valueOf(n * 100)).get();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to callers before propagating.
                        Thread.currentThread().interrupt();
                        DTThrowable.rethrow(e);
                    } catch (ExecutionException e) {
                        DTThrowable.rethrow(e);
                    }
                }
            } finally {
                client.shutdown();
                client = null;
            }
        }
    }

    /**
     * Fills in the given bucket configuration and builds a mock server around it.
     *
     * <p>The passed configuration object is mutated in place, so callers that reuse one instance
     * for several mocks share the same settings object between them.
     *
     * @param name bucket name
     * @param bucketPassword bucket password
     * @param bucketConfiguration configuration object to populate
     * @return a mock server that has not been started yet, created with port argument {@code 0}
     * @throws Exception if the mock cannot be constructed
     */
    protected CouchbaseMock createMock(String name, String bucketPassword, BucketConfiguration bucketConfiguration) throws Exception {
        bucketConfiguration.numNodes = numNodes;
        bucketConfiguration.numReplicas = numReplicas;
        bucketConfiguration.name = name;
        bucketConfiguration.type = Bucket.BucketType.COUCHBASE;
        bucketConfiguration.password = bucketPassword;
        bucketConfiguration.hostname = "localhost";
        List<BucketConfiguration> configurations = new ArrayList<>();
        configurations.add(bucketConfiguration);
        return new CouchbaseMock(0, configurations);
    }

    /**
     * Starts two mock servers, runs the partitioned read scenario against them and always stops
     * every server that was started, even when the scenario fails.
     */
    @Test
    public void TestCouchBaseInputOperator() throws Exception {
        BucketConfiguration bucketConfiguration = new BucketConfiguration();
        CouchbaseMock firstMock = createMock("default", password, bucketConfiguration);
        CouchbaseMock secondMock = createMock("default", password, bucketConfiguration);

        firstMock.start();
        try {
            firstMock.waitForStartup();
            int firstPort = firstMock.getHttpPort();
            logger.debug("port is {}", firstPort);

            secondMock.start();
            try {
                secondMock.waitForStartup();
                int secondPort = secondMock.getHttpPort();
                logger.debug("port is {}", secondPort);

                verifyPartitionedRead(bucketConfiguration, firstPort, secondPort);
            } finally {
                secondMock.stop();
            }
        } finally {
            firstMock.stop();
        }
    }

    /**
     * Seeds the bucket, partitions the operator, drives all partitions and asserts on the result.
     * Operators that were set up and the seeding client are released on every exit path.
     */
    private void verifyPartitionedRead(BucketConfiguration bucketConfiguration, int firstPort, int secondPort) throws Exception {
        List<URI> serverUris = new ArrayList<>();
        serverUris.add(new URI("http", null, "localhost", firstPort, "/pools", password, password));
        connectionFactory = new CouchbaseConnectionFactoryBuilder()
                .buildCouchbaseConnection(serverUris, bucketConfiguration.name, bucketConfiguration.password);
        client = new CouchbaseClient(connectionFactory);

        List<AbstractCouchBaseInputOperator<String>> operators = Lists.newArrayList();
        try {
            // Declared with its concrete type: the bucket/password/URI setters are called on it.
            CouchBaseStore store = new CouchBaseStore();
            keyList = new ArrayList<>();
            store.setBucket(bucketConfiguration.name);
            store.setPasswordConfig(password);
            store.setPassword(bucketConfiguration.password);
            // Both entries name the first mock's port.
            store.setUriString("localhost:" + firstPort + ",localhost:" + firstPort);

            TestInputOperator source = new TestInputOperator();
            source.setStore(store);
            source.insertEventsInTable(NUM_EVENTS);

            CollectorTestSink<Object> sink = new CollectorTestSink<>();
            source.outputPort.setSink(sink);

            Collection<Partitioner.Partition<AbstractCouchBaseInputOperator<String>>> existing = Lists.newArrayList();
            // The null first argument keeps its raw List cast so it still selects the same constructor.
            Collection<Partitioner.Partition<AbstractCouchBaseInputOperator<String>>> partitions =
                    source.definePartitions(existing, new StatelessPartitionerTest.PartitioningContextImpl((List) null, 0));

            Assert.assertEquals(EXPECTED_PARTITIONS, partitions.size());
            for (Partitioner.Partition<AbstractCouchBaseInputOperator<String>> partition : partitions) {
                Assert.assertNotSame(source, partition.getPartitionedInstance());
            }

            // The first partition is pointed at the first mock, every later one at the second mock.
            int serverPort = firstPort;
            for (Partitioner.Partition<AbstractCouchBaseInputOperator<String>> partition : partitions) {
                TestInputOperator instance = (TestInputOperator) partition.getPartitionedInstance();
                instance.setServerURIString("localhost:" + serverPort);
                instance.setStore(store);
                instance.setup(null);
                instance.outputPort.setSink(sink);
                operators.add(instance);
                serverPort = secondPort;
            }

            // Discard anything collected before the windows below are run.
            sink.clear();
            for (int windowId = 0; windowId < NUM_WINDOWS; windowId++) {
                for (AbstractCouchBaseInputOperator<String> operator : operators) {
                    operator.beginWindow(windowId);
                    operator.emitTuples();
                    operator.endWindow();
                }
            }

            Assert.assertEquals("Tuples read should be same ", NUM_EVENTS, sink.collectedTuples.size());
        } finally {
            for (AbstractCouchBaseInputOperator<String> operator : operators) {
                operator.teardown();
            }
            // Only still set when seeding never ran; seeding itself shuts the client down.
            if (client != null) {
                client.shutdown();
                client = null;
            }
        }
    }
}
