package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ReplicationTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.class */
public abstract class TestReplicationSourceManager {
    /** Shared configuration. Nothing in this class assigns it; setupZkAndReplication() asserts it is non-null. */
    protected static Configuration conf;
    /** Testing utility that supplies the test directories and, when one is running, the mini cluster. */
    protected static HBaseTestingUtility utility;
    /** Replication instance created and initialised by setupZkAndReplication(). */
    protected static Replication replication;
    /** Source manager obtained from {@link #replication}; the one exercised by the tests. */
    protected static ReplicationSourceManager manager;
    /** Source manager taken from a mini-cluster region server, or null when no mini cluster is running. */
    protected static ReplicationSourceManager managerOfCluster;
    /** ZooKeeper watcher created by setupZkAndReplication() and handed out by DummyServer. */
    protected static ZKWatcher zkw;
    /** Descriptor of table {@link #test} with families f1 and f2, built by setupZkAndReplication(). */
    protected static TableDescriptor htd;
    /** Region of {@link #htd} spanning r1..r2, built by setupZkAndReplication(). */
    protected static RegionInfo hri;
    /** Id of the peer that setupZkAndReplication() registers; tearDown() never removes it. */
    protected static final String slaveId = "1";
    protected static FileSystem fs;
    protected static Path oldLogDir;
    protected static Path logDir;
    protected static Path remoteLogDir;
    /** Counted down once by every DummyNodeFailoverWorker when its run() finishes. */
    protected static CountDownLatch latch;
    /** Maps every family of {@link #htd} to 0; filled by setupZkAndReplication(). */
    protected static NavigableMap<byte[], Integer> scopes;

