package com.spotify.scio.bigtable;

import com.google.api.client.util.Lists;
import com.google.bigtable.repackaged.com.google.cloud.config.BulkOptions;
import com.google.cloud.bigtable.dataflow.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.dataflow.CloudBigtableConfiguration;
import com.google.cloud.bigtable.dataflow.CloudBigtableIO;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;

/* loaded from: input_file:com/spotify/scio/bigtable/BigtableMultiTableWrite.class */
public class BigtableMultiTableWrite {

    /* loaded from: input_file:com/spotify/scio/bigtable/BigtableMultiTableWrite$CloudBigtableMultiTableBufferedWriteFn.class */
    public static class CloudBigtableMultiTableBufferedWriteFn extends AbstractCloudBigtableTableDoFn<KV<String, Iterable<Mutation>>, Void> {
        private static final long serialVersionUID = 2;
        private Map<String, BufferedMutator> mutators;
        private final Aggregator<Long, Long> mutationsCounter;
        private final Aggregator<Long, Long> exceptionsCounter;

        public CloudBigtableMultiTableBufferedWriteFn(CloudBigtableConfiguration cloudBigtableConfiguration) {
            super(cloudBigtableConfiguration);
            this.mutationsCounter = createAggregator("mutations", new Sum.SumLongFn());
            this.exceptionsCounter = createAggregator("exceptions", new Sum.SumLongFn());
        }

        public void startBundle(DoFn<KV<String, Iterable<Mutation>>, Void>.Context context) throws Exception {
            this.mutators = Maps.newConcurrentMap();
        }

        private synchronized BufferedMutator getBufferedMutator(DoFn<KV<String, Iterable<Mutation>>, Void>.Context context, String str) throws IOException {
            BufferedMutator bufferedMutator = this.mutators.get(str);
            if (bufferedMutator == null) {
                bufferedMutator = getConnection().getBufferedMutator(new BufferedMutatorParams(TableName.valueOf(str)).writeBufferSize(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT).listener(createExceptionListener(context)));
                this.mutators.put(str, bufferedMutator);
            }
            return bufferedMutator;
        }

        protected BufferedMutator.ExceptionListener createExceptionListener(final DoFn<KV<String, Iterable<Mutation>>, Void>.Context context) {
            return new BufferedMutator.ExceptionListener() { // from class: com.spotify.scio.bigtable.BigtableMultiTableWrite.CloudBigtableMultiTableBufferedWriteFn.1
                public void onException(RetriesExhaustedWithDetailsException retriesExhaustedWithDetailsException, BufferedMutator bufferedMutator) throws RetriesExhaustedWithDetailsException {
                    CloudBigtableMultiTableBufferedWriteFn.this.logExceptions(context, retriesExhaustedWithDetailsException);
                    throw retriesExhaustedWithDetailsException;
                }
            };
        }

        public void processElement(DoFn<KV<String, Iterable<Mutation>>, Void>.ProcessContext processContext) throws Exception {
            KV kv = (KV) processContext.element();
            getBufferedMutator(processContext, (String) kv.getKey()).mutate(Lists.newArrayList((Iterable) kv.getValue()));
            this.mutationsCounter.addValue(Long.valueOf(r0.size()));
        }

        public void finishBundle(DoFn<KV<String, Iterable<Mutation>>, Void>.Context context) throws Exception {
            try {
                try {
                    Iterator<BufferedMutator> it = this.mutators.values().iterator();
                    while (it.hasNext()) {
                        it.next().close();
                    }
                } catch (RetriesExhaustedWithDetailsException e) {
                    this.exceptionsCounter.addValue(Long.valueOf(e.getCauses().size()));
                    logExceptions(context, e);
                    rethrowException(e);
                    super.finishBundle(context);
                }
            } finally {
                super.finishBundle(context);
            }
        }
    }

    public static PTransform<PCollection<KV<String, Iterable<Mutation>>>, PDone> writeToMultipleTables(CloudBigtableConfiguration cloudBigtableConfiguration) {
        validateConfig(cloudBigtableConfiguration);
        return new CloudBigtableIO.CloudBigtableWriteTransform(new CloudBigtableMultiTableBufferedWriteFn(cloudBigtableConfiguration));
    }

    private static void checkNotNullOrEmpty(String str, String str2) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "A " + str2 + " must be set to configure Bigtable properly.");
    }

    private static void validateConfig(CloudBigtableConfiguration cloudBigtableConfiguration) {
        checkNotNullOrEmpty(cloudBigtableConfiguration.getProjectId(), "projectId");
        checkNotNullOrEmpty(cloudBigtableConfiguration.getZoneId(), "zoneId");
        checkNotNullOrEmpty(cloudBigtableConfiguration.getClusterId(), "clusterId");
    }
}
