package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.class */
public class ReplicationSourceManager implements ReplicationListener {
    /** Logger shared by the manager and its nested failover worker. */
    private static final Log LOG = LogFactory.getLog(ReplicationSourceManager.class);
    /** Queue storage: log positions, per-queue WAL entries, queue claiming and HFile refs all go through it. */
    private final ReplicationQueues replicationQueues;
    /** Tracker this manager registers itself on as a listener; also asked for the list of region servers in init(). */
    private final ReplicationTracker replicationTracker;
    /** Peer registry; the object is additionally used as a monitor in recordLog() and removePeer(). */
    private final ReplicationPeers replicationPeers;
    /** Cluster id handed to every source created by this manager. */
    private final UUID clusterId;
    /** Hosting server; consulted for isStopped() and asked to stop() when a log cannot be queued in addSource(). */
    private final Server server;
    /** Configuration used to read replication settings and passed on to the sources. */
    private final Configuration conf;
    /** File system passed on to the sources. */
    private final FileSystem fs;
    /** Synchronized set of the most recently rolled WAL paths; preLogRoll() replaces the entry of the rolled WAL's group. */
    private Set<Path> latestPaths;
    /** Directory returned by getLogDir(). */
    private final Path logDir;
    /** Directory returned by getOldLogDir(); recovered queues resolve their WAL names against it. */
    private final Path oldLogDir;
    /** Base delay in milliseconds ("replication.sleep.before.failover", default 30000) used by the failover worker. */
    private final long sleepBeforeFailover;
    /** Pool that runs the NodeFailoverWorker tasks submitted by transferQueues(). */
    private final ThreadPoolExecutor executor;
    /** Randomness for the failover jitter and for picking which unclaimed queue to try next. */
    private final Random rand;
    /** Value of HConstants.REPLICATION_BULKLOAD_ENABLE_KEY (default false); gates the addPeerToHFileRefs calls. */
    private final boolean replicationForBulkLoadDataEnabled;
    /** Sources for the current peers (the "normal" sources reported by getStats()). */
    private final List<ReplicationSourceInterface> sources = new CopyOnWriteArrayList();
    /** Queue id -> WAL group prefix -> sorted WAL names; addSource(), recordLog() and cleanOldLogs() access it while holding its monitor. */
    private final Map<String, Map<String, SortedSet<String>>> walsById = new HashMap();
    /** Same layout as walsById, but for queues claimed from other region servers. */
    private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues = new ConcurrentHashMap();
    /** Sources created for recovered queues (the "recovered" sources reported by getStats()). */
    private final List<ReplicationSourceInterface> oldsources = new CopyOnWriteArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager$NodeFailoverWorker.class */
    /**
     * Task that claims the replication queues registered under another region server's znode
     * and starts one recovered source per claimed queue.
     *
     * <p>It is submitted to the manager's executor by {@code transferQueues}.
     */
    public class NodeFailoverWorker extends Thread {
        private String rsZnode;
        private final ReplicationQueues rq;
        private final ReplicationPeers rp;
        private final UUID clusterId;

        /**
         * Convenience constructor that takes the queues, peers and cluster id from the given manager.
         *
         * @param replicationSourceManager manager supplying the queues, peers and cluster id
         * @param str znode of the region server whose queues should be claimed
         */
        public NodeFailoverWorker(ReplicationSourceManager replicationSourceManager, String str) {
            this(str, replicationSourceManager.replicationQueues, replicationSourceManager.replicationPeers, replicationSourceManager.clusterId);
        }

        /**
         * @param str znode of the region server whose queues should be claimed
         * @param replicationQueues queue storage used to list and claim the queues
         * @param replicationPeers peer registry handed to the recovered sources
         * @param uuid cluster id handed to the recovered sources
         */
        public NodeFailoverWorker(String str, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, UUID uuid) {
            super("Failover-for-" + str);
            this.rsZnode = str;
            this.rq = replicationQueues;
            this.rp = replicationPeers;
            this.clusterId = uuid;
        }

