/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.hbase;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.hbase.HBaseIO;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link HBaseIO} reads and writes, run against an HBase mini cluster that is started
 * once for the whole class in {@link #beforeClass()} and shut down in {@link #afterClass()}.
 *
 * <p>The cluster (and therefore every table created in it) is shared by all tests and tables are
 * never dropped, so each test that creates a table, or that relies on a table being absent, must
 * use a table name that no other test uses.
 */
@RunWith(JUnit4.class)
public class HBaseIOTest {
    @Rule
    public final transient TestPipeline p = TestPipeline.create();
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    private static HBaseTestingUtility htu;
    private static HBaseAdmin admin;

    private static final Configuration conf = HBaseConfiguration.create();
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
    private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
    private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email");

    /** Starts a mini cluster (arguments 1 and 4 to {@code startMiniCluster}) and obtains its admin. */
    @BeforeClass
    public static void beforeClass() throws Exception {
        conf.setInt("hbase.client.retries.number", 1);
        // NOTE(review): hostnames are pinned to localhost, presumably so the mini cluster does not
        // depend on the machine's hostname/DNS configuration - confirm.
        conf.setStrings("hbase.master.hostname", "localhost");
        conf.setStrings("hbase.regionserver.hostname", "localhost");
        htu = new HBaseTestingUtility(conf);
        htu.startMiniCluster(1, 4);
        admin = htu.getHBaseAdmin();
    }

    /** Closes the admin and shuts the mini cluster down; tolerates a partially failed start-up. */
    @AfterClass
    public static void afterClass() throws Exception {
        if (admin != null) {
            admin.close();
            admin = null;
        }
        if (htu != null) {
            htu.shutdownMiniCluster();
            htu = null;
        }
    }

    @Test
    public void testReadBuildsCorrectly() {
        HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("table");
        Assert.assertEquals("table", read.getTableId());
        Assert.assertNotNull("configuration", read.getConfiguration());
    }

    @Test
    public void testReadBuildsCorrectlyInDifferentOrder() {
        HBaseIO.Read read = HBaseIO.read().withTableId("table").withConfiguration(conf);
        Assert.assertEquals("table", read.getTableId());
        Assert.assertNotNull("configuration", read.getConfiguration());
    }

    @Test
    public void testWriteBuildsCorrectly() {
        HBaseIO.Write write = HBaseIO.write().withConfiguration(conf).withTableId("table");
        Assert.assertEquals("table", write.getTableId());
        Assert.assertNotNull("configuration", write.getConfiguration());
    }

    @Test
    public void testWriteBuildsCorrectlyInDifferentOrder() {
        HBaseIO.Write write = HBaseIO.write().withTableId("table").withConfiguration(conf);
        Assert.assertEquals("table", write.getTableId());
        Assert.assertNotNull("configuration", write.getConfiguration());
    }

    @Test
    public void testWriteValidationFailsMissingTable() {
        HBaseIO.Write write = HBaseIO.write().withConfiguration(conf);
        thrown.expect(IllegalArgumentException.class);
        write.validate(null);
    }

    @Test
    public void testWriteValidationFailsMissingConfiguration() {
        HBaseIO.Write write = HBaseIO.write().withTableId("table");
        thrown.expect(IllegalArgumentException.class);
        write.validate(null);
    }

    /** Reading from a table that was never created must fail with a descriptive message. */
    @Test
    public void testReadingFailsTableDoesNotExist() throws Exception {
        final String table = "TEST-TABLE-INVALID";
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage(String.format("Table %s does not exist", table));
        runReadTest(
                HBaseIO.read().withConfiguration(conf).withTableId(table),
                new ArrayList<Result>());
    }

    /** Reading a freshly created table yields no rows. */
    @Test
    public void testReadingEmptyTable() throws Exception {
        final String table = "TEST-EMPTY-TABLE";
        createTable(table);
        runReadTest(
                HBaseIO.read().withConfiguration(conf).withTableId(table),
                new ArrayList<Result>());
    }

    /** Reading a populated table yields one result per written row key. */
    @Test
    public void testReading() throws Exception {
        final String table = "TEST-MANY-ROWS-TABLE";
        final int numRows = 1001;
        createTable(table);
        writeData(table, numRows);
        runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table), numRows);
    }

    /** Splitting the source yields one split per region, equivalent to the unsplit source. */
    @Test
    public void testReadingWithSplits() throws Exception {
        final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
        final int numRows = 1500;
        // createTable(String) pre-splits the table with three split keys, i.e. four regions.
        final int numRegions = 4;
        final long bytesPerRow = 100L;

        createTable(table);
        writeData(table, numRows);

        HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
        HBaseIO.HBaseSource source = new HBaseIO.HBaseSource(read, null);
        // Desired bundle size: 1500 * 100 / 4 = 37500 bytes.
        List<? extends BoundedSource<Result>> splits =
                source.split(numRows * bytesPerRow / numRegions, null);

        Assert.assertThat(splits, Matchers.hasSize(numRegions));
        SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, null);
    }

    /** A row filter restricts the rows that are read. */
    @Test
    public void testReadingWithFilter() throws Exception {
        final String table = "TEST-FILTER-TABLE";
        final int numRows = 1001;
        createTable(table);
        writeData(table, numRows);

        String regex = ".*17.*";
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex));
        HBaseIO.Read read =
                HBaseIO.read().withConfiguration(conf).withTableId(table).withFilter(filter);
        // Row keys end in "_<i>" for i in [0, 1001): "17" occurs in 17, 117, ..., 917 (10 keys)
        // and in 170..179 (10 keys).
        runReadTestLength(read, 20);
    }

    /**
     * Key ranges restrict the rows that are read. Every row key starts with the hex digit
     * {@code i % 16}; for 1001 rows, residues 0-8 occur 63 times and residues 9-15 occur 62 times.
     */
    @Test
    public void testReadingWithKeyRange() throws Exception {
        final String table = "TEST-KEY-RANGE-TABLE";
        final int numRows = 1001;
        final byte[] startRow = "2".getBytes(StandardCharsets.UTF_8);
        final byte[] stopRow = "9".getBytes(StandardCharsets.UTF_8);
        final ByteKey startKey = ByteKey.copyFrom(startRow);

        createTable(table);
        writeData(table, numRows);

        // Keys before "2": prefixes 0 and 1 -> 2 * 63 rows.
        final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey);
        runReadTestLength(
                HBaseIO.read().withConfiguration(conf).withTableId(table).withKeyRange(prefixRange),
                126);

        // Keys from "2" onwards: the remaining 1001 - 126 rows.
        final ByteKeyRange suffixRange = ByteKeyRange.ALL_KEYS.withStartKey(startKey);
        runReadTestLength(
                HBaseIO.read().withConfiguration(conf).withTableId(table).withKeyRange(suffixRange),
                875);

        // Keys in ["2", "9"): prefixes 2 through 8 -> 7 * 63 rows.
        runReadTestLength(
                HBaseIO.read().withConfiguration(conf).withTableId(table)
                        .withKeyRange(startRow, stopRow),
                441);
    }

    @Test
    public void testReadingDisplayData() {
        HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("fooTable");
        DisplayData displayData = DisplayData.from(read);
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("tableId", "fooTable"));
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("configuration"));
    }

    /** Writing a single mutation through the pipeline results in a single row in the table. */
    @Test
    public void testWriting() throws Exception {
        final String table = "table";
        final String key = "key";
        final String value = "value";

        createTable(table);

        p.apply("single row", Create.of(makeWrite(key, value)).withCoder(HBaseIO.WRITE_CODER))
                .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
        p.run().waitUntilFinish();

        List<Result> results = readTable(table, new Scan());
        Assert.assertEquals(1, results.size());
    }

    /** Writing to a table that was never created must fail with a descriptive message. */
    @Test
    public void testWritingFailsTableDoesNotExist() throws Exception {
        // Must be a name that no other test creates: tables live for the whole class, so sharing a
        // name with a test that creates its table would make this test depend on execution order.
        final String table = "TEST-TABLE-DOES-NOT-EXIST";

        PCollection<KV<byte[], Iterable<Mutation>>> emptyInput =
                p.apply(Create.empty(HBaseIO.WRITE_CODER));
        emptyInput.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));

        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage(String.format("Table %s does not exist", table));
        p.run();
    }

    /** Writing a {@link Put} that carries no columns must fail the pipeline. */
    @Test
    public void testWritingFailsBadElement() throws Exception {
        // Unique name so that creating this table cannot affect tests expecting a missing table.
        final String table = "TEST-TABLE-BAD-ELEMENT";
        final String key = "KEY";
        createTable(table);

        p.apply(Create.of(makeBadWrite(key)).withCoder(HBaseIO.WRITE_CODER))
                .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));

        thrown.expect(Pipeline.PipelineExecutionException.class);
        thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));
        thrown.expectMessage("No columns to insert");
        p.run().waitUntilFinish();
    }

    @Test
    public void testWritingDisplayData() {
        HBaseIO.Write write = HBaseIO.write().withTableId("fooTable").withConfiguration(conf);
        DisplayData displayData = DisplayData.from(write);
        Assert.assertThat(displayData, DisplayDataMatchers.hasDisplayItem("tableId", "fooTable"));
    }

    // HBase helper methods

    /** Creates a table with the default column family, pre-split at keys "4", "8" and "C". */
    private static void createTable(String tableId) throws Exception {
        byte[][] splitKeys = {
            "4".getBytes(StandardCharsets.UTF_8),
            "8".getBytes(StandardCharsets.UTF_8),
            "C".getBytes(StandardCharsets.UTF_8)
        };
        createTable(tableId, COLUMN_FAMILY, splitKeys);
    }

    /** Creates a table with a single column family, pre-split at the given keys. */
    private static void createTable(String tableId, byte[] columnFamily, byte[][] splitKeys)
            throws Exception {
        TableName tableName = TableName.valueOf(tableId);
        HTableDescriptor desc = new HTableDescriptor(tableName);
        HColumnDescriptor colDef = new HColumnDescriptor(columnFamily);
        desc.addFamily(colDef);
        admin.createTable(desc, splitKeys);
    }

    /** Writes {@code numRows} rows of generated data (see {@link #makeTableData}) to the table. */
    private static void writeData(String tableId, int numRows) throws Exception {
        // The connection is obtained from the shared admin and is deliberately not closed here.
        Connection connection = admin.getConnection();
        TableName tableName = TableName.valueOf(tableId);
        // try-with-resources so the mutator is released even if mutate/flush throws.
        try (BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
            mutator.mutate(makeTableData(numRows));
            mutator.flush();
        }
    }

    /**
     * Builds the mutations for {@code numRows} rows: two {@link Put}s per row key, one for the
     * name column and one for the email column.
     */
    private static List<Mutation> makeTableData(int numRows) {
        // Two mutations are added per row.
        List<Mutation> mutations = new ArrayList<>(2 * numRows);
        for (int i = 0; i < numRows; ++i) {
            // Keys are "_<i>" left-padded to 21 characters with the hex digit of i % 16, so the
            // first character of every key cycles through 0, 1, ..., F.
            String prefix = String.format("%X", i % 16);
            byte[] rowKey = Bytes.toBytes(StringUtils.leftPad("_" + String.valueOf(i), 21, prefix));
            byte[] value = Bytes.toBytes(String.valueOf(i));
            byte[] valueEmail = Bytes.toBytes(String.valueOf(i) + "@email.com");
            mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_NAME, value));
            mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_EMAIL, valueEmail));
        }
        return mutations;
    }

    /**
     * Scans the table over a fresh connection and returns all results. The connection, table and
     * scanner are all closed before returning, including when the scan fails.
     */
    private static List<Result> readTable(String tableId, Scan scan) throws Exception {
        List<Result> results = new ArrayList<>();
        try (Connection connection = ConnectionFactory.createConnection(conf);
                Table table = connection.getTable(TableName.valueOf(tableId));
                ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                results.add(result);
            }
        }
        return results;
    }

    // Beam helper methods

    /** Builds a write element for {@code key} holding a single valid mutation. */
    private static KV<byte[], Iterable<Mutation>> makeWrite(String key, String value) {
        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
        List<Mutation> mutations = new ArrayList<>();
        mutations.add(makeMutation(key, value));
        return KV.of(rowKey, (Iterable<Mutation>) mutations);
    }

    /** Builds a {@link Put} for {@code key} that sets both the name and the email column. */
    private static Mutation makeMutation(String key, String value) {
        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
        return new Put(rowKey)
                .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value))
                .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com"));
    }

    /** Builds a write element whose only mutation is a {@link Put} with no columns. */
    private static KV<byte[], Iterable<Mutation>> makeBadWrite(String key) {
        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
        List<Mutation> mutations = new ArrayList<>();
        mutations.add(new Put(rowKey));
        return KV.of(rowKey, (Iterable<Mutation>) mutations);
    }

    /** Applies {@code read}, asserts it produces exactly {@code expected}, and runs the pipeline. */
    private void runReadTest(HBaseIO.Read read, List<Result> expected) {
        // Transform names include table and key range so repeated reads in one test do not clash.
        final String transformId = read.getTableId() + "_" + read.getKeyRange();
        PCollection<Result> rows = p.apply("Read" + transformId, read);
        PAssert.that(rows).containsInAnyOrder(expected);
        p.run().waitUntilFinish();
    }

    /** Applies {@code read}, asserts it produces {@code numElements} results, and runs the pipeline. */
    private void runReadTestLength(HBaseIO.Read read, long numElements) {
        // Transform names include table and key range so repeated reads in one test do not clash.
        final String transformId = read.getTableId() + "_" + read.getKeyRange();
        PCollection<Result> rows = p.apply("Read" + transformId, read);
        PAssert.thatSingleton(rows.apply("Count" + transformId, Count.<Result>globally()))
                .isEqualTo(numElements);
        p.run().waitUntilFinish();
    }
}

