package com.datatorrent.contrib.hbase;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.contrib.aerospike.AerospikeTestUtils;
import com.datatorrent.contrib.memcache.MemcachePOJOOperatorTest;
import com.datatorrent.contrib.util.TestPOJO;
import com.datatorrent.contrib.util.TupleCacheOutputOperator;
import com.datatorrent.contrib.util.TupleGenerateCacheOperator;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.TableInfo;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.class */
/**
 * Round-trip test for {@link HBasePOJOInputOperator}: generated {@link TestPOJO} tuples are
 * written through an {@link HBasePOJOPutOperator}, read back through the input operator, and
 * the tuples collected by the output operator are compared with the generated ones.
 *
 * <p>The store is configured for a ZooKeeper quorum on {@code localhost:2181}, so the test
 * needs an HBase instance reachable there.
 */
public class HBasePOJOInputOperatorTest {
    private static final Logger logger = LoggerFactory.getLogger(HBasePOJOInputOperatorTest.class);

    /** Number of tuples the generator emits and the test waits to receive. */
    private static final int TUPLE_NUM = MemcachePOJOOperatorTest.TUPLE_SIZE;

    /** Upper bound, in milliseconds, on how long the test polls for the tuples. */
    private static final long RUN_DURATION = 30000;

    /** Pause, in milliseconds, between two checks of the received tuples. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    private HBaseStore store;
    private HBasePOJOPutOperator hbaseOutputOperator;
    private TestHBasePOJOInputOperator hbaseInputOperator;

    /** Tuple generator bound to {@link TestPOJO}. */
    public static class MyGenerator extends TupleGenerateCacheOperator<TestPOJO> {
        public MyGenerator() {
            setTupleType(TestPOJO.class);
        }
    }

    /** Names under which the operators are registered in the DAG. */
    public enum OPERATOR {
        GENERATOR,
        HBASEOUTPUT,
        HBASEINPUT,
        OUTPUT
    }