        /**
         * Waits for a randomized delay, claims every unclaimed queue of {@code rsZnode} one at a
         * time, then creates and starts a recovered source for each queue that was claimed.
         */
        @Override
        public void run() {
            if (this.rq.isThisOurZnode(this.rsZnode)) {
                return;
            }
            try {
                // Thread.sleep takes a long: the random jitter must be narrowed explicitly.
                long jitter = (long) (ReplicationSourceManager.this.rand.nextFloat() * ReplicationSourceManager.this.sleepBeforeFailover);
                Thread.sleep(ReplicationSourceManager.this.sleepBeforeFailover + jitter);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting before transferring a queue.");
                Thread.currentThread().interrupt();
            }
            if (ReplicationSourceManager.this.server.isStopped()) {
                LOG.info("Not transferring queue since we are shutting down");
                return;
            }

            // Claim the queues one by one, picking a random candidate each round.
            Map<String, SortedSet<String>> claimedQueues = new HashMap<String, SortedSet<String>>();
            List<String> unclaimed = this.rq.getUnClaimedQueueIds(this.rsZnode);
            while (unclaimed != null && !unclaimed.isEmpty()) {
                String candidate = unclaimed.get(ReplicationSourceManager.this.rand.nextInt(unclaimed.size()));
                Pair<String, SortedSet<String>> claimed = this.rq.claimQueue(this.rsZnode, candidate);
                // Pause for the full delay after a successful claim, half of it otherwise.
                long pause = ReplicationSourceManager.this.sleepBeforeFailover / 2;
                if (claimed != null) {
                    claimedQueues.put(claimed.getFirst(), claimed.getSecond());
                    pause = ReplicationSourceManager.this.sleepBeforeFailover;
                }
                try {
                    Thread.sleep(pause);
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted while waiting before transferring a queue.");
                    Thread.currentThread().interrupt();
                }
                unclaimed = this.rq.getUnClaimedQueueIds(this.rsZnode);
            }
            if (unclaimed != null) {
                this.rq.removeReplicatorIfQueueIsEmpty(this.rsZnode);
            }
            if (claimedQueues.isEmpty()) {
                return;
            }

            for (Map.Entry<String, SortedSet<String>> claimedQueue : claimedQueues.entrySet()) {
                String queueId = claimedQueue.getKey();
                SortedSet<String> walNames = claimedQueue.getValue();
                try {
                    String peerId = new ReplicationQueueInfo(queueId).getPeerId();
                    ReplicationPeer peer = ReplicationSourceManager.this.replicationPeers.getPeer(peerId);
                    ReplicationPeerConfig peerConfig = null;
                    try {
                        peerConfig = ReplicationSourceManager.this.replicationPeers.getReplicationPeerConfig(peerId);
                    } catch (ReplicationException e) {
                        LOG.warn("Received exception while getting replication peer config, skipping replay" + e);
                    }
                    if (peer == null || peerConfig == null) {
                        LOG.warn("Skipping failover for peer:" + peerId + " of node" + this.rsZnode);
                        ReplicationSourceManager.this.replicationQueues.removeQueue(queueId);
                        continue;
                    }

                    // Track the claimed WALs per WAL group before the source is started.
                    Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
                    ReplicationSourceManager.this.walsByIdRecoveredQueues.put(queueId, walsByGroup);
                    for (String walName : walNames) {
                        String walGroup = DefaultWALProvider.getWALPrefixFromWALName(walName);
                        SortedSet<String> walsInGroup = walsByGroup.get(walGroup);
                        if (walsInGroup == null) {
                            walsInGroup = new TreeSet<String>();
                            walsByGroup.put(walGroup, walsInGroup);
                        }
                        walsInGroup.add(walName);
                    }

                    ReplicationSourceInterface source = ReplicationSourceManager.this.getReplicationSource(ReplicationSourceManager.this.conf, ReplicationSourceManager.this.fs, ReplicationSourceManager.this, this.rq, this.rp, ReplicationSourceManager.this.server, queueId, this.clusterId, peerConfig, peer);
                    synchronized (ReplicationSourceManager.this.oldsources) {
                        if (this.rp.getPeerIds().contains(source.getPeerClusterId())) {
                            ReplicationSourceManager.this.oldsources.add(source);
                            for (String walName : walNames) {
                                source.enqueueLog(new Path(ReplicationSourceManager.this.oldLogDir, walName));
                            }
                            source.startup();
                        } else {
                            // The peer is no longer registered: drop the queue instead of replaying it.
                            source.terminate("Recovered queue doesn't belong to any current peer");
                            ReplicationSourceManager.this.closeRecoveredQueue(source);
                        }
                    }
                } catch (IOException e) {
                    LOG.error("Failed creating a source", e);
                }
            }
        }
    }

