/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.coprocessor;

import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises coprocessor endpoint invocation through the client API against a mini cluster:
 * region-level calls via {@link Table#coprocessorService}, master-level calls via
 * {@link Admin#coprocessorService()}, and the error paths of both.
 *
 * <p>The test table is pre-split at {@code ROWS[rowSeperator1]} and {@code ROWS[rowSeperator2]},
 * and the region-count assertions below expect the resulting three regions.
 */
@Category({CoprocessorTests.class, MediumTests.class})
public class TestCoprocessorEndpoint {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCoprocessorEndpoint.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestCoprocessorEndpoint.class);

    private static final TableName TEST_TABLE = TableName.valueOf("TestCoprocessorEndpoint");
    private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
    private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
    private static final byte[] ROW = Bytes.toBytes("testRow");

    /** Number of rows written to the test table; row {@code i} stores the int value {@code i}. */
    private static final int ROWSIZE = 20;
    /** Index into {@link #ROWS} of the first split key. */
    private static final int rowSeperator1 = 5;
    /** Index into {@link #ROWS} of the second split key. */
    private static final int rowSeperator2 = 12;
    private static final byte[][] ROWS = makeN(ROW, ROWSIZE);

    private static final HBaseTestingUtility util = new HBaseTestingUtility();

    /**
     * Starts a two-node mini cluster with the aggregation and echo coprocessors configured,
     * creates the pre-split test table and loads {@link #ROWSIZE} rows into it.
     */
    @BeforeClass
    public static void setupBeforeClass() throws Exception {
        Configuration conf = util.getConfiguration();
        conf.setInt("hbase.client.operation.timeout", 5000);
        conf.setStrings("hbase.coprocessor.region.classes",
            ColumnAggregationEndpoint.class.getName(), ProtobufCoprocessorService.class.getName());
        conf.setStrings("hbase.coprocessor.master.classes", ProtobufCoprocessorService.class.getName());
        util.startMiniCluster(2);

        Admin admin = util.getAdmin();
        HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
        desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
        admin.createTable(desc, new byte[][] {ROWS[rowSeperator1], ROWS[rowSeperator2]});
        util.waitUntilAllRegionsAssigned(TEST_TABLE);

        // try-with-resources so the table is released even if one of the puts fails.
        try (Table table = util.getConnection().getTable(TEST_TABLE)) {
            for (int i = 0; i < ROWSIZE; i++) {
                Put put = new Put(ROWS[i]);
                put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
                table.put(put);
            }
        }
    }

    /** Shuts the mini cluster down once all tests in this class have run. */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        util.shutdownMiniCluster();
    }

    /**
     * Invokes the {@code ColumnAggregationService} sum RPC on every region selected by the
     * {@code [start, end]} row range.
     *
     * @param table     table whose regions are called
     * @param family    column family placed in the request
     * @param qualifier column qualifier; omitted from the request when {@code null} or empty
     * @param start     first row key of the range
     * @param end       last row key of the range
     * @return the sum reported by each region, as returned by {@code Table.coprocessorService}
     * @throws Throwable whatever {@code Table.coprocessorService} raises
     */
    private Map<byte[], Long> sum(Table table, final byte[] family, final byte[] qualifier, byte[] start, byte[] end) throws Throwable {
        return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class, start, end,
            new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
                @Override
                public Long call(ColumnAggregationProtos.ColumnAggregationService instance) throws IOException {
                    CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
                        new CoprocessorRpcUtils.BlockingRpcCallback<>();
                    ColumnAggregationProtos.SumRequest.Builder builder = ColumnAggregationProtos.SumRequest.newBuilder();
                    builder.setFamily(ByteStringer.wrap(family));
                    if (qualifier != null && qualifier.length > 0) {
                        builder.setQualifier(ByteStringer.wrap(qualifier));
                    }
                    instance.sum(null, builder.build(), rpcCallback);
                    return rpcCallback.get().getSum();
                }
            });
    }

    /**
     * Checks the aggregated sum first over the whole table and then over the range that starts
     * at the first split key.
     */
    @Test
    public void testAggregation() throws Throwable {
        try (Table table = util.getConnection().getTable(TEST_TABLE)) {
            assertSumFrom(table, 0);
            assertSumFrom(table, rowSeperator1);
        }
    }

    /**
     * Sums the per-region results for rows {@code ROWS[firstRow]..ROWS[ROWS.length - 1]} and
     * asserts the total equals {@code firstRow + ... + (ROWSIZE - 1)}.
     */
    private void assertSumFrom(Table table, int firstRow) throws Throwable {
        Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[firstRow], ROWS[ROWS.length - 1]);
        logResults(results);

        long actual = 0;
        for (Long regionSum : results.values()) {
            actual += regionSum;
        }
        long expected = 0;
        for (int i = firstRow; i < ROWSIZE; i++) {
            expected += i;
        }
        Assert.assertEquals("Invalid result", expected, actual);
    }

    /**
     * Calls the echo RPC with a result callback: once across all regions (expects one reply per
     * region) and once starting at the first split key (expects two replies).
     */
    @Test
    public void testCoprocessorService() throws Throwable {
        List<HRegionLocation> regions = locateAllRegions();
        final TestProtos.EchoRequestProto request = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        final Map<byte[], String> results = Collections.synchronizedMap(new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
        final RpcController controller = new ServerRpcController();

        // The call and the callback carry no per-invocation state, so both scans share them.
        Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto> echoCall =
            new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
                @Override
                public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
                    LOG.debug("Default response is {}", TestProtos.EchoRequestProto.getDefaultInstance());
                    CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
                        new CoprocessorRpcUtils.BlockingRpcCallback<>();
                    instance.echo(controller, request, callback);
                    TestProtos.EchoResponseProto response = callback.get();
                    LOG.debug("Batch.Call returning result {}", response);
                    return response;
                }
            };
        Batch.Callback<TestProtos.EchoResponseProto> collector = new Batch.Callback<TestProtos.EchoResponseProto>() {
            @Override
            public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
                Assert.assertNotNull(result);
                Assert.assertEquals("hello", result.getMessage());
                results.put(region, result.getMessage());
            }
        };

        try (Table table = util.getConnection().getTable(TEST_TABLE)) {
            // Scan covering every region.
            table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
                ROWS[0], ROWS[ROWS.length - 1], echoCall, collector);
            logResults(results);
            Assert.assertEquals(3, results.size());
            for (HRegionLocation location : regions) {
                LOG.info("Region info is {}", location.getRegionInfo().getRegionNameAsString());
                Assert.assertTrue(results.containsKey(location.getRegionInfo().getRegionName()));
            }
            results.clear();

            // Scan starting at the first split key, i.e. skipping the first region.
            table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
                ROWS[rowSeperator1], ROWS[ROWS.length - 1], echoCall, collector);
            logResults(results);
            Assert.assertEquals(2, results.size());
        }
    }

    /**
     * Verifies that a {@link Batch.Call} returning {@code null} still yields one map entry per
     * region, each mapped to {@code null}.
     */
    @Test
    public void testCoprocessorServiceNullResponse() throws Throwable {
        List<HRegionLocation> regions = locateAllRegions();
        final TestProtos.EchoRequestProto request = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        final RpcController controller = new ServerRpcController();

        try (Table table = util.getConnection().getTable(TEST_TABLE)) {
            Map<byte[], String> results = table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
                ROWS[0], ROWS[ROWS.length - 1],
                new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
                    @Override
                    public String call(TestRpcServiceProtos.TestProtobufRpcProto instance) throws IOException {
                        CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
                            new CoprocessorRpcUtils.BlockingRpcCallback<>();
                        instance.echo(controller, request, callback);
                        TestProtos.EchoResponseProto response = callback.get();
                        LOG.debug("Batch.Call got result {}", response);
                        // The RPC reply is deliberately discarded: this test is about null results.
                        return null;
                    }
                });
            logResults(results);
            Assert.assertEquals(3, results.size());
            for (HRegionLocation location : regions) {
                HRegionInfo info = location.getRegionInfo();
                LOG.info("Region info is {}", info.getRegionNameAsString());
                Assert.assertTrue(results.containsKey(info.getRegionName()));
                Assert.assertNull(results.get(info.getRegionName()));
            }
        }
    }

    /** Calls the echo RPC on the master coprocessor channel and checks the message round-trips. */
    @Test
    public void testMasterCoprocessorService() throws Throwable {
        Admin admin = util.getAdmin();
        TestProtos.EchoRequestProto request = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
            TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
        Assert.assertEquals("hello", service.echo(null, request).getMessage());
    }

    /**
     * Calls the {@code error} RPC on a region coprocessor channel and expects it to surface as a
     * {@link ServiceException}.
     */
    @Test
    public void testCoprocessorError() throws Exception {
        // NOTE(review): this copy is never handed to the connection used below, so the retry
        // setting has no visible effect in this method; kept as in the original.
        Configuration configuration = new Configuration(util.getConfiguration());
        configuration.setInt("hbase.client.retries.number", 1);

        try (Table table = util.getConnection().getTable(TEST_TABLE)) {
            CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
                TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
            service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
            Assert.fail("Should have thrown an exception");
        } catch (ServiceException expected) {
            // Expected: the error RPC must fail. Assert.fail raises AssertionError, which is
            // not caught here, so a missing exception still fails the test.
        }
    }

    /**
     * Calls the {@code error} RPC on the master coprocessor channel and expects it to surface as
     * a {@link ServiceException}.
     */
    @Test
    public void testMasterCoprocessorError() throws Throwable {
        Admin admin = util.getAdmin();
        TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
            TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
        try {
            service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
            Assert.fail("Should have thrown an exception");
        } catch (ServiceException expected) {
            // Expected: the error RPC must fail.
        }
    }

    /** Returns the locations of all regions of the test table, closing the locator afterwards. */
    private static List<HRegionLocation> locateAllRegions() throws IOException {
        try (RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
            return locator.getAllRegionLocations();
        }
    }

    /** Logs every value in {@code results} together with the binary form of its key. */
    private static void logResults(Map<byte[], ?> results) {
        for (Map.Entry<byte[], ?> entry : results.entrySet()) {
            LOG.info("Got value {} for region {}", entry.getValue(), Bytes.toStringBinary(entry.getKey()));
        }
    }

    /**
     * Builds {@code n} row keys by appending the two-digit, zero-padded index to {@code base}.
     *
     * @param base common prefix of every generated key
     * @param n    number of keys to generate
     * @return array whose element {@code i} is {@code base} followed by {@code String.format("%02d", i)}
     */
    private static byte[][] makeN(byte[] base, int n) {
        byte[][] ret = new byte[n][];
        for (int i = 0; i < n; i++) {
            ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
        }
        return ret;
    }
}

