package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;

/* loaded from: input_file:lib/hbase-0.94.3.jar:org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.class */
public class ReplicationSink {
    private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
    public static final String REPLICATION_LOG_DIR = ".replogs";
    private final Configuration conf;
    private final ExecutorService sharedThreadPool;
    private final HConnection sharedHtableCon;
    private final ReplicationSinkMetrics metrics;

    public ReplicationSink(Configuration configuration, Stoppable stoppable) throws IOException {
        this.conf = HBaseConfiguration.create(configuration);
        decorateConf();
        this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
        this.sharedThreadPool = new ThreadPoolExecutor(1, configuration.getInt("hbase.htable.threads.max", Integer.MAX_VALUE), configuration.getLong("hbase.htable.threads.keepalivetime", 60L), TimeUnit.SECONDS, new SynchronousQueue(), Threads.newDaemonThreadFactory("hbase-repl"));
        ((ThreadPoolExecutor) this.sharedThreadPool).allowCoreThreadTimeOut(true);
        this.metrics = new ReplicationSinkMetrics();
    }

    private void decorateConf() {
        this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, this.conf.getInt("replication.sink.client.retries.number", 4));
        this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    }

    public void replicateEntries(HLog.Entry[] entryArr) throws IOException {
        if (entryArr.length == 0) {
            return;
        }
        try {
            long j = 0;
            TreeMap treeMap = new TreeMap(Bytes.BYTES_COMPARATOR);
            for (HLog.Entry entry : entryArr) {
                WALEdit edit = entry.getEdit();
                byte[] tablename = entry.getKey().getTablename();
                Put put = null;
                Delete delete = null;
                KeyValue keyValue = null;
                for (KeyValue keyValue2 : edit.getKeyValues()) {
                    if (keyValue == null || keyValue.getType() != keyValue2.getType() || !keyValue.matchingRow(keyValue2)) {
                        if (keyValue2.isDelete()) {
                            delete = new Delete(keyValue2.getRow());
                            delete.setClusterId(entry.getKey().getClusterId());
                            addToMultiMap(treeMap, tablename, delete);
                        } else {
                            put = new Put(keyValue2.getRow());
                            put.setClusterId(entry.getKey().getClusterId());
                            addToMultiMap(treeMap, tablename, put);
                        }
                    }
                    if (keyValue2.isDelete()) {
                        delete.addDeleteMarker(keyValue2);
                    } else {
                        put.add(keyValue2);
                    }
                    keyValue = keyValue2;
                }
                j++;
            }
            for (byte[] bArr : treeMap.keySet()) {
                batch(bArr, (List) treeMap.get(bArr));
            }
            this.metrics.setAgeOfLastAppliedOp(entryArr[entryArr.length - 1].getKey().getWriteTime());
            this.metrics.appliedBatchesRate.inc(1);
            LOG.info("Total replicated: " + j);
        } catch (IOException e) {
            LOG.error("Unable to accept edit because:", e);
            throw e;
        }
    }

    private <K, V> List<V> addToMultiMap(Map<K, List<V>> map, K k, V v) {
        List<V> list = map.get(k);
        if (list == null) {
            list = new ArrayList();
            map.put(k, list);
        }
        list.add(v);
        return list;
    }

    public void stopReplicationSinkServices() {
        try {
            this.sharedThreadPool.shutdown();
            if (!this.sharedThreadPool.awaitTermination(60000L, TimeUnit.MILLISECONDS)) {
                this.sharedThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while closing the table pool", e);
            Thread.currentThread().interrupt();
        }
        try {
            this.sharedHtableCon.close();
        } catch (IOException e2) {
            LOG.warn("IOException while closing the connection", e2);
        }
    }

    private void batch(byte[] bArr, List<Row> list) throws IOException {
        if (list.isEmpty()) {
            return;
        }
        HTable hTable = null;
        try {
            try {
                hTable = new HTable(bArr, this.sharedHtableCon, this.sharedThreadPool);
                hTable.batch(list);
                this.metrics.appliedOpsRate.inc(list.size());
                if (hTable != null) {
                    hTable.close();
                }
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        } catch (Throwable th) {
            if (hTable != null) {
                hTable.close();
            }
            throw th;
        }
    }
}