    /**
     * Creates the manager, registers it as a listener on the tracker and prepares the executor
     * used for queue failover.
     *
     * @param replicationQueues queue storage
     * @param replicationPeers peer registry
     * @param replicationTracker tracker this manager listens to
     * @param configuration source of the "replication.sleep.before.failover",
     *        "replication.executor.workers" and bulk-load settings
     * @param server hosting server
     * @param fileSystem file system handed to the sources
     * @param path value returned by {@link #getLogDir()}
     * @param path2 value returned by {@link #getOldLogDir()}
     * @param uuid cluster id handed to the sources
     */
    public ReplicationSourceManager(ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration configuration, Server server, FileSystem fileSystem, Path path, Path path2, UUID uuid) {
        this.replicationQueues = replicationQueues;
        this.replicationPeers = replicationPeers;
        this.replicationTracker = replicationTracker;
        this.server = server;
        this.conf = configuration;
        this.fs = fileSystem;
        this.logDir = path;
        this.oldLogDir = path2;
        this.sleepBeforeFailover = configuration.getLong("replication.sleep.before.failover", 30000L);
        this.clusterId = uuid;
        this.replicationTracker.registerListener(this);
        // Return value intentionally unused; the call is kept for whatever side effects it has.
        this.replicationPeers.getAllPeerIds();

        // Fixed-size pool: core and maximum size are both the configured worker count.
        final int workerCount = configuration.getInt("replication.executor.workers", 1);
        this.executor = new ThreadPoolExecutor(workerCount, workerCount, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        this.executor.setThreadFactory(
            new ThreadFactoryBuilder().setNameFormat("ReplicationExecutor-%d").setDaemon(true).build());

        this.rand = new Random();
        this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
        this.replicationForBulkLoadDataEnabled = configuration.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false);
    }

    /**
     * Records the given position for a WAL in the replication queues and, unless told otherwise,
     * cleans up the WALs that precede it.
     *
     * @param path WAL whose file name is recorded
     * @param str queue id the position is recorded under
     * @param j position to record
     * @param z forwarded to {@link #cleanOldLogs(String, String, boolean)} to select the recovered-queue bookkeeping
     * @param z2 when {@code true}, only the position is recorded and no cleanup happens
     */
    public void logPositionAndCleanOldLogs(Path path, String str, long j, boolean z, boolean z2) {
        final String walName = path.getName();
        this.replicationQueues.setLogPosition(str, walName, j);
        if (!z2) {
            cleanOldLogs(walName, str, z);
        }
    }

    /**
     * Removes, for the WAL group of {@code str}, every tracked WAL that sorts before {@code str}.
     *
     * <p>Nothing happens when the queue is no longer tracked (it may have been closed
     * concurrently), when the group is unknown or empty, or when {@code str} is already the
     * oldest tracked WAL of its group.
     *
     * @param str name of the WAL up to which older WALs are removed (exclusive)
     * @param str2 queue id
     * @param z {@code true} to use the recovered-queue bookkeeping, {@code false} for the regular one
     */
    public void cleanOldLogs(String str, String str2, boolean z) {
        String walGroup = DefaultWALProvider.getWALPrefixFromWALName(str);
        if (z) {
            // closeRecoveredQueue() may already have dropped this queue's entry.
            Map<String, SortedSet<String>> walsForQueue = this.walsByIdRecoveredQueues.get(str2);
            if (walsForQueue == null) {
                return;
            }
            SortedSet<String> wals = walsForQueue.get(walGroup);
            // first() throws on an empty set, so guard against it explicitly.
            if (wals != null && !wals.isEmpty() && !wals.first().equals(str)) {
                cleanOldLogs(wals, str, str2);
            }
            return;
        }
        synchronized (this.walsById) {
            // closeQueue() may already have dropped this queue's entry.
            Map<String, SortedSet<String>> walsForQueue = this.walsById.get(str2);
            if (walsForQueue == null) {
                return;
            }
            SortedSet<String> wals = walsForQueue.get(walGroup);
            if (wals != null && !wals.isEmpty() && !wals.first().equals(str)) {
                cleanOldLogs(wals, str, str2);
            }
        }
    }