    /**
     * Input operator that sleeps for one second before running the parent setup.
     * NOTE(review): presumably this gives the put operator a head start - confirm.
     */
    public static class TestHBasePOJOInputOperator extends HBasePOJOInputOperator {
        /**
         * Sleeps for one second, then delegates to the parent setup.
         *
         * @param operatorContext context passed through to the parent setup
         * @throws RuntimeException if the thread is interrupted while sleeping; the
         *         interrupt flag is restored before throwing
         */
        public void setup(Context.OperatorContext operatorContext) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            super.setup(operatorContext);
        }
    }

    /**
     * Creates and configures both HBase operators, then creates the test table.
     *
     * @throws Exception if the operators cannot be set up or the table cannot be created
     */
    @Before
    public void prepare() throws Exception {
        this.hbaseInputOperator = new TestHBasePOJOInputOperator();
        this.hbaseOutputOperator = new HBasePOJOPutOperator();
        setupOperators();
        HBaseUtil.createTable(this.store.getConfiguration(), this.store.getTableName());
    }

    /**
     * Deletes the test table.
     *
     * @throws Exception if the table cannot be deleted
     */
    @After
    public void cleanup() throws Exception {
        // The store stays null when prepare() failed early; skipping here keeps that
        // original failure from being masked by a NullPointerException.
        if (this.store != null) {
            HBaseUtil.deleteTable(this.store.getConfiguration(), this.store.getTableName());
        }
    }

    /**
     * Builds the DAG (generator -> HBase put, HBase input -> collector), runs it
     * asynchronously and polls until {@link #TUPLE_NUM} tuples were received or
     * {@link #RUN_DURATION} elapsed. The controller is shut down in every case before the
     * result is evaluated.
     *
     * @throws Exception if the DAG cannot be prepared or run
     * @throws RuntimeException if the tuples do not arrive in time or the wait is interrupted
     */
    @Test
    public void test() throws Exception {
        LocalMode localMode = LocalMode.newInstance();
        // The DAG is populated directly below, so the application itself adds nothing.
        StreamingApplication application = new StreamingApplication() {
            public void populateDAG(DAG dag, Configuration configuration) {
            }
        };
        DAG dag = localMode.getDAG();

        MyGenerator generator = dag.addOperator(OPERATOR.GENERATOR.name(), MyGenerator.class);
        generator.setTupleNum(TUPLE_NUM);
        this.hbaseOutputOperator = dag.addOperator(OPERATOR.HBASEOUTPUT.name(), this.hbaseOutputOperator);
        this.hbaseInputOperator = dag.addOperator(OPERATOR.HBASEINPUT.name(), this.hbaseInputOperator);
        dag.setOutputPortAttribute(this.hbaseInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, TestPOJO.class);
        TupleCacheOutputOperator collector = dag.addOperator(OPERATOR.OUTPUT.name(), TupleCacheOutputOperator.class);

        dag.addStream("queue1", generator.outputPort, this.hbaseOutputOperator.input).setLocality(DAG.Locality.NODE_LOCAL);
        dag.addStream("queue2", this.hbaseInputOperator.outputPort, collector.inputPort).setLocality(DAG.Locality.NODE_LOCAL);

        localMode.prepareDAG(application, new Configuration(false));
        LocalMode.Controller controller = localMode.getController();
        controller.runAsync();

        long startMillis = System.currentTimeMillis();
        List<TestPOJO> received = null;
        boolean allReceived = false;
        try {
            do {
                try {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Interrupted while waiting for tuples", e);
                }
                // Read the list once per iteration so the null check, the size and the log
                // output all refer to the same reference.
                received = collector.getReceivedTuples();
                int receivedCount = received == null ? 0 : received.size();
                logger.info("Tuple row key: {}", received);
                logger.info("Received tuple number {}, instance is {}.", receivedCount, System.identityHashCode(collector));
                allReceived = receivedCount == TUPLE_NUM;
            } while (!allReceived && System.currentTimeMillis() - startMillis <= RUN_DURATION);
        } finally {
            // Stop the asynchronous run on success, timeout and interruption alike.
            controller.shutdown();
        }

        if (!allReceived) {
            throw new RuntimeException("Testcase taking too long");
        }
        validate(generator.getTuples(), received);
    }

    /**
     * Asserts that both lists have the same size and that every received tuple is contained
     * in the expected list. Neither argument is modified.
     *
     * @param expected tuples produced by the generator
     * @param actual tuples collected by the output operator
     */
    protected void validate(List<TestPOJO> expected, List<TestPOJO> actual) {
        logger.info("expected size: {}", expected.size());
        logger.info("actual size: {}", actual.size());
        Assert.assertEquals(
                String.format("The expected size {%d} is different from actual size {%d}.", expected.size(), actual.size()),
                expected.size(),
                actual.size());
        // Work on a copy: the received list belongs to the output operator.
        List<TestPOJO> unexpected = new ArrayList<TestPOJO>(actual);
        unexpected.removeAll(expected);
        Assert.assertTrue("content not same.", unexpected.isEmpty());
    }

    /**
     * Applies the shared table mapping and store settings to both HBase operators and runs
     * their setup against a mock operator context.
     */
    protected void setupOperators() {
        // NOTE(review): raw types kept on purpose - the generic signature of TableInfo is
        // not visible from this file.
        TableInfo tableInfo = new TableInfo();
        tableInfo.setRowOrIdExpression("row");
        ArrayList fieldsInfo = new ArrayList();
        fieldsInfo.add(new HBaseFieldInfo("name", "name", FieldInfo.SupportType.STRING, "f0"));
        fieldsInfo.add(new HBaseFieldInfo("age", "age", FieldInfo.SupportType.INTEGER, "f1"));
        fieldsInfo.add(new HBaseFieldInfo("address", "address", FieldInfo.SupportType.STRING, "f1"));
        tableInfo.setFieldsInfo(fieldsInfo);
        this.hbaseInputOperator.setTableInfo(tableInfo);
        this.hbaseOutputOperator.setTableInfo(tableInfo);

        this.store = new HBaseStore();
        // NOTE(review): the table name is taken from an Aerospike test constant - confirm
        // this is intended rather than a dedicated HBase table name.
        this.store.setTableName(AerospikeTestUtils.NAMESPACE);
        this.store.setZookeeperQuorum("localhost");
        this.store.setZookeeperClientPort(2181);
        this.hbaseInputOperator.setStore(this.store);
        this.hbaseOutputOperator.setStore(this.store);

        Context.OperatorContext mockContext = OperatorContextTestHelper.mockOperatorContext(0, new Attribute.AttributeMap.DefaultAttributeMap());
        this.hbaseInputOperator.setup(mockContext);
        this.hbaseOutputOperator.setup(mockContext);
    }
}
