package com.datatorrent.contrib.hbase;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.contrib.aerospike.AerospikeTestUtils;
import com.datatorrent.contrib.util.TestPOJO;
import com.datatorrent.contrib.util.TupleGenerator;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.TableInfo;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.class */
public class HBasePOJOPutOperatorTest {
    private static final Logger logger = LoggerFactory.getLogger(HBasePOJOPutOperatorTest.class);
    public static final int TEST_SIZE = 15000;
    public static final int WINDOW_SIZE = 1500;
    private HBasePOJOPutOperator operator;
    private final long startWindowId = Calendar.getInstance().getTimeInMillis();
    private TupleGenerator<TestPOJO> tupleGenerator;

    @Before
    public void prepare() throws Exception {
        this.operator = new HBasePOJOPutOperator();
        setupOperator(this.operator);
        createOrDeleteTable((HBaseStore) this.operator.getStore(), false);
    }

    @After
    public void cleanup() throws Exception {
        createOrDeleteTable((HBaseStore) this.operator.getStore(), true);
    }

    @Test
    public void testPutInternal() throws Exception {
        writeRecords();
        readRecordsAndVerify();
    }

    protected void writeRecords() {
        long j = this.startWindowId;
        int i = 0;
        for (int i2 = 0; i2 < 15000; i2++) {
            if (i == 0) {
                try {
                    long j2 = j;
                    j = j2 + 1;
                    this.operator.beginWindow(j2);
                } catch (Exception e) {
                    logger.error("testPutInternal() exception.", e);
                    Assert.fail(e.getMessage());
                    return;
                }
            }
            this.operator.processTuple(getNextTuple());
            i++;
            if (i == 1500) {
                this.operator.endWindow();
                i = 0;
            }
        }
        Thread.sleep(30000L);
    }

    protected void createOrDeleteTable(HBaseStore hBaseStore, boolean z) throws Exception {
        HBaseAdmin hBaseAdmin = null;
        try {
            try {
                HBaseAdmin hBaseAdmin2 = new HBaseAdmin(hBaseStore.getConfiguration());
                String tableName = hBaseStore.getTableName();
                if (!hBaseAdmin2.isTableAvailable(tableName) && !z) {
                    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
                    hTableDescriptor.addFamily(new HColumnDescriptor("f0"));
                    hTableDescriptor.addFamily(new HColumnDescriptor("f1"));
                    hBaseAdmin2.createTable(hTableDescriptor);
                } else if (z) {
                    hBaseAdmin2.disableTable(tableName);
                    hBaseAdmin2.deleteTable(tableName);
                }
                if (hBaseAdmin2 != null) {
                    try {
                        hBaseAdmin2.close();
                    } catch (Exception e) {
                        logger.warn("close admin exception. ", e);
                    }
                }
            } catch (Exception e2) {
                logger.error("exception", e2);
                throw e2;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    hBaseAdmin.close();
                } catch (Exception e3) {
                    logger.warn("close admin exception. ", e3);
                }
            }
            throw th;
        }
    }

    protected void setupOperator(HBasePOJOPutOperator hBasePOJOPutOperator) {
        configure(hBasePOJOPutOperator);
        Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
        defaultAttributeMap.put(Context.OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
        defaultAttributeMap.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, -1L);
        defaultAttributeMap.put(DAG.APPLICATION_ID, "JdbcOperatorTest");
        hBasePOJOPutOperator.setup(OperatorContextTestHelper.mockOperatorContext(0, defaultAttributeMap));
    }

    protected void configure(HBasePOJOPutOperator hBasePOJOPutOperator) {
        TableInfo tableInfo = new TableInfo();
        tableInfo.setRowOrIdExpression("row");
        ArrayList arrayList = new ArrayList();
        arrayList.add(new HBaseFieldInfo("name", "name", FieldInfo.SupportType.STRING, "f0"));
        arrayList.add(new HBaseFieldInfo("age", "age", FieldInfo.SupportType.INTEGER, "f1"));
        arrayList.add(new HBaseFieldInfo("address", "address", FieldInfo.SupportType.STRING, "f1"));
        tableInfo.setFieldsInfo(arrayList);
        hBasePOJOPutOperator.setTableInfo(tableInfo);
        HBaseStore hBaseStore = new HBaseStore();
        hBaseStore.setTableName(AerospikeTestUtils.NAMESPACE);
        hBaseStore.setZookeeperQuorum("localhost");
        hBaseStore.setZookeeperClientPort(2181);
        hBasePOJOPutOperator.setStore(hBaseStore);
    }

    protected Object getNextTuple() {
        if (this.tupleGenerator == null) {
            this.tupleGenerator = new TupleGenerator<>(TestPOJO.class);
        }
        return this.tupleGenerator.getNextTuple();
    }

    protected void resetTupleGenerator() {
        if (this.tupleGenerator == null) {
            this.tupleGenerator = new TupleGenerator<>(TestPOJO.class);
        } else {
            this.tupleGenerator.reset();
        }
    }

    protected void readRecordsAndVerify() {
        int[] iArr = new int[TEST_SIZE];
        for (int i = 1; i <= 15000; i++) {
            iArr[i - 1] = 1;
        }
        try {
            ResultScanner scanner = this.operator.getStore().getTable().getScanner(new Scan());
            int i2 = 0;
            while (true) {
                Result next = scanner.next();
                if (next == null) {
                    break;
                }
                int intValue = Integer.valueOf(Bytes.toString(next.getRow())).intValue();
                Assert.assertTrue("rowId=" + intValue + " aut of range", intValue > 0 && intValue <= 15000);
                Assert.assertTrue("the rowId=" + intValue + " already processed.", iArr[intValue - 1] == 1);
                iArr[intValue - 1] = 0;
                List<Cell> listCells = next.listCells();
                HashMap hashMap = new HashMap();
                for (Cell cell : listCells) {
                    hashMap.put(Bytes.toString(CellUtil.cloneQualifier(cell)), CellUtil.cloneValue(cell));
                }
                TestPOJO from = TestPOJO.from(hashMap);
                from.setRowId(Long.valueOf(intValue));
                TestPOJO testPOJO = new TestPOJO(intValue);
                Assert.assertTrue(String.format("expected %s, get %s ", testPOJO.toString(), from.toString()), testPOJO.completeEquals(from));
                i2++;
            }
            int i3 = 0;
            if (i2 != 15000) {
                logger.error("unsaved records: ");
                StringBuilder sb = new StringBuilder();
                for (int i4 = 0; i4 < 15000; i4++) {
                    if (iArr[i4] != 0) {
                        sb.append(i4 + 1).append(", ");
                        i3++;
                    }
                    if (i3 > 0 && i3 % 20 == 0) {
                        logger.error(sb.toString());
                        sb.delete(0, sb.length());
                    }
                }
                logger.error(sb.toString());
                logger.error("End of unsaved records");
            }
            Assert.assertTrue("expected total records = 15000, got " + i2 + ", missed " + i3, i2 == 15000);
        } catch (Exception e) {
            throw new RuntimeException("exception", e);
        }
    }

    public HBasePOJOPutOperator getOperator() {
        return this.operator;
    }
}