    /**
     * Removes every WAL sorting strictly before {@code str} from the replication queue and then
     * from the given set.
     *
     * @param sortedSet tracked WAL names of one WAL group; modified in place
     * @param str WAL name acting as the exclusive upper bound
     * @param str2 queue id the WALs are removed from
     */
    private void cleanOldLogs(SortedSet<String> sortedSet, String str, String str2) {
        // headSet is a live view: clearing it below also removes the entries from sortedSet.
        final SortedSet<String> obsoleteWals = sortedSet.headSet(str);
        LOG.debug("Removing " + obsoleteWals.size() + " logs in the list: " + obsoleteWals);
        for (String walName : obsoleteWals) {
            this.replicationQueues.removeLog(str2, walName);
        }
        obsoleteWals.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds a source for every known peer and then schedules a queue transfer for every
     * replicator that is not in the tracker's list of region servers.
     *
     * @throws IOException if a source cannot be created
     * @throws ReplicationException if the replication state cannot be read or updated
     */
    public void init() throws IOException, ReplicationException {
        for (String peerId : this.replicationPeers.getPeerIds()) {
            addSource(peerId);
            if (this.replicationForBulkLoadDataEnabled) {
                this.replicationQueues.addPeerToHFileRefs(peerId);
            }
        }

        List<String> replicators = this.replicationQueues.getListOfReplicators();
        if (replicators != null && !replicators.isEmpty()) {
            List<String> regionServers = this.replicationTracker.getListOfRegionServers();
            LOG.info("Current list of replicators: " + replicators + " other RSs: " + regionServers);
            for (String replicator : replicators) {
                if (regionServers.contains(replicator)) {
                    continue;
                }
                transferQueues(replicator);
            }
        }
    }

    /**
     * Creates a source for the given peer, registers it, seeds it with the latest known WAL of
     * every WAL group and starts it.
     *
     * @param str peer id; also used as the queue id
     * @return the started source
     * @throws IOException if the source cannot be created
     * @throws ReplicationException if a WAL cannot be added to the queue; the server is asked to
     *         stop before the exception is rethrown
     */
    protected ReplicationSourceInterface addSource(String str) throws IOException, ReplicationException {
        ReplicationPeerConfig peerConfig = this.replicationPeers.getReplicationPeerConfig(str);
        ReplicationPeer peer = this.replicationPeers.getPeer(str);
        ReplicationSourceInterface source = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, this.server, str, this.clusterId, peerConfig, peer);
        synchronized (this.walsById) {
            this.sources.add(source);
            Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
            this.walsById.put(str, walsByGroup);
            // The synchronized set must be locked manually while it is being iterated.
            synchronized (this.latestPaths) {
                for (Path latest : this.latestPaths) {
                    String walName = latest.getName();
                    SortedSet<String> wals = new TreeSet<String>();
                    wals.add(walName);
                    walsByGroup.put(DefaultWALProvider.getWALPrefixFromWALName(walName), wals);
                    try {
                        this.replicationQueues.addLog(str, walName);
                        source.enqueueLog(latest);
                    } catch (ReplicationException e) {
                        this.server.stop("Cannot add log to queue when creating a new source, queueId=" + str + ", filename=" + walName);
                        throw e;
                    }
                }
            }
        }
        source.startup();
        return source;
    }

