package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.rsgroup.TestRSGroupsBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicator.class */
public class TestReplicator extends TestReplicationBase {

    /** JUnit class-level rule created by {@code HBaseClassTestRule.forClass} for this test class. */
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicator.class);
    /** Logger shared by the tests and by the nested replication endpoints declared in this class. */
    static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
    /** Row count for the replication workload; matches the 10 rows each test in this class writes. */
    static final int NUM_ROWS = 10;

    /**
     * Endpoint whose replicators alternate between doing the real replication and throwing an
     * injected {@link ServiceException}, driven by the {@code failNext} toggle.
     */
    public static class FailureInjectingReplicationEndpointForTest extends ReplicationEndpointForTest {
        /** {@code false}: the next replicator call replicates; {@code true}: the next call fails. */
        private final AtomicBoolean failNext = new AtomicBoolean(false);

        /**
         * Builds a replicator that, when run, either ships the batch and updates the shared
         * counters, or throws the injected failure, flipping {@code failNext} each time.
         *
         * @param list entries of the batch
         * @param i    batch ordinal, returned unchanged when neither toggle transition is won
         * @param i2   passed through to {@code replicateEntries}
         * @return the callable performing (or failing) the replication of this batch
         */
        @Override
        protected Callable<Integer> createReplicator(List<WAL.Entry> list, int i, int i2) {
            return () -> {
                if (failNext.compareAndSet(false, true)) {
                    int batchIndex = replicateEntries(list, i, i2);
                    // Replicators may run on several threads; a bare "+=" on the shared static
                    // counter can lose updates, so serialize the read-modify-write.
                    synchronized (ReplicationEndpointForTest.class) {
                        entriesCount += list.size();
                    }
                    int completed = batchCount.incrementAndGet();
                    TestReplicator.LOG.info("Completed replicating batch {} count={}",
                        System.identityHashCode(list), completed);
                    return batchIndex;
                }
                if (failNext.compareAndSet(true, false)) {
                    throw new ServiceException("Injected failure");
                }
                // NOTE(review): reached only when both compare-and-set calls lose a race with
                // another replicator; the ordinal is returned without replicating the batch.
                return i;
            };
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicator$ReplicationEndpointForTest.class */
    public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
        protected static int entriesCount;
        protected static AtomicInteger batchCount = new AtomicInteger(0);
        private static final Object latch = new Object();
        private static AtomicBoolean useLatch = new AtomicBoolean(false);

        public static void resume() {
            useLatch.set(false);
            synchronized (latch) {
                latch.notifyAll();
            }
        }

        public static void pause() {
            useLatch.set(true);
        }

        public static void await() throws InterruptedException {
            if (useLatch.get()) {
                TestReplicator.LOG.info("Waiting on latch");
                synchronized (latch) {
                    latch.wait();
                }
                TestReplicator.LOG.info("Waited on latch, now proceeding");
            }
        }

        public static int getBatchCount() {
            return batchCount.get();
        }

        public static void setBatchCount(int i) {
            TestReplicator.LOG.info("SetBatchCount=" + i + ", old=" + getBatchCount());
            batchCount.set(i);
        }

        public static int getEntriesCount() {
            return entriesCount;
        }

        public static void setEntriesCount(int i) {
            TestReplicator.LOG.info("SetEntriesCount=" + i);
            entriesCount = i;
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                await();
            } catch (InterruptedException e) {
                TestReplicator.LOG.warn("Interrupted waiting for latch", e);
            }
            return super.replicate(replicateContext);
        }

        protected Callable<Integer> createReplicator(List<WAL.Entry> list, int i, int i2) {
            return () -> {
                int replicateEntries = replicateEntries(list, i, i2);
                entriesCount += list.size();
                TestReplicator.LOG.info("Completed replicating batch " + System.identityHashCode(list) + " count=" + batchCount.incrementAndGet());
                return Integer.valueOf(replicateEntries);
            };
        }
    }

    /**
     * Sets {@code hbase.ipc.max.request.size} to 10240 on {@code CONF1} and then runs the base
     * class setup.
     *
     * @throws Exception if the base class setup fails
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        final String maxRequestSizeKey = "hbase.ipc.max.request.size";
        final int maxRequestSizeBytes = 10 * 1024;
        // Applied before the base setup call, which runs next.
        CONF1.setInt(maxRequestSizeKey, maxRequestSizeBytes);
        TestReplicationBase.setUpBeforeClass();
    }

    /**
     * Writes {@link #NUM_ROWS} rows with 8192-byte values while replication is paused, resumes it,
     * and verifies that the endpoint reported exactly {@link #NUM_ROWS} batches and that all rows
     * arrived in the peer table.
     *
     * @throws Exception on any cluster, wait or assertion failure
     */
    @Test
    public void testReplicatorBatching() throws Exception {
        truncateTable(UTIL1, tableName);
        truncateTable(UTIL2, tableName);

        final String peerId = "testReplicatorBatching";
        hbaseAdmin.addReplicationPeer(peerId, ReplicationPeerConfig.newBuilder()
            .setClusterKey(UTIL2.getClusterKey())
            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
            .build());
        ReplicationEndpointForTest.setBatchCount(0);
        ReplicationEndpointForTest.setEntriesCount(0);
        try {
            ReplicationEndpointForTest.pause();
            try {
                // Queue the edits while the endpoint is paused.
                byte[] valueBytes = new byte[8192];
                for (int i = 0; i < NUM_ROWS; i++) {
                    htable1.put(new Put(Bytes.toBytes("row" + i))
                        .addColumn(famName, (byte[]) null, valueBytes));
                }
            } finally {
                // Resume exactly once, whether or not queuing the edits succeeded.
                ReplicationEndpointForTest.resume();
            }

            Waiter.waitFor(CONF1, TestRSGroupsBase.WAIT_TIMEOUT, new Waiter.ExplainingPredicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    TestReplicator.LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
                    return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
                }

                @Override
                public String explainFailure() throws Exception {
                    return "We waited too long for expected replication of 10 entries";
                }
            });

            Assert.assertEquals("We sent an incorrect number of batches", NUM_ROWS,
                ReplicationEndpointForTest.getBatchCount());
            Assert.assertEquals("We did not replicate enough rows", NUM_ROWS,
                HBaseTestingUtility.countRows(htable2));
        } finally {
            // Single removal point: the peer is dropped once on success and on failure alike.
            hbaseAdmin.removeReplicationPeer(peerId);
        }
    }

    /**
     * Same workload as the batching test, but through
     * {@link FailureInjectingReplicationEndpointForTest}: verifies that all {@link #NUM_ROWS} rows
     * still reach the peer table even though some replicator calls throw an injected failure.
     *
     * @throws Exception on any cluster, wait or assertion failure
     */
    @Test
    public void testReplicatorWithErrors() throws Exception {
        truncateTable(UTIL1, tableName);
        truncateTable(UTIL2, tableName);

        final String peerId = "testReplicatorWithErrors";
        hbaseAdmin.addReplicationPeer(peerId, ReplicationPeerConfig.newBuilder()
            .setClusterKey(UTIL2.getClusterKey())
            .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName())
            .build());
        FailureInjectingReplicationEndpointForTest.setBatchCount(0);
        FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
        try {
            FailureInjectingReplicationEndpointForTest.pause();
            try {
                // Queue the edits while the endpoint is paused.
                byte[] valueBytes = new byte[8192];
                for (int i = 0; i < NUM_ROWS; i++) {
                    htable1.put(new Put(Bytes.toBytes("row" + i))
                        .addColumn(famName, (byte[]) null, valueBytes));
                }
            } finally {
                // Resume exactly once, whether or not queuing the edits succeeded.
                FailureInjectingReplicationEndpointForTest.resume();
            }

            Waiter.waitFor(CONF1, TestRSGroupsBase.WAIT_TIMEOUT, new Waiter.ExplainingPredicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
                }

                @Override
                public String explainFailure() throws Exception {
                    return "We waited too long for expected replication of 10 entries";
                }
            });

            Assert.assertEquals("We did not replicate enough rows", NUM_ROWS,
                HBaseTestingUtility.countRows(htable2));
        } finally {
            // Single removal point: the peer is dropped once on success and on failure alike.
            hbaseAdmin.removeReplicationPeer(peerId);
        }
    }

    /**
     * Class-level teardown; delegates entirely to {@code TestReplicationBase.tearDownAfterClass()}.
     *
     * @throws Exception if the base class teardown fails
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TestReplicationBase.tearDownAfterClass();
    }

    /**
     * Disables and then truncates the given table through the utility's {@link Admin}, without
     * preserving splits.
     *
     * @param hBaseTestingUtility utility whose admin performs the operations
     * @param tableName           table to disable and truncate
     * @throws IOException if either admin call fails
     */
    private void truncateTable(HBaseTestingUtility hBaseTestingUtility, TableName tableName) throws IOException {
        final boolean preserveSplits = false;
        final Admin clusterAdmin = hBaseTestingUtility.getAdmin();
        clusterAdmin.disableTable(tableName);
        clusterAdmin.truncateTable(tableName, preserveSplits);
    }
}
