/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.TestReplicationSyncUpToolBase;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises the replication sync-up tool when the pending data on the source
 * cluster was added through bulk-loaded HFiles.
 *
 * <p>The test bulk loads HFiles into two source tables, waits for the target
 * tables to reach the expected row counts, then repeats the bulk load after
 * calling {@code shutDownTargetHBaseCluster()} and finally runs
 * {@code syncUp(UTIL1)} until the target tables report the expected counts.
 */
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);

    /** Configuration key holding the comma-separated region coprocessor class names. */
    private static final String REGION_COPROCESSOR_KEY = "hbase.coprocessor.region.classes";
    /** Coprocessor class name that this test makes sure is present in the configuration. */
    private static final String SECURE_BULK_LOAD_ENDPOINT = "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
    /** Name passed to {@code getDataTestDirOnTestFS} for every bulk load in this test. */
    private static final String BULK_LOAD_DIR_NAME = "HFileReplication_1";
    /** Range keys consumed by the test: two loads x four HFiles x (from, to). */
    private static final int HFILE_RANGE_KEYS = 16;
    /** Maximum number of polling attempts before an assertion is forced. */
    private static final int MAX_RETRIES = 50;
    /** Pause between polling attempts, in milliseconds. */
    private static final long RETRY_SLEEP_MS = 500L;
    /** Rows written to {@code NO_REP_FAMILY} per table on each load. */
    private static final int NO_REP_ROWS = 3;

    /**
     * Enables bulk-load replication settings on the given configuration and makes
     * sure the secure bulk load endpoint is listed among the region coprocessors.
     *
     * <p>NOTE(review): presumably overrides a hook declared in
     * {@code TestReplicationSyncUpToolBase}; add {@code @Override} once confirmed.
     *
     * @param conf the cluster configuration to modify in place
     */
    protected void customizeClusterConf(Configuration conf) {
        conf.setBoolean("hbase.replication.bulkload.enabled", true);
        conf.set("hbase.replication.cluster.id", "12345");
        conf.set("hbase.replication.source.fs.conf.provider", TestSourceFSConfigurationProvider.class.getCanonicalName());
        String classes = conf.get(REGION_COPROCESSOR_KEY, "");
        if (!classes.contains(SECURE_BULK_LOAD_ENDPOINT)) {
            // Only insert the separator when there is something to separate from,
            // so an unset property does not end up with a leading comma.
            String updated = classes.isEmpty()
                ? SECURE_BULK_LOAD_ENDPOINT
                : classes + "," + SECURE_BULK_LOAD_ENDPOINT;
            conf.set(REGION_COPROCESSOR_KEY, updated);
        }
    }

    /**
     * Runs the full scenario: set up replication, bulk load and verify on the
     * target, then bulk load again and verify via the sync-up tool.
     *
     * @throws Exception if any cluster operation or assertion fails
     */
    @Test
    public void testSyncUpTool() throws Exception {
        this.setupReplication();
        // Keep adding until the set really holds the required number of distinct
        // keys; a fixed-count loop would come up short if two UUIDs collided.
        Set<String> randomHFileRanges = new HashSet<>(HFILE_RANGE_KEYS);
        while (randomHFileRanges.size() < HFILE_RANGE_KEYS) {
            randomHFileRanges.add(UTIL1.getRandomUUID().toString());
        }
        // Sorted so that every consecutive (from, to) pair pulled from the
        // iterator is in ascending order.
        List<String> sortedRanges = new ArrayList<>(randomHFileRanges);
        Collections.sort(sortedRanges);
        Iterator<String> rangeIterator = sortedRanges.iterator();
        this.loadAndReplicateHFiles(true, rangeIterator);
        this.mimicSyncUpAfterBulkLoad(rangeIterator);
    }

    /**
     * Bulk loads a second batch after {@code shutDownTargetHBaseCluster()}, checks
     * the source row counts, swaps which cluster is running, and then invokes
     * {@code syncUp(UTIL1)} repeatedly until the target tables report 200 and 400
     * rows or the retry budget is exhausted.
     *
     * @param randomHFileRangeListIterator supplier of the remaining range keys;
     *        eight keys are consumed
     * @throws Exception if any cluster operation or assertion fails
     */
    private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator) throws Exception {
        LOG.debug("mimicSyncUpAfterBulkLoad");
        this.shutDownTargetHBaseCluster();
        this.loadAndReplicateHFiles(false, randomHFileRangeListIterator);

        int sourceRowsT1 = UTIL1.countRows(this.ht1Source);
        Assert.assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206L, sourceRowsT1);
        int sourceRowsT2 = UTIL1.countRows(this.ht2Source);
        Assert.assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406L, sourceRowsT2);

        this.shutDownSourceHBaseCluster();
        this.restartTargetHBaseCluster(1);
        Thread.sleep(RETRY_SLEEP_MS);

        // Before any sync-up the target is expected to hold only the first batch.
        int targetRowsT1 = UTIL2.countRows(this.ht1TargetAtPeer1);
        int targetRowsT2 = UTIL2.countRows(this.ht2TargetAtPeer1);
        Assert.assertEquals("@Peer1 t1_syncup should still have 100 rows", 100L, targetRowsT1);
        Assert.assertEquals("@Peer1 t2_syncup should still have 200 rows", 200L, targetRowsT2);

        this.syncUp(UTIL1);
        for (int attempt = 0; attempt < MAX_RETRIES; ++attempt) {
            this.syncUp(UTIL1);
            targetRowsT1 = UTIL2.countRows(this.ht1TargetAtPeer1);
            targetRowsT2 = UTIL2.countRows(this.ht2TargetAtPeer1);
            boolean syncedUp = targetRowsT1 == 200 && targetRowsT2 == 400;

            if (attempt == MAX_RETRIES - 1) {
                if (!syncedUp) {
                    // Last chance gone: bring the source back and log its row
                    // counts for diagnosis before the assertions below fail.
                    this.logSourceRowCountsAfterRestart();
                }
                Assert.assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200L, targetRowsT1);
                Assert.assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400L, targetRowsT2);
            }
            if (syncedUp) {
                LOG.info("SyncUpAfterBulkLoad succeeded at retry = {}", attempt);
                break;
            }
            LOG.debug("SyncUpAfterBulkLoad failed at retry = {}, with rowCount_ht1TargetPeer1 ={} and rowCount_ht2TargetAtPeer1 ={}",
                attempt, targetRowsT1, targetRowsT2);
            Thread.sleep(RETRY_SLEEP_MS);
        }
    }

    /**
     * Restarts the source cluster and logs the row counts of both source tables
     * at debug level; used only for diagnostics when the sync-up did not converge.
     *
     * @throws Exception if the restart or a row count fails
     */
    private void logSourceRowCountsAfterRestart() throws Exception {
        this.restartSourceHBaseCluster(1);
        int sourceRowsT1 = UTIL1.countRows(this.ht1Source);
        LOG.debug("t1_syncup should have 206 rows at source, and it is {}", sourceRowsT1);
        int sourceRowsT2 = UTIL1.countRows(this.ht2Source);
        LOG.debug("t2_syncup should have 406 rows at source, and it is {}", sourceRowsT2);
    }

    /**
     * Bulk loads four HFiles: for each of the two source tables, 100 (resp. 200)
     * rows into {@code FAMILY} and 3 rows into {@code NO_REP_FAMILY}. Optionally
     * waits until each target table holds the source row count minus the 3
     * {@code NO_REP_FAMILY} rows.
     *
     * @param verifyReplicationOnSlave whether to poll the target tables afterwards
     * @param randomHFileRangeListIterator supplier of range keys; eight are consumed
     * @throws Exception if HFile creation, bulk load, or verification fails
     */
    private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave, Iterator<String> randomHFileRangeListIterator) throws Exception {
        LOG.debug("loadAndReplicateHFiles");
        this.loadAndValidateHFileReplication(BULK_LOAD_DIR_NAME, TestReplicationBase.row, FAMILY, this.ht1Source,
            nextHFileRange(randomHFileRangeListIterator), 100);
        this.loadAndValidateHFileReplication(BULK_LOAD_DIR_NAME, TestReplicationBase.row, NO_REP_FAMILY, this.ht1Source,
            nextHFileRange(randomHFileRangeListIterator), NO_REP_ROWS);
        this.loadAndValidateHFileReplication(BULK_LOAD_DIR_NAME, TestReplicationBase.row, FAMILY, this.ht2Source,
            nextHFileRange(randomHFileRangeListIterator), 200);
        this.loadAndValidateHFileReplication(BULK_LOAD_DIR_NAME, TestReplicationBase.row, NO_REP_FAMILY, this.ht2Source,
            nextHFileRange(randomHFileRangeListIterator), NO_REP_ROWS);
        if (verifyReplicationOnSlave) {
            this.wait(this.ht1TargetAtPeer1, UTIL1.countRows(this.ht1Source) - NO_REP_ROWS, "t1_syncup has 103 rows on source, and 100 on slave1");
            this.wait(this.ht2TargetAtPeer1, UTIL1.countRows(this.ht2Source) - NO_REP_ROWS, "t2_syncup has 203 rows on source, and 200 on slave1");
        }
    }

    /**
     * Takes the next two keys from the iterator and wraps them as a single
     * (from, to) HFile range.
     *
     * @param keys iterator with at least two remaining keys
     * @return a one-element array of ranges, the range being {@code {from, to}}
     */
    private static byte[][][] nextHFileRange(Iterator<String> keys) {
        // Explicit locals pin the consumption order: first key is "from", second is "to".
        byte[] from = Bytes.toBytes(keys.next());
        byte[] to = Bytes.toBytes(keys.next());
        return new byte[][][]{new byte[][]{from, to}};
    }

    /**
     * Creates one HFile per range under a family sub-directory of the test
     * directory and bulk loads that test directory into the given table.
     *
     * @param testName name used to resolve the test directory on the test filesystem
     * @param row value passed to {@code HFileTestUtil.createHFile} after the family
     * @param fam column family; also used as the sub-directory name
     * @param source table whose name is the bulk load destination
     * @param hfileRanges ranges as {@code {from, to}} pairs, one HFile each
     * @param numOfRows row count passed to {@code HFileTestUtil.createHFile}
     * @throws Exception if HFile creation or the bulk load fails
     */
    private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam, Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
        FileSystem fs = UTIL1.getTestFileSystem();
        Path dir = UTIL1.getDataTestDirOnTestFS(testName).makeQualified(fs);
        Path familyDir = new Path(dir, Bytes.toString(fam));
        Configuration conf = UTIL1.getConfiguration();

        int hfileIdx = 0;
        for (byte[][] range : hfileRanges) {
            Path hfile = new Path(familyDir, "hfile_" + hfileIdx++);
            HFileTestUtil.createHFile(conf, fs, hfile, fam, row, range[0], range[1], numOfRows);
        }

        TableName tableName = source.getName();
        BulkLoadHFiles.create(conf).bulkLoad(tableName, dir);
    }

    /**
     * Polls the row count of {@code target} until it equals {@code expectedCount},
     * sleeping between attempts; on the final attempt a mismatch fails the test
     * with {@code msg}.
     *
     * @param target table counted through {@code UTIL2}
     * @param expectedCount row count to wait for
     * @param msg assertion message used if the count never matches
     * @throws IOException if counting rows fails
     * @throws InterruptedException if the sleep between attempts is interrupted
     */
    private void wait(Table target, int expectedCount, String msg) throws IOException, InterruptedException {
        for (int attempt = 0; attempt < MAX_RETRIES; ++attempt) {
            int actualCount = UTIL2.countRows(target);
            if (attempt == MAX_RETRIES - 1) {
                Assert.assertEquals(msg, (long) expectedCount, (long) actualCount);
            }
            if (expectedCount == actualCount) {
                break;
            }
            Thread.sleep(RETRY_SLEEP_MS);
        }
    }
}

