package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/Replication.class */
public class Replication extends WALActionsListener.Base implements ReplicationSourceService, ReplicationSinkService {
    private static final Log LOG;
    private boolean replication;
    private ReplicationSourceManager replicationManager;
    private ReplicationQueues replicationQueues;
    private ReplicationPeers replicationPeers;
    private ReplicationTracker replicationTracker;
    private Configuration conf;
    private ReplicationSink replicationSink;
    private Server server;
    private ScheduledExecutorService scheduleThreadPool;
    private int statsThreadPeriod;
    private ReplicationLoad replicationLoad;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/Replication$ReplicationStatisticsThread.class */
    static class ReplicationStatisticsThread extends Thread {
        private final ReplicationSink replicationSink;
        private final ReplicationSourceManager replicationManager;

        public ReplicationStatisticsThread(ReplicationSink replicationSink, ReplicationSourceManager replicationSourceManager) {
            super("ReplicationStatisticsThread");
            this.replicationManager = replicationSourceManager;
            this.replicationSink = replicationSink;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            printStats(this.replicationManager.getStats());
            printStats(this.replicationSink.getStats());
        }

        private void printStats(String str) {
            if (str.isEmpty()) {
                return;
            }
            Replication.LOG.info(str);
        }
    }

    public Replication(Server server, FileSystem fileSystem, Path path, Path path2) throws IOException {
        initialize(server, fileSystem, path, path2);
    }

    public Replication() {
    }

    @Override // org.apache.hadoop.hbase.regionserver.ReplicationService
    public void initialize(Server server, FileSystem fileSystem, Path path, Path path2) throws IOException {
        this.server = server;
        this.conf = this.server.getConfiguration();
        this.replication = isReplication(this.conf);
        this.scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d").setDaemon(true).build());
        if (!this.replication) {
            this.replicationManager = null;
            this.replicationQueues = null;
            this.replicationPeers = null;
            this.replicationTracker = null;
            this.replicationLoad = null;
            return;
        }
        try {
            this.replicationQueues = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
            this.replicationQueues.init(this.server.getServerName().toString());
            this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
            this.replicationPeers.init();
            this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, this.conf, this.server, this.server);
            try {
                this.replicationManager = new ReplicationSourceManager(this.replicationQueues, this.replicationPeers, this.replicationTracker, this.conf, this.server, fileSystem, path, path2, ZKClusterId.getUUIDForCluster(this.server.getZooKeeper()));
                this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 300);
                LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
                this.replicationLoad = new ReplicationLoad();
            } catch (KeeperException e) {
                throw new IOException("Could not read cluster id", e);
            }
        } catch (ReplicationException e2) {
            throw new IOException("Failed replication handler create", e2);
        }
    }

    public static boolean isReplication(Configuration configuration) {
        return configuration.getBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    }

    @Override // org.apache.hadoop.hbase.regionserver.ReplicationSourceService
    public WALActionsListener getWALActionsListener() {
        return this;
    }

    @Override // org.apache.hadoop.hbase.regionserver.ReplicationService
    public void stopReplicationService() {
        join();
    }

    public void join() {
        if (this.replication) {
            this.replicationManager.join();
            if (this.replicationSink != null) {
                this.replicationSink.stopReplicationSinkServices();
            }
        }
        this.scheduleThreadPool.shutdown();
    }

    @Override // org.apache.hadoop.hbase.regionserver.ReplicationSinkService
    public void replicateLogEntries(List<AdminProtos.WALEntry> list, CellScanner cellScanner) throws IOException {
        if (this.replication) {
            this.replicationSink.replicateEntries(list, cellScanner);
        }
    }

    @Override // org.apache.hadoop.hbase.regionserver.ReplicationService
    public void startReplicationService() throws IOException {
        if (this.replication) {
            try {
                this.replicationManager.init();
                this.replicationSink = new ReplicationSink(this.conf, this.server);
                this.scheduleThreadPool.scheduleAtFixedRate(new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), this.statsThreadPeriod, this.statsThreadPeriod, TimeUnit.SECONDS);
            } catch (ReplicationException e) {
                throw new IOException(e);
            }
        }
    }

    public ReplicationSourceManager getReplicationManager() {
        return this.replicationManager;
    }

    @Override // org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.Base, org.apache.hadoop.hbase.regionserver.wal.WALActionsListener
    public void visitLogEntryBeforeWrite(HTableDescriptor hTableDescriptor, WALKey wALKey, WALEdit wALEdit) {
        scopeWALEdits(hTableDescriptor, wALKey, wALEdit);
    }

    public static void scopeWALEdits(HTableDescriptor hTableDescriptor, WALKey wALKey, WALEdit wALEdit) {
        TreeMap treeMap = new TreeMap(Bytes.BYTES_COMPARATOR);
        Iterator<Cell> it2 = wALEdit.getCells().iterator();
        while (it2.hasNext()) {
            Cell next = it2.next();
            byte[] family = next.getFamily();
            if (!CellUtil.matchingFamily(next, WALEdit.METAFAMILY)) {
                if (!$assertionsDisabled && hTableDescriptor.getFamily(family) == null) {
                    throw new AssertionError();
                }
                int scope = hTableDescriptor.getFamily(family).getScope();
                if (scope != 0 && !treeMap.containsKey(family)) {
                    treeMap.put(family, Integer.valueOf(scope));
                }
            }
        }
        if (treeMap.isEmpty()) {
            return;
        }
        wALKey.setScopes(treeMap);
    }

    @Override // org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.Base, org.apache.hadoop.hbase.regionserver.wal.WALActionsListener
    public void preLogRoll(Path path, Path path2) throws IOException {
        getReplicationManager().preLogRoll(path2);
    }

    @Override // org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.Base, org.apache.hadoop.hbase.regionserver.wal.WALActionsListener
    public void postLogRoll(Path path, Path path2) throws IOException {
        getReplicationManager().postLogRoll(path2);
    }

    public static void decorateMasterConfiguration(Configuration configuration) {
        if (isReplication(configuration)) {
            String str = configuration.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
            String canonicalName = ReplicationLogCleaner.class.getCanonicalName();
            if (str.contains(canonicalName)) {
                return;
            }
            configuration.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, str + "," + canonicalName);
        }
    }

    @Override // org.apache.hadoop.hbase.regionserver.ReplicationService
    public ReplicationLoad refreshAndGetReplicationLoad() {
        if (this.replicationLoad == null) {
            return null;
        }
        buildReplicationLoad();
        return this.replicationLoad;
    }

    private void buildReplicationLoad() {
        ArrayList arrayList = new ArrayList();
        for (ReplicationSourceInterface replicationSourceInterface : this.replicationManager.getSources()) {
            if (replicationSourceInterface instanceof ReplicationSource) {
                arrayList.add(((ReplicationSource) replicationSourceInterface).getSourceMetrics());
            }
        }
        for (ReplicationSourceInterface replicationSourceInterface2 : this.replicationManager.getOldSources()) {
            if (replicationSourceInterface2 instanceof ReplicationSource) {
                arrayList.add(((ReplicationSource) replicationSourceInterface2).getSourceMetrics());
            }
        }
        this.replicationLoad.buildReplicationLoad(arrayList, this.replicationSink.getSinkMetrics());
    }

    static {
        $assertionsDisabled = !Replication.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(Replication.class);
    }
}