    /**
     * Removes the given queue and optionally tells the peer registry that the peer is gone.
     *
     * @param str queue id to remove; also used as the peer id when {@code z} is set
     * @param z whether {@code peerRemoved} should be invoked on the peer registry as well
     */
    public void deleteSource(String str, boolean z) {
        this.replicationQueues.removeQueue(str);
        if (!z) {
            return;
        }
        this.replicationPeers.peerRemoved(str);
    }

    /**
     * Shuts the executor down, removes all queues when there is no source at all, and terminates
     * every source.
     */
    public void join() {
        this.executor.shutdown();
        if (this.sources.size() == 0) {
            this.replicationQueues.removeAllQueues();
        }
        for (ReplicationSourceInterface source : this.sources) {
            source.terminate("Region server is closing");
        }
    }

    /**
     * @return a read-only view (not a copy) of the WALs tracked per queue id and WAL group; the
     *         nested maps and sets are the live instances
     */
    protected Map<String, Map<String, SortedSet<String>>> getWALs() {
        return Collections.unmodifiableMap(walsById);
    }

    /**
     * @return a read-only view (not a copy) of the WALs tracked for recovered queues, per queue id
     *         and WAL group
     */
    protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
        return Collections.unmodifiableMap(walsByIdRecoveredQueues);
    }

    /**
     * @return the live list of normal sources (the internal list itself, not a copy)
     */
    public List<ReplicationSourceInterface> getSources() {
        return sources;
    }

    /**
     * @return the live list of recovered sources (the internal list itself, not a copy)
     */
    public List<ReplicationSourceInterface> getOldSources() {
        return oldsources;
    }

    /**
     * Looks up the first normal source whose {@code getPeerClusterId()} equals the given id.
     *
     * @param str id to look for
     * @return the matching source, or {@code null} when there is none
     */
    public ReplicationSourceInterface getSource(String str) {
        ReplicationSourceInterface match = null;
        for (ReplicationSourceInterface candidate : getSources()) {
            if (candidate.getPeerClusterId().equals(str)) {
                match = candidate;
                break;
            }
        }
        return match;
    }

    /**
     * @return whatever {@code ReplicationQueues.getAllQueues()} reports; exposed for tests
     */
    @VisibleForTesting
    List<String> getAllQueues() {
        return replicationQueues.getAllQueues();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records a newly rolled WAL and makes it the latest path of its WAL group.
     *
     * <p>The previous latest path of the same group, if any, is replaced. Groups are matched on
     * the exact WAL prefix, the same key {@code recordLog} and {@code addSource} use; a substring
     * match would let a group such as {@code x-1} evict the entry of group {@code x-10}.
     *
     * @param path the new WAL
     * @throws IOException if the WAL cannot be added to the replication queues
     */
    public void preLogRoll(Path path) throws IOException {
        recordLog(path);
        String walGroup = DefaultWALProvider.getWALPrefixFromWALName(path.getName());
        // The synchronized set must be locked manually while it is being iterated.
        synchronized (this.latestPaths) {
            Iterator<Path> it = this.latestPaths.iterator();
            while (it.hasNext()) {
                String trackedGroup = DefaultWALProvider.getWALPrefixFromWALName(it.next().getName());
                if (walGroup.equals(trackedGroup)) {
                    it.remove();
                    break;
                }
            }
            this.latestPaths.add(path);
        }
    }

    /**
     * Adds the WAL to the replication queue of every peer and to the in-memory bookkeeping of
     * every tracked queue.
     *
     * @param path the new WAL
     * @throws IOException wrapping the {@link ReplicationException} raised when the WAL cannot be
     *         added to a queue
     */
    private void recordLog(Path path) throws IOException {
        final String walName = path.getName();
        final String walGroup = DefaultWALProvider.getWALPrefixFromWALName(walName);

        synchronized (this.replicationPeers) {
            for (String peerId : this.replicationPeers.getPeerIds()) {
                try {
                    this.replicationQueues.addLog(peerId, walName);
                } catch (ReplicationException e) {
                    throw new IOException("Cannot add log to replication queue when creating a new source, queueId=" + peerId + ", filename=" + walName, e);
                }
            }
        }

        synchronized (this.walsById) {
            for (Map.Entry<String, Map<String, SortedSet<String>>> queueEntry : this.walsById.entrySet()) {
                Map<String, SortedSet<String>> walsByGroup = queueEntry.getValue();
                boolean groupAlreadyTracked = false;
                for (Map.Entry<String, SortedSet<String>> groupEntry : walsByGroup.entrySet()) {
                    SortedSet<String> wals = groupEntry.getValue();
                    // With no source at all, previously tracked WALs are dropped.
                    if (this.sources.isEmpty()) {
                        wals.clear();
                    }
                    if (walGroup.equals(groupEntry.getKey())) {
                        wals.add(walName);
                        groupAlreadyTracked = true;
                    }
                }
                if (groupAlreadyTracked) {
                    continue;
                }
                // First WAL seen for this group under this queue: start a new sorted set.
                LOG.debug("Start tracking logs for wal group " + walGroup + " for peer " + queueEntry.getKey());
                SortedSet<String> wals = new TreeSet<String>();
                wals.add(walName);
                walsByGroup.put(walGroup, wals);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands the rolled WAL to every normal source.
     *
     * @param path the new WAL
     * @throws IOException declared for callers; nothing in this method's visible body throws it
     */
    public void postLogRoll(Path path) throws IOException {
        for (ReplicationSourceInterface source : this.sources) {
            source.enqueueLog(path);
        }
    }

    /**
     * Instantiates and initializes a replication source together with its replication endpoint.
     *
     * <p>The source class is read from "replication.replicationsource.implementation"; if it
     * cannot be instantiated, a plain {@link ReplicationSource} is used instead. The endpoint
     * class comes from the peer configuration and defaults to
     * {@link HBaseInterClusterReplicationEndpoint}. When the server is an {@link HRegionServer},
     * its coprocessor host may substitute the endpoint.
     *
     * @param configuration configuration naming the source implementation and passed to the source
     * @param fileSystem file system passed to the source and the endpoint context
     * @param replicationSourceManager manager passed to the source
     * @param replicationQueues queue storage passed to the source
     * @param replicationPeers peer registry passed to the source
     * @param server hosting server
     * @param str queue id, also used to name the metrics source
     * @param uuid cluster id
     * @param replicationPeerConfig peer configuration naming the endpoint implementation
     * @param replicationPeer peer whose configuration is given to the endpoint
     * @return the initialized source
     * @throws IOException wrapping any failure while creating or initializing the endpoint or
     *         initializing the source
     */
    protected ReplicationSourceInterface getReplicationSource(Configuration configuration, FileSystem fileSystem, ReplicationSourceManager replicationSourceManager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, String str, UUID uuid, ReplicationPeerConfig replicationPeerConfig, ReplicationPeer replicationPeer) throws IOException {
        RegionServerCoprocessorHost coprocessorHost = null;
        TableDescriptors tableDescriptors = null;
        if (server instanceof HRegionServer) {
            HRegionServer regionServer = (HRegionServer) server;
            coprocessorHost = regionServer.getRegionServerCoprocessorHost();
            tableDescriptors = regionServer.getTableDescriptors();
        }

        // Typed as the interface so that any configured implementation can be held.
        ReplicationSourceInterface source;
        try {
            source = (ReplicationSourceInterface) Class.forName(configuration.get("replication.replicationsource.implementation", ReplicationSource.class.getCanonicalName())).newInstance();
        } catch (Exception e) {
            LOG.warn("Passed replication source implementation throws errors, defaulting to ReplicationSource", e);
            source = new ReplicationSource();
        }

        try {
            String endpointImpl = replicationPeerConfig.getReplicationEndpointImpl();
            if (endpointImpl == null) {
                endpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
            }
            ReplicationEndpoint endpoint = (ReplicationEndpoint) Class.forName(endpointImpl).newInstance();
            if (coprocessorHost != null) {
                // A non-null result from the coprocessor hook replaces the endpoint.
                ReplicationEndpoint replacement = coprocessorHost.postCreateReplicationEndPoint(endpoint);
                if (replacement != null) {
                    endpoint = replacement;
                }
            }
            MetricsSource metrics = new MetricsSource(str);
            source.init(configuration, fileSystem, replicationSourceManager, replicationQueues, replicationPeers, server, str, uuid, endpoint, metrics);
            endpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), fileSystem, replicationPeerConfig, str, uuid, replicationPeer, metrics, tableDescriptors, server));
            return source;
        } catch (Exception e) {
            LOG.warn("Passed replication endpoint implementation throws errors while initializing ReplicationSource for peer: " + str, e);
            throw new IOException(e);
        }
    }

    /**
     * Submits a failover worker for the given region server znode to the executor; a rejected
     * submission is only logged.
     *
     * @param str znode of the region server whose queues should be claimed
     */
    private void transferQueues(String str) {
        NodeFailoverWorker worker = new NodeFailoverWorker(str, this.replicationQueues, this.replicationPeers, this.clusterId);
        try {
            this.executor.execute(worker);
        } catch (RejectedExecutionException rejected) {
            LOG.info("Cancelling the transfer of " + str + " because of " + rejected.getMessage());
        }
    }

    /**
     * Cleans up after a recovered source: clears its metrics, forgets it, removes its queue and
     * drops its WAL bookkeeping.
     *
     * @param replicationSourceInterface the recovered source to close
     */
    public void closeRecoveredQueue(ReplicationSourceInterface replicationSourceInterface) {
        final ReplicationSourceInterface src = replicationSourceInterface;
        LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
        src.getSourceMetrics().clear();
        oldsources.remove(src);
        // false: the queue is removed but the peer registry is not notified.
        deleteSource(src.getPeerClusterZnode(), false);
        walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
    }

    /**
     * Cleans up after a normal source: clears its metrics, forgets it, removes its queue
     * (notifying the peer registry) and drops its WAL bookkeeping.
     *
     * @param replicationSourceInterface the source to close
     */
    public void closeQueue(ReplicationSourceInterface replicationSourceInterface) {
        LOG.info("Done with the queue " + replicationSourceInterface.getPeerClusterZnode());
        replicationSourceInterface.getSourceMetrics().clear();
        this.sources.remove(replicationSourceInterface);
        deleteSource(replicationSourceInterface.getPeerClusterZnode(), true);
        // Resolve the key before taking the lock so no foreign code runs while it is held.
        String queueId = replicationSourceInterface.getPeerClusterZnode();
        // walsById is a plain HashMap whose other accesses hold its monitor; do the same here
        // so a concurrent iteration in recordLog() cannot observe a structural change.
        synchronized (this.walsById) {
            this.walsById.remove(queueId);
        }
    }

    /**
     * Terminates and closes every recovered and normal source of the given peer, then removes
     * the peer's queue.
     *
     * @param str id of the peer being removed
     */
    public void removePeer(String str) {
        LOG.info("Closing the following queue " + str + ", currently have " + this.sources.size() + " and another " + this.oldsources.size() + " that were recovered");

        List<ReplicationSourceInterface> recoveredToClose;
        synchronized (this.oldsources) {
            recoveredToClose = sourcesOfPeer(this.oldsources, str);
            for (ReplicationSourceInterface recovered : recoveredToClose) {
                recovered.terminate("Replication stream was removed by a user");
                closeRecoveredQueue(recovered);
            }
        }
        LOG.info("Number of deleted recovered sources for " + str + ": " + recoveredToClose.size());

        synchronized (this.replicationPeers) {
            List<ReplicationSourceInterface> normalToClose = sourcesOfPeer(this.sources, str);
            if (normalToClose.isEmpty()) {
                LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. This could mean that ReplicationSourceInterface initialization failed for this peer and that replication on this peer may not be caught up. peerId=" + str);
            }
            for (ReplicationSourceInterface normal : normalToClose) {
                normal.terminate("Replication stream was removed by a user");
                closeQueue(normal);
            }
            deleteSource(str, true);
        }
    }

    /** Collects, in iteration order, the sources whose {@code getPeerClusterId()} equals the given peer id. */
    private static List<ReplicationSourceInterface> sourcesOfPeer(List<ReplicationSourceInterface> candidates, String peerId) {
        List<ReplicationSourceInterface> matches = new ArrayList<ReplicationSourceInterface>();
        for (ReplicationSourceInterface candidate : candidates) {
            if (peerId.equals(candidate.getPeerClusterId())) {
                matches.add(candidate);
            }
        }
        return matches;
    }

    /**
     * Listener callback: schedules the transfer of the removed region server's queues.
     *
     * @param str znode of the removed region server
     */
    @Override
    public void regionServerRemoved(String str) {
        this.transferQueues(str);
    }

    /**
     * Listener callback: closes the peer's sources and then removes the peer from the HFile refs.
     *
     * @param str id of the removed peer
     */
    @Override
    public void peerRemoved(String str) {
        this.removePeer(str);
        replicationQueues.removePeerFromHFileRefs(str);
    }

    /**
     * Listener callback: adds a source for every listed peer that the peer registry reports as
     * newly added. A failure for one peer is logged and does not stop the others.
     *
     * @param list peer ids to examine
     */
    @Override
    public void peerListChanged(List<String> list) {
        for (String peerId : list) {
            try {
                if (!this.replicationPeers.peerAdded(peerId)) {
                    continue;
                }
                addSource(peerId);
                if (this.replicationForBulkLoadDataEnabled) {
                    this.replicationQueues.addPeerToHFileRefs(peerId);
                }
            } catch (Exception e) {
                LOG.error("Error while adding a new peer", e);
            }
        }
    }

    /**
     * @return the old-log directory given at construction time
     */
    public Path getOldLogDir() {
        return oldLogDir;
    }

    /**
     * @return the log directory given at construction time
     */
    public Path getLogDir() {
        return logDir;
    }

    /**
     * @return the file system given at construction time
     */
    public FileSystem getFs() {
        return fs;
    }

    /**
     * @return the peer registry given at construction time
     */
    public ReplicationPeers getReplicationPeers() {
        return replicationPeers;
    }

    /**
     * Builds a report with one line per source: normal sources first, then recovered ones.
     *
     * @return the concatenated per-source statistics
     */
    public String getStats() {
        StringBuilder report = new StringBuilder();
        for (ReplicationSourceInterface source : this.sources) {
            report.append("Normal source for cluster ").append(source.getPeerClusterId()).append(": ");
            report.append(source.getStats()).append(IOUtils.LINE_SEPARATOR_UNIX);
        }
        for (ReplicationSourceInterface recovered : this.oldsources) {
            report.append("Recovered source for cluster/machine(s) ").append(recovered.getPeerClusterId()).append(": ");
            report.append(recovered.getStats()).append(IOUtils.LINE_SEPARATOR_UNIX);
        }
        return report.toString();
    }

    /**
     * Forwards the HFile references to every normal source.
     *
     * @param tableName table the files belong to
     * @param bArr forwarded unchanged to each source (presumably the column family — confirm against ReplicationSourceInterface)
     * @param list forwarded unchanged to each source (presumably the HFile names — confirm against ReplicationSourceInterface)
     * @throws ReplicationException if a source fails to record the references
     */
    public void addHFileRefs(TableName tableName, byte[] bArr, List<String> list) throws ReplicationException {
        for (ReplicationSourceInterface source : this.sources) {
            source.addHFileRefs(tableName, bArr, list);
        }
    }

    /**
     * Removes the given HFile references through the replication queues.
     *
     * @param str first argument forwarded to {@code removeHFileRefs} (presumably the peer id — confirm against ReplicationQueues)
     * @param list references to remove
     */
    public void cleanUpHFileRefs(String str, List<String> list) {
        replicationQueues.removeHFileRefs(str, list);
    }
}
