package com.google.cloud.bigtable.hbase.replication;

import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.replication.adapters.BigtableWALEntry;
import com.google.cloud.bigtable.hbase.replication.adapters.IncompatibleMutationAdapter;
import com.google.cloud.bigtable.hbase.replication.adapters.IncompatibleMutationAdapterFactory;
import com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration;
import com.google.cloud.bigtable.hbase.replication.metrics.MetricsExporter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.SimpleByteRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicator.class */
public class CloudBigtableReplicator {
    private static final Logger LOG = LoggerFactory.getLogger(CloudBigtableReplicator.class);
    private long batchSizeThresholdInBytes;
    private IncompatibleMutationAdapter incompatibleMutationAdapter;
    private boolean isDryRun;
    private SharedResources sharedResources;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicator$SharedResources.class */
    public static class SharedResources {
        private static SharedResources INSTANCE;
        private final ExecutorService executorService;
        private final Connection connection;
        private static int numReferences = 0;

        @VisibleForTesting
        SharedResources(Connection connection, ExecutorService executorService) {
            this.connection = connection;
            this.executorService = executorService;
        }

        static synchronized SharedResources getInstance(Configuration configuration) {
            numReferences++;
            if (INSTANCE == null) {
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(configuration.getInt(HBaseToCloudBigtableReplicationConfiguration.NUM_REPLICATION_SINK_THREADS_KEY, 10), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("cloud-bigtable-replication-sink-%d").build());
                Configuration configuration2 = new Configuration(configuration);
                String str = configuration2.get("google.bigtable.project.id");
                String str2 = configuration2.get("google.bigtable.instance.id");
                configuration2.set(BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, "HBaseReplication");
                Connection connect = BigtableConfiguration.connect(configuration2);
                CloudBigtableReplicator.LOG.info(String.format("Created a connection to CBT. projects/%s/instances/%s", str, str2));
                INSTANCE = new SharedResources(connect, newFixedThreadPool);
            }
            return INSTANCE;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static synchronized void decrementReferenceCount() {
            int i = numReferences - 1;
            numReferences = i;
            if (i > 0) {
                return;
            }
            try {
                try {
                    INSTANCE.connection.close();
                } catch (Exception e) {
                    CloudBigtableReplicator.LOG.warn("Failed to close connection to Cloud Bigtable", e);
                }
                INSTANCE.executorService.shutdown();
                try {
                    INSTANCE.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
                } catch (Exception e2) {
                    CloudBigtableReplicator.LOG.warn("Failed to shut down the Cloud Bigtable replication thread pool.", e2);
                }
                INSTANCE = null;
            } catch (Throwable th) {
                INSTANCE = null;
                throw th;
            }
        }
    }

    @VisibleForTesting
    synchronized void start(SharedResources sharedResources, IncompatibleMutationAdapter incompatibleMutationAdapter, long j, boolean z) {
        this.sharedResources = sharedResources;
        this.incompatibleMutationAdapter = incompatibleMutationAdapter;
        this.batchSizeThresholdInBytes = j;
        this.isDryRun = z;
        if (z) {
            LOG.info("Replicating to Cloud Bigtable in dry-run mode. No mutations will be applied to Cloud Bigtable.");
        }
    }

    public synchronized void start(Configuration configuration, MetricsExporter metricsExporter) {
        LOG.info("Starting replication to CBT.");
        SharedResources sharedResources = SharedResources.getInstance(configuration);
        start(sharedResources, new IncompatibleMutationAdapterFactory(configuration, metricsExporter, sharedResources.connection).createIncompatibleMutationAdapter(), configuration.getLong(HBaseToCloudBigtableReplicationConfiguration.BATCH_SIZE_KEY, HBaseToCloudBigtableReplicationConfiguration.DEFAULT_BATCH_SIZE_IN_BYTES), configuration.getBoolean(HBaseToCloudBigtableReplicationConfiguration.ENABLE_DRY_RUN_MODE_KEY, false));
    }

    public void stop() {
        LOG.info("Stopping replication to CBT.");
        SharedResources.decrementReferenceCount();
    }

    public UUID getPeerUUID() {
        return UUID.nameUUIDFromBytes("Cloud-bigtable".getBytes(StandardCharsets.UTF_8));
    }

    public boolean replicate(Map<String, List<BigtableWALEntry>> map) {
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = true;
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, List<BigtableWALEntry>> entry : map.entrySet()) {
            arrayList.addAll(replicateTable(entry.getKey(), entry.getValue()));
        }
        try {
            try {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    z = ((Boolean) ((Future) it.next()).get()).booleanValue() && z;
                }
                LOG.trace("Exiting CBT replicate method after {} ms, Succeeded: {} ", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Boolean.valueOf(z));
            } catch (Exception e) {
                LOG.error("Failed to replicate a batch ", e);
                z = false;
                LOG.trace("Exiting CBT replicate method after {} ms, Succeeded: {} ", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), false);
            }
            return z;
        } catch (Throwable th) {
            LOG.trace("Exiting CBT replicate method after {} ms, Succeeded: {} ", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Boolean.valueOf(z));
            throw th;
        }
    }

    private List<Future<Boolean>> replicateTable(String str, List<BigtableWALEntry> list) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        int i = 0;
        Iterator<BigtableWALEntry> it = list.iterator();
        while (it.hasNext()) {
            arrayList2.addAll(this.incompatibleMutationAdapter.adaptIncompatibleMutations(it.next()));
        }
        if (this.isDryRun) {
            return Arrays.asList(CompletableFuture.completedFuture(true));
        }
        Map map = (Map) arrayList2.stream().collect(Collectors.groupingBy(cell -> {
            return new SimpleByteRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
        }));
        HashMap hashMap = new HashMap();
        int i2 = 0;
        for (Map.Entry<ByteRange, List<Cell>> entry : map.entrySet()) {
            i2 += entry.getValue().size();
            i += getRowSize(entry);
            hashMap.put(entry.getKey(), entry.getValue());
            if (i >= this.batchSizeThresholdInBytes || i2 >= 99999) {
                LOG.trace("Replicating a batch of " + hashMap.size() + " rows and " + i2 + " cells with heap size " + i + " for table: " + str);
                arrayList.add(replicateBatch(str, hashMap));
                hashMap = new HashMap();
                i2 = 0;
                i = 0;
            }
        }
        if (!hashMap.isEmpty()) {
            arrayList.add(replicateBatch(str, hashMap));
        }
        return arrayList;
    }

    private int getRowSize(Map.Entry<ByteRange, List<Cell>> entry) {
        int i = 0;
        Iterator<Cell> it = entry.getValue().iterator();
        while (it.hasNext()) {
            i = (int) (i + CellUtil.estimatedHeapSizeOf(it.next()));
        }
        return i;
    }

    private Future<Boolean> replicateBatch(String str, Map<ByteRange, List<Cell>> map) {
        try {
            return this.sharedResources.executorService.submit(new CloudBigtableReplicationTask(str, this.sharedResources.connection, map));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.error("Failed to submit a batch for table: " + str, e);
            return CompletableFuture.completedFuture(false);
        }
    }
}
