package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;

/**
 * Receives batches of WAL entries shipped from another cluster and applies them to
 * the local cluster through a shared {@link HConnection}.
 *
 * <p>Incoming cells are regrouped into {@link Put}/{@link Delete} mutations, bucketed by
 * target table and by the list of cluster ids carried on the originating WAL key, and
 * then written with {@link HTableInterface#batch}.
 */
@InterfaceAudience.Private
public class ReplicationSink {
    private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
    private final Configuration conf;
    private final HConnection sharedHtableCon;
    private final MetricsSink metrics;
    private final AtomicLong totalReplicatedEdits = new AtomicLong();

    /**
     * Creates a sink with its own copy of the supplied configuration and opens the shared
     * connection used for all subsequent writes.
     *
     * @param configuration base configuration; it is copied, so the sink-specific overrides
     *                      applied here do not modify the caller's instance
     * @param stoppable     accepted for interface compatibility; not used by this class
     * @throws IOException if the connection cannot be created
     */
    public ReplicationSink(Configuration configuration, Stoppable stoppable) throws IOException {
        this.conf = HBaseConfiguration.create(configuration);
        decorateConf();
        this.metrics = new MetricsSink();
        this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
    }

    /**
     * Overrides the generic client settings in the sink's private configuration with the
     * sink-specific ones: retry count (default 4), operation timeout (default 10000) and,
     * when a replication codec is configured, the RPC codec.
     */
    private void decorateConf() {
        this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            this.conf.getInt("replication.sink.client.retries.number", 4));
        this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
            this.conf.getInt("replication.sink.client.ops.timeout", 10000));
        String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
        if (StringUtils.isNotEmpty(replicationCodec)) {
            this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
        }
    }

    /**
     * Applies a batch of WAL entries to the local tables.
     *
     * <p>Each entry declares how many cells belong to it; that many cells are pulled from
     * {@code cellScanner}. Consecutive cells with the same row and type byte are folded into
     * one mutation. After all tables have been written, the sink metrics and the running
     * edit counter are updated.
     *
     * @param list        WAL entries to apply; an empty list is a no-op
     * @param cellScanner source of the cells referenced by the entries; must not be null
     *                    when {@code list} is non-empty
     * @throws NullPointerException           if {@code cellScanner} is null
     * @throws ArrayIndexOutOfBoundsException if the scanner runs out of cells before an
     *                                        entry's declared cell count is reached
     * @throws IOException                    if scanning or writing fails; the error is
     *                                        logged and rethrown
     */
    public void replicateEntries(List<AdminProtos.WALEntry> list, CellScanner cellScanner) throws IOException {
        if (list.isEmpty()) {
            return;
        }
        if (cellScanner == null) {
            throw new NullPointerException("TODO: Add handling of null CellScanner");
        }
        try {
            long appliedEntries = 0;
            // table -> (cluster id list -> mutations); TreeMap gives a sorted table order.
            Map<TableName, Map<List<UUID>, List<Row>>> rowsByTable =
                new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
            for (AdminProtos.WALEntry walEntry : list) {
                TableName table = TableName.valueOf(walEntry.getKey().getTableName().toByteArray());
                Cell previousCell = null;
                Mutation mutation = null;
                int expectedCells = walEntry.getAssociatedCellCount();
                for (int i = 0; i < expectedCells; i++) {
                    if (!cellScanner.advance()) {
                        throw new ArrayIndexOutOfBoundsException("Expected=" + expectedCells + ", index=" + i);
                    }
                    Cell cell = cellScanner.current();
                    if (isNewRowOrType(previousCell, cell)) {
                        // Row or type changed: start a fresh mutation and register it.
                        if (CellUtil.isDelete(cell)) {
                            mutation = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                        } else {
                            mutation = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                        }
                        List<UUID> clusterIds = new ArrayList<UUID>();
                        for (HBaseProtos.UUID clusterId : walEntry.getKey().getClusterIdsList()) {
                            clusterIds.add(toUUID(clusterId));
                        }
                        mutation.setClusterIds(clusterIds);
                        addToHashMultiMap(rowsByTable, table, clusterIds, mutation);
                    }
                    // The current mutation always matches the cell's kind because a type
                    // change forces a new mutation above.
                    if (CellUtil.isDelete(cell)) {
                        ((Delete) mutation).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
                    } else {
                        ((Put) mutation).add(KeyValueUtil.ensureKeyValue(cell));
                    }
                    previousCell = cell;
                }
                appliedEntries++;
            }
            for (Map.Entry<TableName, Map<List<UUID>, List<Row>>> tableRows : rowsByTable.entrySet()) {
                batch(tableRows.getKey(), tableRows.getValue().values());
            }
            int size = list.size();
            this.metrics.setAgeOfLastAppliedOp(list.get(size - 1).getKey().getWriteTime());
            this.metrics.applyBatch(size);
            this.totalReplicatedEdits.addAndGet(appliedEntries);
        } catch (IOException e) {
            LOG.error("Unable to accept edit because:", e);
            throw e;
        }
    }

    /**
     * Tells whether {@code cell2} must start a new mutation.
     *
     * @param cell  the previously processed cell, or null if there is none yet
     * @param cell2 the cell being processed
     * @return true if there is no previous cell, or the type byte or the row differs
     */
    private boolean isNewRowOrType(Cell cell, Cell cell2) {
        return cell == null
            || cell.getTypeByte() != cell2.getTypeByte()
            || !CellUtil.matchingRow(cell, cell2);
    }

    /**
     * Converts a protobuf UUID into a {@link java.util.UUID}.
     *
     * @param uuid protobuf UUID
     * @return equivalent UUID built from the same most/least significant bits
     */
    private UUID toUUID(HBaseProtos.UUID uuid) {
        return new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
    }

    /**
     * Appends {@code v} to the list stored under {@code map[k1][k2]}, creating the inner
     * map and the list on first use.
     *
     * @param map two-level map to update
     * @param k1  outer key
     * @param k2  inner key
     * @param v   value to append
     * @return the list that now contains {@code v}
     */
    private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 k1, K2 k2, V v) {
        Map<K2, List<V>> innerMap = map.get(k1);
        if (innerMap == null) {
            innerMap = new HashMap<K2, List<V>>();
            map.put(k1, innerMap);
        }
        List<V> values = innerMap.get(k2);
        if (values == null) {
            values = new ArrayList<V>();
            innerMap.put(k2, values);
        }
        values.add(v);
        return values;
    }

    /**
     * Closes the shared connection. An {@link IOException} raised while closing is logged
     * at warn level and not propagated.
     */
    public void stopReplicationSinkServices() {
        try {
            this.sharedHtableCon.close();
        } catch (IOException e) {
            LOG.warn("IOException while closing the connection", e);
        }
    }

    /**
     * Writes every list of rows to the given table, one {@code batch} call per list.
     *
     * <p>The table handle is closed exactly once, whether the writes succeed or fail.
     *
     * @param tableName  table to write to
     * @param collection lists of rows to apply; an empty collection is a no-op
     * @throws InterruptedIOException if a batch call is interrupted; the original
     *                                {@link InterruptedException} is attached as the cause
     * @throws IOException            if obtaining the table, writing, or closing fails
     */
    protected void batch(TableName tableName, Collection<List<Row>> collection) throws IOException {
        if (collection.isEmpty()) {
            return;
        }
        HTableInterface table = null;
        try {
            table = this.sharedHtableCon.getTable(tableName);
            for (List<Row> rows : collection) {
                table.batch(rows);
            }
        } catch (InterruptedException e) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
        } finally {
            // Single close point: avoids calling close() twice when the first close() throws.
            if (table != null) {
                table.close();
            }
        }
    }

    /**
     * Builds a one-line summary of sink activity.
     *
     * @return an empty string if nothing has been replicated yet; otherwise the age reported
     *         by the metrics and the total number of replicated edits
     */
    public String getStats() {
        // Read the counter once so the zero check and the reported value agree.
        long total = this.totalReplicatedEdits.get();
        if (total == 0) {
            return "";
        }
        return "Sink: age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
            + ", total replicated edits: " + total;
    }

    /**
     * @return the metrics object updated by this sink
     */
    public MetricsSink getSinkMetrics() {
        return this.metrics;
    }
}
