package org.apache.hadoop.hbase.regionserver.regionreplication;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.shaded.com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.hbase.shaded.org.agrona.collections.IntHashSet;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queues WAL entries written on a primary region and ships them, in batches, to the region's
 * secondary replicas through {@link AsyncClusterConnection#replicate}.
 *
 * <p>At most one batch is in flight at a time (tracked by {@code sending}). When a replica fails to
 * receive a batch whose highest sequence id is newer than the last recorded flush, the replica is
 * put into {@code failedReplicas}, a flush is requested, and the replica is skipped until
 * {@link #add} sees a flush marker accepted by {@code isStartFlushAllStores}, at which point the
 * queue and the failed set are both cleared.
 *
 * <p>All mutable state is read and written while holding the monitor of {@code entries}; the only
 * exception is {@link #pendingSize()}, which reads a volatile field without locking.
 */
@InterfaceAudience.Private
public class RegionReplicationSink {
    private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

    /** Configuration key for the retry count handed to {@code AsyncClusterConnection.replicate}. */
    public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
    public static final int RETRIES_NUMBER_DEFAULT = 3;

    /** Configuration key for the per-RPC timeout (milliseconds) of batches without meta edits. */
    public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";
    public static final long RPC_TIMEOUT_MS_DEFAULT = 1000;

    /** Configuration key for the operation timeout (milliseconds) of batches without meta edits. */
    public static final String OPERATION_TIMEOUT_MS = "hbase.region.read-replica.sink.operation.timeout.ms";
    public static final long OPERATION_TIMEOUT_MS_DEFAULT = 5000;

    /** Configuration key for the per-RPC timeout (milliseconds) of batches containing a meta edit. */
    public static final String META_EDIT_RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms";
    public static final long META_EDIT_RPC_TIMEOUT_MS_DEFAULT = 15000;

    /** Configuration key for the operation timeout (milliseconds) of batches containing a meta edit. */
    public static final String META_EDIT_OPERATION_TIMEOUT_MS = "hbase.region.read-replica.sink.meta-edit.operation.timeout.ms";
    public static final long META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT = 60000;

    /** Configuration key for the estimated serialized size at which a batch stops growing. */
    public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";
    public static final long BATCH_SIZE_CAPACITY_DEFAULT = 1048576;

    /** Configuration key for the entry count at which a batch stops growing. */
    public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";
    public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;

    private final RegionInfo primary;
    private final TableDescriptor tableDesc;
    private final int regionReplication;
    private final RegionReplicationBufferManager manager;
    private final RegionReplicationFlushRequester flushRequester;
    private final AsyncClusterConnection conn;
    /** Replica ids that are currently skipped by {@link #send()}. */
    private final IntHashSet failedReplicas;
    /** Entries waiting to be sent; its monitor is also the lock for all mutable state below. */
    private final Queue<SinkEntry> entries = new ArrayDeque<>();
    private final int retries;
    private final long rpcTimeoutNs;
    private final long operationTimeoutNs;
    private final long metaEditRpcTimeoutNs;
    private final long metaEditOperationTimeoutNs;
    private final long batchSizeCapacity;
    private final long batchCountCapacity;
    /** Sum of {@code SinkEntry.size} for entries that are queued or in flight. */
    private volatile long pendingSize;
    private long lastFlushedSequenceId;
    /** True while a batch is in flight, i.e. between {@link #send()} and {@link #onComplete}. */
    private boolean sending;
    private boolean stopping;
    private boolean stopped;

    /**
     * A WAL key/edit pair waiting for replication, together with the RPC call it originated from
     * (may be {@code null}).
     *
     * <p>The call is retained via {@code retainByWAL()} on construction and released via
     * {@code releaseByWAL()} in {@link #replicated()}, so every entry that is created must
     * eventually have {@link #replicated()} invoked exactly once.
     */
    public static final class SinkEntry {
        final WALKeyImpl key;
        final WALEdit edit;
        final ServerCall<?> rpcCall;
        /** Estimated serialized size of key plus edit, used for all buffer accounting. */
        final long size;

        SinkEntry(WALKeyImpl wALKeyImpl, WALEdit wALEdit, ServerCall<?> serverCall) {
            this.key = wALKeyImpl;
            this.edit = wALEdit;
            this.rpcCall = serverCall;
            this.size = wALKeyImpl.estimatedSerializedSizeOf() + wALEdit.estimatedSerializedSizeOf();
            if (serverCall != null) {
                serverCall.retainByWAL();
            }
        }

        /** Releases the hold taken on the originating RPC call, if there is one. */
        void replicated() {
            if (this.rpcCall != null) {
                this.rpcCall.releaseByWAL();
            }
        }
    }

    /**
     * Creates a sink for the given primary region.
     *
     * @param configuration source of the retry, timeout and batch-capacity settings
     * @param regionInfo the primary region; must be the default replica
     * @param tableDescriptor descriptor of the table; its region replication must be greater than 1
     * @param regionReplicationBufferManager tracks the size of pending entries across sinks
     * @param runnable passed to the {@code RegionReplicationFlushRequester} created for this sink
     * @param asyncClusterConnection connection used to ship batches to the secondary replicas
     * @throws IllegalArgumentException if {@code regionInfo} is not the default replica or the
     *         region replication is not greater than 1
     */
    public RegionReplicationSink(Configuration configuration, RegionInfo regionInfo, TableDescriptor tableDescriptor, RegionReplicationBufferManager regionReplicationBufferManager, Runnable runnable, AsyncClusterConnection asyncClusterConnection) {
        Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(regionInfo), "%s is not primary", regionInfo);
        this.regionReplication = tableDescriptor.getRegionReplication();
        Preconditions.checkArgument(this.regionReplication > 1, "region replication should be greater than 1 but got %s", this.regionReplication);
        this.primary = regionInfo;
        this.tableDesc = tableDescriptor;
        this.manager = regionReplicationBufferManager;
        this.flushRequester = new RegionReplicationFlushRequester(configuration, runnable);
        this.conn = asyncClusterConnection;
        this.retries = configuration.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
        this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(configuration.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
        this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(configuration.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
        this.metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(configuration.getLong(META_EDIT_RPC_TIMEOUT_MS, META_EDIT_RPC_TIMEOUT_MS_DEFAULT));
        this.metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(configuration.getLong(META_EDIT_OPERATION_TIMEOUT_MS, META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT));
        this.batchSizeCapacity = configuration.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
        this.batchCountCapacity = configuration.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
        // There are regionReplication - 1 secondaries, so that is the most this set can ever hold.
        this.failedReplicas = new IntHashSet(this.regionReplication - 1);
    }

    /**
     * Invoked once every replica targeted by a batch has answered.
     *
     * <p>Releases the batch's entries and its share of the pending size, records replicas that
     * failed, and then either finishes a pending stop or sends the next batch.
     *
     * @param list the entries of the batch that was sent
     * @param map replica id to the error returned for that replica; a holder with a {@code null}
     *        value means the replica succeeded
     */
    void onComplete(List<SinkEntry> list, Map<Integer, MutableObject<Throwable>> map) {
        long maxSequenceId = Long.MIN_VALUE;
        long sentSize = 0;
        for (SinkEntry entry : list) {
            maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
            entry.replicated();
            sentSize += entry.size;
        }
        this.manager.decrease(sentSize);
        synchronized (this.entries) {
            this.pendingSize -= sentSize;
            boolean newFailedReplica = false;
            for (Map.Entry<Integer, MutableObject<Throwable>> result : map.entrySet()) {
                Integer replicaId = result.getKey();
                // NOTE(review): getValue2() looks like a decompiler-renamed MutableObject#getValue()
                // -- confirm against the shaded class before changing it.
                Throwable error = result.getValue().getValue2();
                if (error == null) {
                    continue;
                }
                if (maxSequenceId > this.lastFlushedSequenceId) {
                    // The replica missed edits newer than the last recorded flush: stop sending to
                    // it and ask for a flush.
                    LOG.warn("Failed to replicate to secondary replica {} for {}, since the max sequence id of sunk entris is {}, which is greater than the last flush SN {}, we will stop replicating for a while and trigger a flush", replicaId, this.primary, maxSequenceId, this.lastFlushedSequenceId, error);
                    this.failedReplicas.add(replicaId);
                    newFailedReplica = true;
                } else {
                    LOG.warn("Failed to replicate to secondary replica {} for {}, since the max sequence id of sunk entris is {}, which is less than or equal to the last flush SN {}, we will not stop replicating", replicaId, this.primary, maxSequenceId, this.lastFlushedSequenceId, error);
                }
            }
            if (newFailedReplica) {
                this.flushRequester.requestFlush(maxSequenceId);
            }
            this.sending = false;
            if (this.stopping) {
                // stop() was called while this batch was in flight; finish the shutdown now.
                this.stopped = true;
                this.entries.notifyAll();
            } else if (!this.entries.isEmpty()) {
                send();
            }
        }
    }

    /**
     * Drains the next batch from the queue and sends it to every replica that is not marked as
     * failed. All call sites hold the monitor of {@code entries}.
     *
     * <p>If no replica is left to send to, every queued entry is released instead of being sent.
     */
    private void send() {
        int liveReplicaCount = (this.regionReplication - 1) - this.failedReplicas.size();
        if (liveReplicaCount <= 0) {
            // Check this BEFORE polling: entries taken off the queue and then dropped would never
            // have replicated() called on them and would stay counted in pendingSize and in the
            // buffer manager. clearAllEntries() releases and un-accounts everything queued.
            clearAllEntries();
            return;
        }
        List<SinkEntry> batch = new ArrayList<>();
        long batchSize = 0;
        boolean hasMetaEdit = false;
        for (SinkEntry entry = this.entries.poll(); entry != null; entry = this.entries.poll()) {
            batch.add(entry);
            batchSize += entry.size;
            hasMetaEdit |= entry.edit.isMetaEdit();
            if (batch.size() >= this.batchCountCapacity || batchSize >= this.batchSizeCapacity) {
                break;
            }
        }
        // A batch carrying at least one meta edit uses the meta-edit timeouts.
        long rpcTimeout = hasMetaEdit ? this.metaEditRpcTimeoutNs : this.rpcTimeoutNs;
        long operationTimeout = hasMetaEdit ? this.metaEditOperationTimeoutNs : this.operationTimeoutNs;
        this.sending = true;
        List<WAL.Entry> walEntries = batch.stream()
            .map(entry -> new WAL.Entry(entry.key, entry.edit))
            .collect(Collectors.toList());
        // Counts down once per targeted replica; whichever callback reaches zero reports the batch.
        AtomicInteger remaining = new AtomicInteger(liveReplicaCount);
        Map<Integer, MutableObject<Throwable>> results = new HashMap<>();
        for (int replicaId = 1; replicaId < this.regionReplication; replicaId++) {
            if (this.failedReplicas.contains(replicaId)) {
                continue;
            }
            MutableObject<Throwable> error = new MutableObject<>();
            results.put(Integer.valueOf(replicaId), error);
            FutureUtils.addListener(this.conn.replicate(RegionReplicaUtil.getRegionInfoForReplica(this.primary, replicaId), walEntries, this.retries, rpcTimeout, operationTimeout), (ignored, failure) -> {
                error.setValue(failure);
                if (remaining.decrementAndGet() == 0) {
                    onComplete(batch, results);
                }
            });
        }
    }

    /**
     * Tells whether a flush descriptor is either a {@code CANNOT_FLUSH} marker or a
     * {@code START_FLUSH} marker whose store flushes cover exactly the table's column families.
     */
    private boolean isStartFlushAllStores(WALProtos.FlushDescriptor flushDescriptor) {
        WALProtos.FlushDescriptor.FlushAction action = flushDescriptor.getAction();
        if (action == WALProtos.FlushDescriptor.FlushAction.CANNOT_FLUSH) {
            return true;
        }
        if (action != WALProtos.FlushDescriptor.FlushAction.START_FLUSH) {
            return false;
        }
        // byte[] has identity equals/hashCode, so collect into a set ordered by a content
        // comparator to get duplicate elimination and containsAll by value.
        Set<byte[]> flushedFamilies = flushDescriptor.getStoreFlushesList().stream()
            .map(storeFlush -> storeFlush.getFamilyName().toByteArray())
            .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
        return flushedFamilies.size() == this.tableDesc.getColumnFamilyCount()
            && flushedFamilies.containsAll(this.tableDesc.getColumnFamilyNames());
    }

    /**
     * Extracts the flush descriptor from a cell if it is one accepted by
     * {@code isStartFlushAllStores}.
     *
     * @param cell the cell to inspect
     * @return the descriptor, or empty if the cell is not in the WAL meta family, carries no flush
     *         descriptor, the descriptor is not accepted, or it could not be parsed
     */
    Optional<WALProtos.FlushDescriptor> getStartFlushAllDescriptor(Cell cell) {
        if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            return Optional.empty();
        }
        try {
            WALProtos.FlushDescriptor flushDescriptor = WALEdit.getFlushDescriptor(cell);
            if (flushDescriptor != null && isStartFlushAllStores(flushDescriptor)) {
                return Optional.of(flushDescriptor);
            }
            return Optional.empty();
        } catch (IOException e) {
            // Best effort: an unparsable cell is treated as "not a flush marker", but keep the
            // cause in the log so the failure can be diagnosed.
            LOG.warn("Failed to parse FlushDescriptor from {}", cell, e);
            return Optional.empty();
        }
    }

    /**
     * Releases and drops every queued entry and removes their size from {@code pendingSize} and
     * from the buffer manager. All call sites hold the monitor of {@code entries}.
     *
     * @return the total size of the entries that were dropped
     */
    private long clearAllEntries() {
        long clearedSize = 0;
        for (SinkEntry entry : this.entries) {
            clearedSize += entry.size;
            entry.replicated();
        }
        this.entries.clear();
        this.pendingSize -= clearedSize;
        this.manager.decrease(clearedSize);
        return clearedSize;
    }

    /**
     * Reacts to an accepted flush marker: records its sequence number, drops everything still
     * queued, and clears the failed-replica set so those replicas receive batches again.
     * Called from {@link #add} while holding the monitor of {@code entries}.
     */
    private void onStartFlushAll(WALProtos.FlushDescriptor flushDescriptor) {
        long flushSequenceNumber = flushDescriptor.getFlushSequenceNumber();
        this.lastFlushedSequenceId = flushSequenceNumber;
        long clearedCount = this.entries.size();
        long clearedSize = clearAllEntries();
        if (LOG.isDebugEnabled()) {
            // Logged before the set is cleared so the message shows which replicas are re-enabled.
            LOG.debug("Got a flush all request with sequence id {}, clear {} pending entries with size {}, clear failed replicas {}", flushSequenceNumber, clearedCount, StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1), this.failedReplicas);
        }
        this.failedReplicas.clear();
        this.flushRequester.recordFlush(flushSequenceNumber);
    }

    /**
     * Offers a WAL entry for replication.
     *
     * <p>The entry is ignored when the table has memstore replication disabled and the edit is not
     * a meta edit, when the sink is stopping, or when every secondary replica is currently marked
     * as failed. If the buffer manager refuses the additional size, the whole queue is dropped,
     * all replicas are marked as failed and a flush is requested.
     *
     * @param wALKeyImpl key of the WAL entry
     * @param wALEdit edit of the WAL entry
     * @param serverCall the RPC call the edit came from, or {@code null}
     */
    public void add(WALKeyImpl wALKeyImpl, WALEdit wALEdit, ServerCall<?> serverCall) {
        if (!this.tableDesc.hasRegionMemStoreReplication() && !wALEdit.isMetaEdit()) {
            return;
        }
        synchronized (this.entries) {
            if (this.stopping) {
                return;
            }
            if (wALEdit.isMetaEdit()) {
                for (Cell cell : wALEdit.getCells()) {
                    getStartFlushAllDescriptor(cell).ifPresent(this::onStartFlushAll);
                }
            }
            if (this.failedReplicas.size() == this.regionReplication - 1) {
                // Nobody to send to until a flush marker clears the failed set.
                return;
            }
            SinkEntry sinkEntry = new SinkEntry(wALKeyImpl, wALEdit, serverCall);
            this.entries.add(sinkEntry);
            this.pendingSize += sinkEntry.size;
            if (!this.manager.increase(sinkEntry.size)) {
                // The buffer manager rejected the new size: give up on everything queued
                // (including the entry just added) and resynchronize the replicas via a flush.
                clearAllEntries();
                for (int replicaId = 1; replicaId < this.regionReplication; replicaId++) {
                    this.failedReplicas.add(replicaId);
                }
                this.flushRequester.requestFlush(sinkEntry.key.getSequenceId());
            } else if (!this.sending) {
                send();
            }
        }
    }

    /**
     * Returns the total size of entries that are queued or in flight. Reads the volatile counter
     * without taking the lock.
     */
    public long pendingSize() {
        return this.pendingSize;
    }

    /**
     * Starts shutting the sink down: later {@link #add} calls are ignored and queued entries are
     * dropped. If a batch is still in flight, the sink only becomes stopped once
     * {@link #onComplete} runs for it.
     */
    public void stop() {
        synchronized (this.entries) {
            this.stopping = true;
            clearAllEntries();
            if (!this.sending) {
                this.stopped = true;
                this.entries.notifyAll();
            }
        }
    }

    /**
     * Blocks until the sink has fully stopped, i.e. {@link #stop()} has been called and no batch
     * is in flight any more.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void waitUntilStopped() throws InterruptedException {
        synchronized (this.entries) {
            // Loop to guard against spurious wake-ups.
            while (!this.stopped) {
                this.entries.wait();
            }
        }
    }

    /**
     * Returns the live failed-replica set (not a copy); only the reference read happens under the
     * lock.
     */
    @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*/src/test/.*")
    IntHashSet getFailedReplicas() {
        synchronized (this.entries) {
            return this.failedReplicas;
        }
    }
}
