/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AsyncProcess;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.MultiServerCallable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

@Category(value={MediumTests.class})
public class TestAsyncProcess {
    private static final TableName DUMMY_TABLE = TableName.valueOf((String)"DUMMY_TABLE");
    private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
    private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
    private static final byte[] FAILS = "FAILS".getBytes();
    private static final Configuration conf = new Configuration();
    private static ServerName sn = new ServerName("localhost:10,1254");
    private static HRegionInfo hri1 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2);
    private static HRegionInfo hri2 = new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW);
    private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
    private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
    private static final String success = "success";
    private static Exception failure = new Exception("failure");

    static MultiResponse createMultiResponse(HRegionLocation loc, MultiAction<Row> multi) {
        MultiResponse mr = new MultiResponse();
        for (Map.Entry entry : multi.actions.entrySet()) {
            for (Action a : (List)entry.getValue()) {
                if (Arrays.equals(FAILS, a.getAction().getRow())) {
                    mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), (Object)failure);
                    continue;
                }
                mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), (Object)success);
            }
        }
        return mr;
    }

    @Test
    public void testSubmit() throws Exception {
        HConnection hc = TestAsyncProcess.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(hc, null, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(true, true));
        ap.submit(puts, false);
        Assert.assertTrue((boolean)puts.isEmpty());
    }

    @Test
    public void testSubmitWithCB() throws Exception {
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        MyAsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(true, true));
        ap.submit(puts, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        while (mcb.successCalled.get() != 1 && !ap.hasError()) {
            Thread.sleep(1L);
        }
        Assert.assertEquals((long)mcb.successCalled.get(), (long)1L);
    }

    @Test
    public void testSubmitBusyRegion() throws Exception {
        HConnection hc = TestAsyncProcess.createHConnection();
        MyAsyncProcess ap = new MyAsyncProcess(hc, null, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(true, true));
        ap.incTaskCounters(hri1.getEncodedName());
        ap.submit(puts, false);
        Assert.assertEquals((long)puts.size(), (long)1L);
        ap.decTaskCounters(hri1.getEncodedName());
        ap.submit(puts, false);
        Assert.assertTrue((boolean)puts.isEmpty());
    }

    @Test
    public void testFail() throws Exception {
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        MyAsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        Put p = this.createPut(true, false);
        puts.add(p);
        ap.submit(puts, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        while (!ap.hasError()) {
            Thread.sleep(1L);
        }
        Assert.assertEquals((long)0L, (long)mcb.successCalled.get());
        Assert.assertEquals((long)2L, (long)mcb.retriableFailure.get());
        Assert.assertEquals((long)1L, (long)mcb.failureCalled.get());
        Assert.assertEquals((long)1L, (long)ap.getErrors().exceptions.size());
        Assert.assertTrue((String)("was: " + ap.getErrors().exceptions.get(0)), (boolean)failure.equals(ap.getErrors().exceptions.get(0)));
        Assert.assertTrue((String)("was: " + ap.getErrors().exceptions.get(0)), (boolean)failure.equals(ap.getErrors().exceptions.get(0)));
        Assert.assertEquals((long)1L, (long)ap.getFailedOperations().size());
        Assert.assertTrue((String)("was: " + ap.getFailedOperations().get(0)), (boolean)p.equals(ap.getFailedOperations().get(0)));
    }

    @Test
    public void testWaitForNextTaskDone() throws IOException {
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        final MyAsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ap.tasksSent.incrementAndGet();
        final AtomicBoolean checkPoint = new AtomicBoolean(false);
        final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
        Thread t = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)1000L);
                Assert.assertFalse((boolean)checkPoint.get());
                ap.tasksDone.incrementAndGet();
                checkPoint2.set(true);
            }
        };
        t.start();
        ap.waitForNextTaskDone(0L);
        checkPoint.set(true);
        while (!checkPoint2.get()) {
            Threads.sleep((long)1L);
        }
    }

    @Test
    public void testSubmitTrue() throws IOException {
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        final MyAsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ap.tasksSent.incrementAndGet();
        final AtomicInteger ai = new AtomicInteger(1);
        ap.taskCounterPerRegion.put(hri1.getEncodedName(), ai);
        final AtomicBoolean checkPoint = new AtomicBoolean(false);
        final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
        Thread t = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)1000L);
                Assert.assertFalse((boolean)checkPoint.get());
                ai.decrementAndGet();
                ap.tasksDone.incrementAndGet();
                checkPoint2.set(true);
            }
        };
        ArrayList<Put> puts = new ArrayList<Put>();
        Put p = this.createPut(true, true);
        puts.add(p);
        ap.submit(puts, false);
        Assert.assertFalse((boolean)puts.isEmpty());
        t.start();
        ap.submit(puts, true);
        Assert.assertTrue((boolean)puts.isEmpty());
        checkPoint.set(true);
        while (!checkPoint2.get()) {
            Threads.sleep((long)1L);
        }
    }

    @Test
    public void testFailAndSuccess() throws Exception {
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        MyAsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(true, false));
        puts.add(this.createPut(true, true));
        puts.add(this.createPut(true, true));
        ap.submit(puts, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        while (!ap.hasError()) {
            Thread.sleep(1L);
        }
        Assert.assertEquals((long)mcb.successCalled.get(), (long)2L);
        Assert.assertEquals((long)mcb.retriableFailure.get(), (long)2L);
        Assert.assertEquals((long)mcb.failureCalled.get(), (long)1L);
        Assert.assertEquals((long)1L, (long)ap.getErrors().actions.size());
        puts.add(this.createPut(true, true));
        ap.submit(puts, false);
        Assert.assertTrue((boolean)puts.isEmpty());
        while (mcb.successCalled.get() != 3) {
            Thread.sleep(1L);
        }
        Assert.assertEquals((long)mcb.retriableFailure.get(), (long)2L);
        Assert.assertEquals((long)mcb.failureCalled.get(), (long)1L);
        ap.clearErrors();
        Assert.assertTrue((boolean)ap.getErrors().actions.isEmpty());
    }

    @Test
    public void testFlush() throws Exception {
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        MyAsyncProcess<Object> ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(true, false));
        puts.add(this.createPut(true, true));
        puts.add(this.createPut(true, true));
        ap.submit(puts, false);
        ap.waitUntilDone();
        Assert.assertEquals((long)mcb.successCalled.get(), (long)2L);
        Assert.assertEquals((long)mcb.retriableFailure.get(), (long)2L);
        Assert.assertEquals((long)mcb.failureCalled.get(), (long)1L);
        Assert.assertEquals((long)1L, (long)ap.getFailedOperations().size());
    }

    @Test
    public void testMaxTask() throws Exception {
        HConnection hc = TestAsyncProcess.createHConnection();
        final MyAsyncProcess ap = new MyAsyncProcess(hc, null, conf);
        for (int i = 0; i < 1000; ++i) {
            ap.incTaskCounters("dummy");
        }
        final Thread myThread = Thread.currentThread();
        Thread t = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)2000L);
                myThread.interrupt();
            }
        };
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(true, true));
        t.start();
        try {
            ap.submit(puts, false);
            Assert.fail((String)"We should have been interrupted.");
        }
        catch (InterruptedIOException expected) {
            // empty catch block
        }
        long sleepTime = 2000L;
        Thread t2 = new Thread(){

            @Override
            public void run() {
                Threads.sleep((long)2000L);
                while (ap.tasksDone.get() > 0L) {
                    ap.decTaskCounters("dummy");
                }
            }
        };
        t2.start();
        long start = System.currentTimeMillis();
        ap.submit(new ArrayList(), false);
        long end = System.currentTimeMillis();
        Assert.assertTrue((start + 100L + 2000L > end ? 1 : 0) != 0);
    }

    private static HConnection createHConnection() throws IOException {
        HConnection hc = (HConnection)Mockito.mock(HConnection.class);
        Mockito.when((Object)hc.getRegionLocation((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)DUMMY_BYTES_1), Mockito.anyBoolean())).thenReturn((Object)loc1);
        Mockito.when((Object)hc.locateRegion((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)DUMMY_BYTES_1))).thenReturn((Object)loc1);
        Mockito.when((Object)hc.getRegionLocation((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)DUMMY_BYTES_2), Mockito.anyBoolean())).thenReturn((Object)loc2);
        Mockito.when((Object)hc.locateRegion((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)DUMMY_BYTES_2))).thenReturn((Object)loc2);
        Mockito.when((Object)hc.getRegionLocation((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)FAILS), Mockito.anyBoolean())).thenReturn((Object)loc2);
        Mockito.when((Object)hc.locateRegion((TableName)Mockito.eq((Object)DUMMY_TABLE), (byte[])Mockito.eq((Object)FAILS))).thenReturn((Object)loc2);
        return hc;
    }

    @Test
    public void testHTablePutSuccess() throws Exception {
        HTable ht = (HTable)Mockito.mock(HTable.class);
        HConnection hc = TestAsyncProcess.createHConnection();
        ht.ap = new MyAsyncProcess(hc, null, conf);
        Put put = this.createPut(true, true);
        Assert.assertEquals((long)0L, (long)ht.getWriteBufferSize());
        ht.put(put);
        Assert.assertEquals((long)0L, (long)ht.getWriteBufferSize());
    }

    private void doHTableFailedPut(boolean bufferOn) throws Exception {
        HTable ht = new HTable();
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ht.setAutoFlush(true, true);
        if (bufferOn) {
            ht.setWriteBufferSize(0x100000L);
        } else {
            ht.setWriteBufferSize(0L);
        }
        Put put = this.createPut(true, false);
        Assert.assertEquals((long)0L, (long)ht.currentWriteBufferSize);
        try {
            ht.put(put);
            if (bufferOn) {
                ht.flushCommits();
            }
            Assert.fail();
        }
        catch (RetriesExhaustedException expected) {
            // empty catch block
        }
        Assert.assertEquals((long)0L, (long)ht.currentWriteBufferSize);
        Assert.assertEquals((long)0L, (long)mcb.successCalled.get());
        Assert.assertEquals((long)2L, (long)mcb.retriableFailure.get());
        Assert.assertEquals((long)1L, (long)mcb.failureCalled.get());
        ht.close();
    }

    @Test
    public void testHTableFailedPutWithBuffer() throws Exception {
        this.doHTableFailedPut(true);
    }

    @Test
    public void doHTableFailedPutWithoutBuffer() throws Exception {
        this.doHTableFailedPut(false);
    }

    @Test
    public void testHTableFailedPutAndNewPut() throws Exception {
        HTable ht = new HTable();
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ht.setAutoFlush(false, true);
        ht.setWriteBufferSize(0L);
        Put p = this.createPut(true, false);
        ht.put(p);
        ht.ap.waitUntilDone();
        p = this.createPut(true, true);
        Assert.assertEquals((long)0L, (long)ht.writeAsyncBuffer.size());
        try {
            ht.put(p);
            Assert.fail();
        }
        catch (RetriesExhaustedException expected) {
            // empty catch block
        }
        Assert.assertEquals((String)"the put should not been inserted.", (long)0L, (long)ht.writeAsyncBuffer.size());
    }

    @Test
    public void testWithNoClearOnFail() throws IOException {
        HTable ht = new HTable();
        HConnection hc = TestAsyncProcess.createHConnection();
        MyCB mcb = new MyCB();
        ht.ap = new MyAsyncProcess<Object>(hc, mcb, conf);
        ht.setAutoFlush(false, false);
        Put p = this.createPut(true, false);
        ht.put(p);
        Assert.assertEquals((long)0L, (long)ht.writeAsyncBuffer.size());
        try {
            ht.flushCommits();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)ht.writeAsyncBuffer.size());
        try {
            ht.close();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)ht.writeAsyncBuffer.size());
    }

    @Test
    public void testBatch() throws IOException, InterruptedException {
        HTable ht = new HTable();
        ht.connection = new MyConnectionImpl();
        ArrayList<Put> puts = new ArrayList<Put>();
        puts.add(this.createPut(true, true));
        puts.add(this.createPut(true, true));
        puts.add(this.createPut(true, true));
        puts.add(this.createPut(true, true));
        puts.add(this.createPut(true, false));
        puts.add(this.createPut(true, true));
        puts.add(this.createPut(true, false));
        Object[] res = new Object[puts.size()];
        try {
            ht.processBatch(puts, res);
            Assert.fail();
        }
        catch (RetriesExhaustedException expected) {
            // empty catch block
        }
        Assert.assertEquals((Object)res[0], (Object)success);
        Assert.assertEquals((Object)res[1], (Object)success);
        Assert.assertEquals((Object)res[2], (Object)success);
        Assert.assertEquals((Object)res[3], (Object)success);
        Assert.assertEquals((Object)res[4], (Object)failure);
        Assert.assertEquals((Object)res[5], (Object)success);
        Assert.assertEquals((Object)res[6], (Object)failure);
    }

    @Test
    public void testErrorsServers() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
        HTable ht = new HTable();
        Configuration configuration = new Configuration(conf);
        configuration.setBoolean("hbase.client.retries.by.server", true);
        configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
        MyConnectionImpl mci = new MyConnectionImpl(configuration);
        ht.connection = mci;
        ht.ap = new MyAsyncProcess((HConnection)mci, null, configuration);
        Assert.assertTrue((boolean)ht.ap.useServerTrackerForRetries);
        Assert.assertNotNull((Object)ht.ap.createServerErrorTracker());
        Assert.assertTrue((ht.ap.serverTrackerTimeout > 10000 ? 1 : 0) != 0);
        ht.ap.serverTrackerTimeout = 1;
        Put p = this.createPut(true, false);
        ht.setAutoFlush(false);
        ht.put(p);
        long start = System.currentTimeMillis();
        try {
            ht.flushCommits();
            Assert.fail();
        }
        catch (RetriesExhaustedWithDetailsException expected) {
            // empty catch block
        }
        Assert.assertTrue((System.currentTimeMillis() - start < 10000L ? 1 : 0) != 0);
    }

    private Put createPut(boolean reg1, boolean success) {
        Put p = !success ? new Put(FAILS) : (reg1 ? new Put(DUMMY_BYTES_1) : new Put(DUMMY_BYTES_2));
        p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
        return p;
    }

    private class MyCB
    implements AsyncProcess.AsyncProcessCallback<Object> {
        private AtomicInteger successCalled = new AtomicInteger(0);
        private AtomicInteger failureCalled = new AtomicInteger(0);
        private AtomicInteger retriableFailure = new AtomicInteger(0);

        private MyCB() {
        }

        public void success(int originalIndex, byte[] region, Row row, Object o) {
            this.successCalled.incrementAndGet();
        }

        public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
            this.failureCalled.incrementAndGet();
            return true;
        }

        public boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception) {
            return this.retriableFailure.incrementAndGet() < 2;
        }
    }

    static class MyConnectionImpl
    extends HConnectionManager.HConnectionImplementation {
        MyAsyncProcess<?> ap;
        static final Configuration c = new Configuration();

        protected MyConnectionImpl() {
            super(c);
        }

        protected MyConnectionImpl(Configuration conf) {
            super(conf);
        }

        protected <R> AsyncProcess createAsyncProcess(TableName tableName, ExecutorService pool, AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
            this.ap = new MyAsyncProcess<R>((HConnection)this, callback, conf);
            return this.ap;
        }

        public HRegionLocation locateRegion(TableName tableName, byte[] row) {
            return loc1;
        }

        static {
            c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
        }
    }

    static class MyAsyncProcess<Res>
    extends AsyncProcess<Res> {
        public MyAsyncProcess(HConnection hc, AsyncProcess.AsyncProcessCallback<Res> callback, Configuration conf) {
            super(hc, DUMMY_TABLE, (ExecutorService)new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory((String)"test-TestAsyncProcess")), callback, conf, new RpcRetryingCallerFactory(conf));
        }

        protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
            final MultiResponse mr = TestAsyncProcess.createMultiResponse(callable.getLocation(), (MultiAction<Row>)callable.getMulti());
            return new RpcRetryingCaller<MultiResponse>(conf){

                public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable) throws IOException, RuntimeException {
                    return mr;
                }
            };
        }
    }
}