    @Rule
    public TestName testName = new TestName();

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationSourceManager.class);
    protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationSourceManager.class);
    protected static final byte[] r1 = Bytes.toBytes("r1");
    protected static final byte[] r2 = Bytes.toBytes("r2");
    protected static final byte[] f1 = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
    protected static final byte[] f2 = Bytes.toBytes("f2");
    protected static final TableName test = TableName.valueOf("test");
    /** WAL names queued by testClaimQueues() and looked up by DummyNodeFailoverWorker. */
    protected static List<String> files = new ArrayList<>();

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager$DummyNodeFailoverWorker.class */
    static class DummyNodeFailoverWorker extends Thread {
        private Map<String, Set<String>> logZnodesMap;
        Server server;
        private ServerName deadRS;
        ReplicationQueueStorage rq;

        public DummyNodeFailoverWorker(ServerName serverName, Server server) throws Exception {
            this.deadRS = serverName;
            this.server = server;
            this.rq = ReplicationStorageFactory.getReplicationQueueStorage(this.server.getZooKeeper(), this.server.getConfiguration());
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    this.logZnodesMap = new HashMap();
                    Iterator it = this.rq.getAllQueues(this.deadRS).iterator();
                    while (it.hasNext()) {
                        Pair claimQueue = this.rq.claimQueue(this.deadRS, (String) it.next(), this.server.getServerName());
                        if (claimQueue != null) {
                            this.logZnodesMap.put(claimQueue.getFirst(), claimQueue.getSecond());
                        }
                    }
                    this.server.abort("Done with testing", (Throwable) null);
                    TestReplicationSourceManager.latch.countDown();
                } catch (Exception e) {
                    TestReplicationSourceManager.LOG.error("Got exception while running NodeFailoverWorker", e);
                    TestReplicationSourceManager.latch.countDown();
                }
            } catch (Throwable th) {
                TestReplicationSourceManager.latch.countDown();
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int isLogZnodesMapPopulated() {
            Collection<Set<String>> values = this.logZnodesMap.values();
            if (values.size() > 1) {
                throw new RuntimeException("unexpected size of logZnodesMap: " + values.size());
            }
            if (values.size() != 1) {
                return 0;
            }
            Set<String> next = values.iterator().next();
            Iterator<String> it = TestReplicationSourceManager.files.iterator();
            while (it.hasNext()) {
                if (!next.contains(it.next())) {
                    return 0;
                }
            }
            return 1;
        }
    }

    /**
     * {@link MockServer} that hands out the test's shared configuration and ZooKeeper watcher and
     * reports a server name derived from a configurable host name. It has no connection and no
     * chore service.
     */
    static class DummyServer extends MockServer {
        String hostname;

        /** Creates a server named after the default host {@code hostname.example.org}. */
        DummyServer() {
            this("hostname.example.org");
        }

        /**
         * @param hostname host part of the name returned by {@link #getServerName()}
         */
        public DummyServer(String hostname) {
            this.hostname = hostname;
        }

        /** @return the configuration shared by the enclosing test class */
        @Override
        public Configuration getConfiguration() {
            return conf;
        }

        /** @return the ZooKeeper watcher shared by the enclosing test class */
        @Override
        public ZKWatcher getZooKeeper() {
            return zkw;
        }

        /** @return always {@code null} */
        @Override
        public Connection getConnection() {
            return null;
        }

        /** @return always {@code null} */
        @Override
        public ChoreService getChoreService() {
            return null;
        }

        /** @return a server name built from {@link #hostname}, port 1234 and start code 1 */
        @Override
        public ServerName getServerName() {
            final int port = 1234;
            final long startCode = 1L;
            return ServerName.valueOf(this.hostname, port, startCode);
        }
    }

    /**
     * Replication source whose {@code init} always fails. testPeerRemovalCleanup() installs it through
     * the {@code replication.replicationsource.implementation} property to exercise a peer whose
     * source cannot be created.
     */
    static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {

        /**
         * Ignores all arguments and fails.
         *
         * @throws IOException always, with the message "Failing deliberately"
         */
        @Override
        public void init(Configuration config, FileSystem fileSystem, ReplicationSourceManager sourceManager,
                ReplicationQueueStorage queueStorage, ReplicationPeer peer, Server server, String queueId,
                UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
                throws IOException {
            throw new IOException("Failing deliberately");
        }
    }

    /**
     * One-time fixture setup: creates the replication znodes for peer "1", initialises
     * {@link #replication} on top of the test directories, registers the peer with the manager(s)
     * and waits for its source, then builds the table descriptor, scopes and region info used by
     * the tests. {@link #conf} and {@link #utility} must already be set by the caller.
     */
    public static void setupZkAndReplication() throws Exception {
        Assert.assertNotNull(conf);
        zkw = new ZKWatcher(conf, "test", (Abortable) null);

        // Replication znode layout for peer "1".
        ZKUtil.createWithParents(zkw, "/hbase/replication");
        createZNodeWithData("/hbase/replication/peers/1", Bytes.toBytes(conf.get("hbase.zookeeper.quorum") + ":" + conf.get("hbase.zookeeper.property.clientPort") + ":/1"));
        createZNodeWithData("/hbase/replication/peers/1/peer-state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
        createZNodeWithData("/hbase/replication/peers/1/sync-rep-state", ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
        createZNodeWithData("/hbase/replication/peers/1/new-sync-rep-state", ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
        createZNodeWithData("/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
        ZKClusterId.setClusterId(zkw, new ClusterId());

        // File system locations used by the replication service.
        CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
        fs = FileSystem.get(conf);
        oldLogDir = utility.getDataTestDir("oldWALs");
        logDir = utility.getDataTestDir("WALs");
        remoteLogDir = utility.getDataTestDir("remoteWALs");

        replication = new Replication();
        replication.initialize(new DummyServer(), fs, logDir, oldLogDir, new WALFactory(conf, "test", (Abortable) null, false));

        // The cluster's manager (when a mini cluster runs) learns about the peer before ours does.
        managerOfCluster = getManagerFromCluster();
        if (managerOfCluster != null) {
            managerOfCluster.addPeer(slaveId);
        }
        manager = replication.getReplicationManager();
        manager.addSource(slaveId);
        if (managerOfCluster != null) {
            waitPeer(slaveId, managerOfCluster, true);
        }
        waitPeer(slaveId, manager, true);

        // f1 is created with scope 1, f2 with the builder's default.
        htd = TableDescriptorBuilder.newBuilder(test)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1).setScope(1).build())
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2))
            .build();
        scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        for (byte[] family : htd.getColumnFamilyNames()) {
            scopes.put(family, 0);
        }
        hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
    }

    /** Creates {@code path} (and any missing parents) through {@link #zkw}, then stores {@code data} in it. */
    private static void createZNodeWithData(String path, byte[] data) throws Exception {
        ZKUtil.createWithParents(zkw, path);
        ZKUtil.setData(zkw, path, data);
    }

    /**
     * Looks up the replication source manager of some region server of the mini cluster.
     *
     * @return that manager, or {@code null} when no mini cluster is running
     */
    private static ReplicationSourceManager getManagerFromCluster() {
        if (utility.getMiniHBaseCluster() == null) {
            return null;
        }
        return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
            .map(thread -> thread.getRegionServer())
            .findAny()
            .map(regionServer -> regionServer.getReplicationSourceService())
            .map(service -> (Replication) service)
            .map(Replication::getReplicationManager)
            .get();
    }

    /**
     * Stops the replication source manager, if one was created, and shuts the mini cluster down.
     * The shutdown sits in a {@code finally} block so the cluster is released even when joining
     * the manager fails.
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        try {
            if (manager != null) {
                manager.join();
            }
        } finally {
            utility.shutdownMiniCluster();
        }
    }

    /** Recursively deletes the WAL, old-WAL and remote-WAL test directories, in that order. */
    private void cleanLogDir() throws IOException {
        final Path[] directories = {logDir, oldLogDir, remoteLogDir};
        for (Path directory : directories) {
            fs.delete(directory, true);
        }
    }

    /** Logs the name of the starting test and clears the log directories. */
    @Before
    public void setUp() throws Exception {
        final String methodName = this.testName.getMethodName();
        LOG.info("Start " + methodName);
        cleanLogDir();
    }

    /**
     * Logs the name of the finished test, clears the log directories and removes every peer the
     * test left behind. The peer {@link #slaveId} is kept because it belongs to the shared fixture.
     */
    @After
    public void tearDown() throws Exception {
        LOG.info("End " + this.testName.getMethodName());
        cleanLogDir();
        // Collect the ids into a separate list first: removing a peer changes the manager's sources.
        List<String> peerIds = manager.getSources().stream()
            .map(ReplicationSourceInterface::getPeerId)
            .collect(Collectors.toList());
        for (String peerId : peerIds) {
            if (!slaveId.equals(peerId)) {
                removePeerAndWait(peerId);
            }
        }
    }

    /**
     * Appends 100 edits to a WAL, rolling the writer at every 20th edit, and checks that the manager
     * tracks six WALs for peer {@link #slaveId}. After one more roll and a position report for the
     * first source's current path, only one entry is expected in the manager's WAL map.
     */
    @Test
    public void testLogRoll() throws Exception {
        MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
        KeyValue cell = new KeyValue(r1, f1, r1);
        WALEdit edit = new WALEdit();
        edit.add(cell);

        WALFactory walFactory = new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
        walFactory.getWALProvider().addWALActionsListener(new ReplicationSourceWALActionListener(conf, replication.getReplicationManager()));
        WAL wal = walFactory.getWAL(hri);
        manager.init();

        TableDescriptor otherTable = TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame"))
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1))
            .build();
        NavigableMap<byte[], Integer> familyScopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        for (byte[] family : otherTable.getColumnFamilyNames()) {
            familyScopes.put(family, 0);
        }

        // Edits 1..100; the writer is rolled just before edits 20, 40, 60, 80 and 100.
        for (long seq = 1; seq < 101; seq++) {
            if (seq > 1 && seq % 20 == 0) {
                wal.rollWriter();
            }
            LOG.info(Long.toString(seq));
            wal.sync(wal.appendData(hri, newLogRollKey(mvcc, familyScopes), edit));
        }

        LOG.info("1000 and 1000");
        long baseline = 1000 + 101;
        LOG.info(baseline + " and " + baseline);

        // Three more appends followed by a single sync.
        for (int n = 0; n < 3; n++) {
            wal.appendData(hri, newLogRollKey(mvcc, familyScopes), edit);
        }
        wal.sync();

        int trackedWals = 0;
        for (Object walsOfGroup : ((Map<?, ?>) manager.getWALs().get(slaveId)).values()) {
            trackedWals += ((NavigableSet<?>) walsOfGroup).size();
        }
        Assert.assertEquals(6L, trackedWals);

        wal.rollWriter();
        ReplicationSourceInterface source = Mockito.mock(ReplicationSourceInterface.class);
        Mockito.when(source.getQueueId()).thenReturn(slaveId);
        Mockito.when(source.isRecovered()).thenReturn(false);
        Mockito.when(source.isSyncReplication()).thenReturn(false);
        manager.logPositionAndCleanOldLogs(source, new WALEntryBatch(0, ((ReplicationSourceInterface) manager.getSources().get(0)).getCurrentPath()));

        wal.appendData(hri, newLogRollKey(mvcc, familyScopes), edit);
        wal.sync();
        Assert.assertEquals(1L, manager.getWALs().size());
    }

    /** Builds a WAL key for {@link #hri} and table {@link #test}, stamped with the current time. */
    private static WALKeyImpl newLogRollKey(MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> familyScopes) {
        return new WALKeyImpl(hri.getEncodedNameAsBytes(), test, EnvironmentEdgeManager.currentTime(), mvcc, familyScopes);
    }

    /**
     * Queues two WALs for a "dead" server, lets three workers race to claim its queue and checks
     * that exactly one of them ends up holding both WALs.
     */
    @Test
    public void testClaimQueues() throws Exception {
        DummyServer deadServer = new DummyServer("hostname0.example.org");
        ReplicationQueueStorage queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(deadServer.getZooKeeper(), deadServer.getConfiguration());
        files.add("log1");
        files.add("log2");
        for (String file : files) {
            queueStorage.addWAL(deadServer.getServerName(), slaveId, file);
        }

        DummyServer[] claimers = {
            new DummyServer("dummyserver1.example.org"),
            new DummyServer("dummyserver2.example.org"),
            new DummyServer("dummyserver3.example.org"),
        };
        DummyNodeFailoverWorker[] workers = new DummyNodeFailoverWorker[claimers.length];
        for (int i = 0; i < claimers.length; i++) {
            workers[i] = new DummyNodeFailoverWorker(deadServer.getServerName(), claimers[i]);
        }

        // The latch must exist before any worker starts, because each worker counts it down.
        latch = new CountDownLatch(workers.length);
        for (DummyNodeFailoverWorker worker : workers) {
            worker.start();
        }
        latch.await();

        int winners = 0;
        for (DummyNodeFailoverWorker worker : workers) {
            winners += worker.isLogZnodesMapPopulated();
        }
        Assert.assertEquals(1L, winners);
        deadServer.abort("", (Throwable) null);
    }

    /**
     * Claims the queue of a dead server holding two WALs of group "testgroup", then cleans logs
     * up to the second WAL and expects only that second WAL to remain in the recovered queue.
     */
    @Test
    public void testCleanupFailoverQueues() throws Exception {
        DummyServer deadServer = new DummyServer("hostname1.example.org");
        ReplicationQueueStorage queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(deadServer.getZooKeeper(), deadServer.getConfiguration());

        NavigableSet<String> queuedWals = new TreeSet<>();
        String firstWal = "testgroup." + EnvironmentEdgeManager.currentTime() + ".log1";
        String secondWal = "testgroup." + EnvironmentEdgeManager.currentTime() + ".log2";
        queuedWals.add(firstWal);
        queuedWals.add(secondWal);
        for (String walName : queuedWals) {
            queueStorage.addWAL(deadServer.getServerName(), slaveId, walName);
        }

        DummyServer otherServer = new DummyServer("dummyserver1.example.org");
        ReplicationFactory.getReplicationPeers(otherServer.getZooKeeper(), otherServer.getConfiguration()).init();

        manager.claimQueue(deadServer.getServerName(), slaveId);
        Assert.assertEquals(1L, manager.getWalsByIdRecoveredQueues().size());
        String recoveredQueueId = "1-" + deadServer.getServerName().getServerName();
        Assert.assertEquals(queuedWals, ((Map<?, ?>) manager.getWalsByIdRecoveredQueues().get(recoveredQueueId)).get("testgroup"));

        ReplicationSourceInterface recoveredSource = Mockito.mock(ReplicationSourceInterface.class);
        Mockito.when(recoveredSource.getQueueId()).thenReturn(recoveredQueueId);
        Mockito.when(recoveredSource.isRecovered()).thenReturn(true);
        Mockito.when(recoveredSource.isSyncReplication()).thenReturn(false);
        manager.cleanOldLogs(secondWal, false, recoveredSource);

        // The map is looked up again on purpose: the assertion must see the state after the cleanup.
        Assert.assertEquals(Sets.newHashSet(secondWal), ((Map<?, ?>) manager.getWalsByIdRecoveredQueues().get(recoveredQueueId)).get("testgroup"));
    }

    /**
     * Queues WALs under peer id "2", claims that queue, and checks that every queue the manager
     * reports afterwards starts with {@link #slaveId}.
     */
    @Test
    public void testCleanupUnknownPeerZNode() throws Exception {
        final String otherPeerId = "2";
        DummyServer deadServer = new DummyServer("hostname2.example.org");
        ReplicationQueueStorage queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(deadServer.getZooKeeper(), deadServer.getConfiguration());
        queueStorage.addWAL(deadServer.getServerName(), otherPeerId, "testgroup.log1");
        queueStorage.addWAL(deadServer.getServerName(), otherPeerId, "testgroup.log2");

        manager.claimQueue(deadServer.getServerName(), otherPeerId);

        for (String queueId : manager.getAllQueues()) {
            Assert.assertTrue(queueId.startsWith(slaveId));
        }
    }

    /**
     * Runs {@code scopeWALEdits} on a compaction edit with a default WAL key. There is no assertion;
     * the test passes as long as the call completes without throwing.
     */
    @Test
    public void testCompactionWALEdits() throws Exception {
        TableName tableName = TableName.valueOf("testCompactionWALEdits");
        WALKeyImpl key = new WALKeyImpl();
        RegionInfo wholeTableRegion = RegionInfoBuilder.newBuilder(tableName)
            .setStartKey(HConstants.EMPTY_START_ROW)
            .setEndKey(HConstants.EMPTY_END_ROW)
            .build();
        WALEdit compactionEdit = WALEdit.createCompaction(wholeTableRegion, WALProtos.CompactionDescriptor.getDefaultInstance());
        ReplicationSourceWALActionListener.scopeWALEdits(key, compactionEdit, conf);
    }

    /**
     * With the shared configuration, scoping a bulk-load edit is expected to leave the WAL key
     * without any replication scopes.
     */
    @Test
    public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
        NavigableMap<byte[], Integer> familyScopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        WALEdit bulkLoadEdit = getBulkLoadWALEdit(familyScopes);
        WALKeyImpl key = new WALKeyImpl(familyScopes);

        ReplicationSourceWALActionListener.scopeWALEdits(key, bulkLoadEdit, conf);

        Assert.assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", key.getReplicationScopes());
    }

    /**
     * With {@code hbase.replication.bulkload.enabled} switched on in a copy of the configuration,
     * scoping a bulk-load edit must keep f1 in the key's replication scopes and leave f2 out.
     */
    @Test
    public void testBulkLoadWALEdits() throws Exception {
        NavigableMap<byte[], Integer> familyScopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        WALEdit bulkLoadEdit = getBulkLoadWALEdit(familyScopes);
        WALKeyImpl key = new WALKeyImpl(familyScopes);

        // Work on a copy so the shared configuration keeps its original setting.
        Configuration bulkLoadConf = HBaseConfiguration.create(conf);
        bulkLoadConf.setBoolean("hbase.replication.bulkload.enabled", true);
        ReplicationSourceWALActionListener.scopeWALEdits(key, bulkLoadEdit, bulkLoadConf);

        NavigableMap<byte[], Integer> scoped = key.getReplicationScopes();
        Assert.assertTrue("This family scope is set to global, should be part of replication key scopes.", scoped.containsKey(f1));
        Assert.assertFalse("This family scope is set to local, should not be part of replication key scopes", scoped.containsKey(f2));
    }

    /**
     * Adds a peer whose replication source fails to initialise, queues a WAL for it, removes the
     * peer and checks that the peer's queue is gone from the queue storage.
     *
     * <p>The source implementation property is restored and the peer removed in a single
     * {@code finally} block, so the cleanup runs exactly once whether or not the test body fails.
     */
    @Test
    public void testPeerRemovalCleanup() throws Exception {
        final String implKey = "replication.replicationsource.implementation";
        final String peerId = "FakePeer";
        final String originalImpl = conf.get(implKey);
        ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder().setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
        try {
            DummyServer server = new DummyServer();
            ReplicationQueueStorage queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
            // From here on, new sources are created from the always-failing implementation.
            conf.set(implKey, FailInitializeDummyReplicationSource.class.getName());
            // waitForSource is false: the source is expected never to come up.
            addPeerAndWait(peerId, peerConfig, false);
            Assert.assertNull(manager.getSource(peerId));
            queueStorage.addWAL(server.getServerName(), peerId, "FakeFile");
            removePeerAndWait(peerId);
            Assert.assertFalse(queueStorage.getAllQueues(server.getServerName()).contains(peerId));
        } finally {
            // Configuration.set rejects null values, so a property that was unset before the test
            // has to be unset again rather than "set to null".
            if (originalImpl != null) {
                conf.set(implKey, originalImpl);
            } else {
                conf.unset(implKey);
            }
            removePeerAndWait(peerId);
        }
    }

    /**
     * Reads the private {@code globalSourceSource} field of the metrics object that belongs to the
     * source of peer {@link #slaveId}, using reflection.
     */
    private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
        ReplicationSourceInterface slaveSource = manager.getSource(slaveId);
        Field globalSourceField = MetricsSource.class.getDeclaredField("globalSourceSource");
        globalSourceField.setAccessible(true);
        Object globalSource = globalSourceField.get(slaveSource.getSourceMetrics());
        return (MetricsReplicationSourceSource) globalSource;
    }

    /**
     * Sums {@code getSizeOfLatestPath()} over the replication source managers of all region servers
     * in the mini cluster.
     *
     * @return the sum, or 0 when no mini cluster is running
     */
    private static long getSizeOfLatestPath() {
        if (utility.getMiniHBaseCluster() == null) {
            return 0L;
        }
        return utility.getMiniHBaseCluster().getRegionServerThreads().stream()
            .map(thread -> thread.getRegionServer())
            .map(regionServer -> regionServer.getReplicationSourceService())
            .map(service -> (Replication) service)
            .map(Replication::getReplicationManager)
            .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
            .sum();
    }

    /**
     * Checks that the global log-queue-size metric follows a peer through its life cycle: it grows
     * when the peer is added and a log is enqueued, returns to its initial value when the peer is
     * removed, and matches the new source's own metric when the same peer is added again.
     */
    @Test
    public void testRemovePeerMetricsCleanup() throws Exception {
        final String peerId = "DummyPeer";
        ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder().setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
        try {
            // Keep one reference to the global metrics source so every assertion reads the same object.
            MetricsReplicationSourceSource globalSource = getGlobalSource();
            int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
            long sizeOfLatestPath = getSizeOfLatestPath();

            addPeerAndWait(peerId, peerConfig, true);
            Assert.assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
            ReplicationSourceInterface source = manager.getSource(peerId);
            Assert.assertNotNull(source);

            // Enqueuing one log must be visible in the source's own metric and in the global one.
            int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
            source.enqueueLog(new Path("abc"));
            Assert.assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
            Assert.assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

            // Removing the peer must bring the global metric back to where it started.
            removePeerAndWait(peerId);
            Assert.assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

            // Adding the same peer again creates a new source; fetch it before comparing metrics.
            addPeerAndWait(peerId, peerConfig, true);
            source = manager.getSource(peerId);
            Assert.assertNotNull(source);
            Assert.assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
        } finally {
            removePeerAndWait(peerId);
        }
    }

    /**
     * Builds a mocked, non-recovered, sync-replication source for the given peer.
     *
     * @param peerId value returned by both {@code getPeerId()} and {@code getQueueId()}
     * @return the mock; its peer's config reports the qualified {@link #remoteLogDir} as the remote
     *         WAL directory
     */
    private ReplicationSourceInterface mockReplicationSource(String peerId) {
        ReplicationSourceInterface source = Mockito.mock(ReplicationSourceInterface.class);
        Mockito.when(source.getPeerId()).thenReturn(peerId);
        Mockito.when(source.getQueueId()).thenReturn(peerId);
        Mockito.when(source.isRecovered()).thenReturn(false);
        Mockito.when(source.isSyncReplication()).thenReturn(true);

        String qualifiedRemoteDir = remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
        ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
        Mockito.when(peerConfig.getRemoteWALDir()).thenReturn(qualifiedRemoteDir);

        ReplicationPeer peer = Mockito.mock(ReplicationPeer.class);
        Mockito.when(peer.getPeerConfig()).thenReturn(peerConfig);
        Mockito.when(source.getPeer()).thenReturn(peer);
        return source;
    }

    /**
     * Checks that cleaning old logs deletes a remote WAL only for the peer whose directory holds it:
     * the file under {@code remoteLogDir/1} survives a cleanup issued for peer "1_2" and disappears
     * after a cleanup issued for peer "1".
     *
     * <p>The extra peer is removed in a {@code finally} block so the cleanup runs exactly once.
     */
    @Test
    public void testRemoveRemoteWALs() throws Exception {
        final String secondPeerId = "1_2";
        final String walName = "remoteWAL-12345-1.23456.syncrep";
        addPeerAndWait(secondPeerId, ReplicationPeerConfig.newBuilder().setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(), true);
        try {
            // Roll through a WAL that has no remote file; this step has no assertion of its own.
            Path walWithoutRemoteFile = new Path(logDir, "remoteWAL-12345-1.12345.syncrep");
            manager.preLogRoll(walWithoutRemoteFile);
            manager.postLogRoll(walWithoutRemoteFile);

            // Create the remote copy of the second WAL under the directory of peer "1".
            Path remoteDirForPeer = new Path(remoteLogDir, slaveId);
            fs.mkdirs(remoteDirForPeer);
            Path remoteWal = new Path(remoteDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
            fs.create(remoteWal).close();
            Path localWal = new Path(logDir, walName);
            manager.preLogRoll(localWal);
            manager.postLogRoll(localWal);

            // A cleanup for the other peer must leave the file in place.
            manager.cleanOldLogs(walName, true, mockReplicationSource(secondPeerId));
            Assert.assertTrue(fs.exists(remoteWal));

            // A cleanup for the owning peer must delete it.
            manager.cleanOldLogs(walName, true, mockReplicationSource(slaveId));
            Assert.assertFalse(fs.exists(remoteWal));
        } finally {
            removePeerAndWait(secondPeerId);
        }
    }

    /**
     * Rolls two WALs whose names share the prefix {@code localhost,8080,12345} and checks that both
     * show up as new entries among the manager's latest paths.
     */
    @Test
    public void testSameWALPrefix() throws IOException {
        final String peerWalName = "localhost,8080,12345-45678-Peer.34567";
        final String plainWalName = "localhost,8080,12345.56789";

        Set<String> namesBefore = manager.getLastestPath().stream()
            .map(Path::getName)
            .collect(Collectors.toSet());

        manager.preLogRoll(new Path(peerWalName));
        manager.preLogRoll(new Path(plainWalName));

        // Only names that were not present before the two rolls are of interest.
        Set<String> addedNames = manager.getLastestPath().stream()
            .map(Path::getName)
            .filter(name -> !namesBefore.contains(name))
            .collect(Collectors.toSet());

        Assert.assertEquals(2L, addedNames.size());
        Assert.assertTrue(addedNames.contains(peerWalName));
        Assert.assertTrue(addedNames.contains(plainWalName));
    }

    /**
     * Stores the peer in the peer storage, registers it with the manager(s) and waits until it is
     * visible.
     *
     * @param str id of the peer to add
     * @param replicationPeerConfig configuration stored for the peer
     * @param z {@code true} to wait until the peer's source exists, {@code false} to wait only for
     *        the peer itself
     */
    private void addPeerAndWait(String str, ReplicationPeerConfig replicationPeerConfig, boolean z) throws Exception {
        manager.getReplicationPeers().getPeerStorage().addPeer(str, replicationPeerConfig, true, SyncReplicationState.NONE);
        try {
            manager.addPeer(str);
        } catch (Exception e) {
            // Deliberately best effort: testPeerRemovalCleanup() adds a peer whose source cannot be
            // initialised. The failure is logged so it stays visible instead of vanishing.
            LOG.debug("Ignoring failure while adding peer {}", str, e);
        }
        waitPeer(str, manager, z);
        if (managerOfCluster != null) {
            managerOfCluster.addPeer(str);
            waitPeer(str, managerOfCluster, z);
        }
    }

    /**
     * Waits up to 20 seconds for a peer to become visible in the given manager.
     *
     * @param peerId id of the peer to wait for
     * @param sourceManager manager whose peers and sources are polled
     * @param waitForSource if {@code true}, wait until the manager has a source for the peer (and,
     *        for a {@link ReplicationSourceDummy}, until it reports startup); otherwise wait only
     *        until the peer itself is known
     */
    private static void waitPeer(String peerId, ReplicationSourceManager sourceManager, boolean waitForSource) {
        ReplicationPeers peers = sourceManager.getReplicationPeers();
        Waiter.waitFor(conf, 20000L, () -> {
            if (waitForSource) {
                ReplicationSourceInterface source = sourceManager.getSource(peerId);
                if (source == null) {
                    return false;
                }
                // Any other kind of source counts as ready as soon as it exists.
                return !(source instanceof ReplicationSourceDummy) || ((ReplicationSourceDummy) source).isStartup();
            }
            return peers.getPeer(peerId) != null;
        });
    }

    /**
     * Removes the peer from the peer storage and from the manager, if the storage knows it, and
     * then waits up to 20 seconds until the peer has no queue, no peer object, no entry in the
     * storage and no source.
     *
     * @param str id of the peer to remove
     */
    private void removePeerAndWait(final String str) throws Exception {
        final ReplicationPeers replicationPeers = manager.getReplicationPeers();
        if (replicationPeers.getPeerStorage().listPeerIds().contains(str)) {
            replicationPeers.getPeerStorage().removePeer(str);
            try {
                manager.removePeer(str);
            } catch (Exception e) {
                // Best effort: the wait below decides whether the peer is really gone. The failure
                // is logged so it stays visible instead of vanishing.
                LOG.debug("Ignoring failure while removing peer {}", str, e);
            }
        }
        Waiter.waitFor(conf, 20000L, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
                return !manager.getAllQueues().contains(str)
                    && replicationPeers.getPeer(str) == null
                    && !replicationPeers.getPeerStorage().listPeerIds().contains(str)
                    && manager.getSource(str) == null;
            }
        });
    }

    /**
     * Builds a bulk-load WAL edit for region {@link #hri} with one store file for f1 and one for f2.
     *
     * @param navigableMap scope map to fill; only f1 is added, with value 1
     * @return a bulk-load event whose descriptor lists both store files and uses sequence id 1
     */
    private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> navigableMap) {
        Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
        Map<String, Long> storeFilesSize = new HashMap<>(1);

        addBulkLoadStoreFile(f1, storeFiles, storeFilesSize);
        navigableMap.put(f1, 1);
        // f2 gets a store file but no scope entry.
        addBulkLoadStoreFile(f2, storeFiles, storeFilesSize);

        return WALEdit.createBulkLoadEvent(hri, ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1L));
    }

    /**
     * Registers one store file, named after the family, for {@code family}. Its size is the file's
     * length on {@link #fs}, or 0 if the file status cannot be read.
     */
    private static void addBulkLoadStoreFile(byte[] family, Map<byte[], List<Path>> storeFiles, Map<String, Long> storeFilesSize) {
        Path hfile = new Path(Bytes.toString(family));
        List<Path> familyFiles = new ArrayList<>(1);
        familyFiles.add(hfile);
        long length;
        try {
            length = fs.getFileStatus(hfile).getLen();
        } catch (IOException e) {
            // A failed status lookup is not fatal here; record size 0 and keep the cause in the log.
            LOG.debug("Failed to calculate the size of hfile " + hfile, e);
            length = 0L;
        }
        storeFilesSize.put(hfile.getName(), length);
        storeFiles.put(family, familyFiles);
    }
}
