package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsLogger;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.class */
public class BatchedStreamingWrite<ErrorT, ElementT> extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
    private final BigQueryServices bqServices;
    private final InsertRetryPolicy retryPolicy;
    private final TupleTag<ErrorT> failedOutputTag;
    private final AtomicCoder<ErrorT> failedOutputCoder;
    private final ErrorContainer<ErrorT> errorContainer;
    private final boolean skipInvalidRows;
    private final boolean ignoreUnknownValues;
    private final boolean ignoreInsertIds;
    private final SerializableFunction<ElementT, TableRow> toTableRow;
    private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
    private final Set<MetricName> metricFilter;
    private Counter byteCounter;
    private final boolean batchViaStateful;
    private static final TupleTag<Void> mainOutputTag = new TupleTag<>("mainOutput");
    private static final Duration BATCH_MAX_BUFFERING_DURATION = Duration.millis(200);

    /* JADX INFO: Access modifiers changed from: private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite$BatchAndInsertElements.class */
    public class BatchAndInsertElements extends DoFn<KV<String, TableRowInfo<ElementT>>, Void> {
        private transient Map<String, List<FailsafeValueInSingleWindow<TableRow, TableRow>>> tableRows;
        private transient Map<String, List<String>> uniqueIdsForTableRows;

        private BatchAndInsertElements() {
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.tableRows = new HashMap();
            this.uniqueIdsForTableRows = new HashMap();
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<String, TableRowInfo<ElementT>> kv, @DoFn.Timestamp Instant instant, BoundedWindow boundedWindow, PaneInfo paneInfo) {
            String str = (String) kv.getKey();
            this.tableRows.computeIfAbsent(str, str2 -> {
                return new ArrayList();
            }).add(FailsafeValueInSingleWindow.of((TableRow) BatchedStreamingWrite.this.toTableRow.apply(((TableRowInfo) kv.getValue()).tableRow), instant, boundedWindow, paneInfo, (TableRow) BatchedStreamingWrite.this.toFailsafeTableRow.apply(((TableRowInfo) kv.getValue()).tableRow)));
            this.uniqueIdsForTableRows.computeIfAbsent(str, str3 -> {
                return new ArrayList();
            }).add(((TableRowInfo) kv.getValue()).uniqueId);
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn<KV<String, TableRowInfo<ElementT>>, Void>.FinishBundleContext finishBundleContext) throws Exception {
            ArrayList<ValueInSingleWindow> newArrayList = Lists.newArrayList();
            BigQueryOptions bigQueryOptions = (BigQueryOptions) finishBundleContext.getPipelineOptions().as(BigQueryOptions.class);
            for (Map.Entry<String, List<FailsafeValueInSingleWindow<TableRow, TableRow>>> entry : this.tableRows.entrySet()) {
                BatchedStreamingWrite.this.flushRows(BigQueryHelpers.parseTableSpec(entry.getKey()), entry.getValue(), this.uniqueIdsForTableRows.get(entry.getKey()), bigQueryOptions, newArrayList);
            }
            this.tableRows.clear();
            this.uniqueIdsForTableRows.clear();
            for (ValueInSingleWindow valueInSingleWindow : newArrayList) {
                finishBundleContext.output(BatchedStreamingWrite.this.failedOutputTag, valueInSingleWindow.getValue(), valueInSingleWindow.getTimestamp(), valueInSingleWindow.getWindow());
            }
            BatchedStreamingWrite.this.reportStreamingApiLogging(bigQueryOptions);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite$InsertBatchedElements.class */
    public class InsertBatchedElements extends DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>, Void> {
        private InsertBatchedElements() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>> kv, BoundedWindow boundedWindow, DoFn<KV<ShardedKey<String>, Iterable<TableRowInfo<ElementT>>>, Void>.ProcessContext processContext, DoFn.MultiOutputReceiver multiOutputReceiver) throws InterruptedException {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (TableRowInfo tableRowInfo : (Iterable) kv.getValue()) {
                arrayList.add(FailsafeValueInSingleWindow.of((TableRow) BatchedStreamingWrite.this.toTableRow.apply(tableRowInfo.tableRow), processContext.timestamp(), boundedWindow, processContext.pane(), (TableRow) BatchedStreamingWrite.this.toFailsafeTableRow.apply(tableRowInfo.tableRow)));
                arrayList2.add(tableRowInfo.uniqueId);
            }
            BigQueryOptions bigQueryOptions = (BigQueryOptions) processContext.getPipelineOptions().as(BigQueryOptions.class);
            TableReference parseTableSpec = BigQueryHelpers.parseTableSpec((String) ((ShardedKey) kv.getKey()).getKey());
            ArrayList newArrayList = Lists.newArrayList();
            BatchedStreamingWrite.this.flushRows(parseTableSpec, arrayList, arrayList2, bigQueryOptions, newArrayList);
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                multiOutputReceiver.get(BatchedStreamingWrite.this.failedOutputTag).output(((ValueInSingleWindow) it.next()).getValue());
            }
            BatchedStreamingWrite.this.reportStreamingApiLogging(bigQueryOptions);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite$ViaBundleFinalization.class */
    public class ViaBundleFinalization extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
        private ViaBundleFinalization() {
        }

        public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> pCollection) {
            PCollection<ErrorT> pCollection2 = pCollection.apply(ParDo.of(new BatchAndInsertElements()).withOutputTags(BatchedStreamingWrite.mainOutputTag, TupleTagList.of(BatchedStreamingWrite.this.failedOutputTag))).get(BatchedStreamingWrite.this.failedOutputTag);
            pCollection2.setCoder(BatchedStreamingWrite.this.failedOutputCoder);
            return pCollection2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite$ViaStateful.class */
    public class ViaStateful extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
        private ViaStateful() {
        }

        public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> pCollection) {
            PCollection<ErrorT> pCollection2 = pCollection.apply(Window.into(new GlobalWindows()).triggering(DefaultTrigger.of()).discardingFiredPanes()).apply(GroupIntoBatches.ofSize(((BigQueryOptions) pCollection.getPipeline().getOptions().as(BigQueryOptions.class)).getMaxStreamingRowsToBatch().longValue()).withMaxBufferingDuration(BatchedStreamingWrite.BATCH_MAX_BUFFERING_DURATION).withShardedKey()).setCoder(KvCoder.of(ShardedKey.Coder.of(StringUtf8Coder.of()), IterableCoder.of((TableRowInfoCoder) pCollection.getCoder().getCoderArguments().get(1)))).apply(ParDo.of(new InsertBatchedElements()).withOutputTags(BatchedStreamingWrite.mainOutputTag, TupleTagList.of(BatchedStreamingWrite.this.failedOutputTag))).get(BatchedStreamingWrite.this.failedOutputTag);
            pCollection2.setCoder(BatchedStreamingWrite.this.failedOutputCoder);
            return pCollection2;
        }
    }

    public BatchedStreamingWrite(BigQueryServices bigQueryServices, InsertRetryPolicy insertRetryPolicy, TupleTag<ErrorT> tupleTag, AtomicCoder<ErrorT> atomicCoder, ErrorContainer<ErrorT> errorContainer, boolean z, boolean z2, boolean z3, SerializableFunction<ElementT, TableRow> serializableFunction, SerializableFunction<ElementT, TableRow> serializableFunction2) {
        this.byteCounter = SinkMetrics.bytesWritten();
        this.bqServices = bigQueryServices;
        this.retryPolicy = insertRetryPolicy;
        this.failedOutputTag = tupleTag;
        this.failedOutputCoder = atomicCoder;
        this.errorContainer = errorContainer;
        this.skipInvalidRows = z;
        this.ignoreUnknownValues = z2;
        this.ignoreInsertIds = z3;
        this.toTableRow = serializableFunction;
        this.toFailsafeTableRow = serializableFunction2;
        this.metricFilter = getMetricFilter();
        this.batchViaStateful = false;
    }

    private BatchedStreamingWrite(BigQueryServices bigQueryServices, InsertRetryPolicy insertRetryPolicy, TupleTag<ErrorT> tupleTag, AtomicCoder<ErrorT> atomicCoder, ErrorContainer<ErrorT> errorContainer, boolean z, boolean z2, boolean z3, SerializableFunction<ElementT, TableRow> serializableFunction, SerializableFunction<ElementT, TableRow> serializableFunction2, boolean z4) {
        this.byteCounter = SinkMetrics.bytesWritten();
        this.bqServices = bigQueryServices;
        this.retryPolicy = insertRetryPolicy;
        this.failedOutputTag = tupleTag;
        this.failedOutputCoder = atomicCoder;
        this.errorContainer = errorContainer;
        this.skipInvalidRows = z;
        this.ignoreUnknownValues = z2;
        this.ignoreInsertIds = z3;
        this.toTableRow = serializableFunction;
        this.toFailsafeTableRow = serializableFunction2;
        this.metricFilter = getMetricFilter();
        this.batchViaStateful = z4;
    }

    private static Set<MetricName> getMetricFilter() {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        builder.add(MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_LATENCIES, BigQueryServicesImpl.API_METRIC_LABEL));
        Iterator<String> it = BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_MAP.values().iterator();
        while (it.hasNext()) {
            builder.add(MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, ImmutableMap.builder().putAll(BigQueryServicesImpl.API_METRIC_LABEL).put("STATUS", it.next()).build()));
        }
        builder.add(MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, ImmutableMap.builder().putAll(BigQueryServicesImpl.API_METRIC_LABEL).put("STATUS", "unknown").build()));
        return builder.build();
    }

    public BatchedStreamingWrite<ErrorT, ElementT> viaDoFnFinalization() {
        return new BatchedStreamingWrite<>(this.bqServices, this.retryPolicy, this.failedOutputTag, this.failedOutputCoder, this.errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.toTableRow, this.toFailsafeTableRow, false);
    }

    public BatchedStreamingWrite<ErrorT, ElementT> viaStateful() {
        return new BatchedStreamingWrite<>(this.bqServices, this.retryPolicy, this.failedOutputTag, this.failedOutputCoder, this.errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.toTableRow, this.toFailsafeTableRow, true);
    }

    public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> pCollection) {
        return this.batchViaStateful ? pCollection.apply(new ViaStateful()) : pCollection.apply(new ViaBundleFinalization());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flushRows(TableReference tableReference, List<FailsafeValueInSingleWindow<TableRow, TableRow>> list, List<String> list2, BigQueryOptions bigQueryOptions, List<ValueInSingleWindow<ErrorT>> list3) throws InterruptedException {
        if (list.isEmpty()) {
            return;
        }
        try {
            this.byteCounter.inc(this.bqServices.getDatasetService(bigQueryOptions).insertAll(tableReference, list, list2, this.retryPolicy, list3, this.errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reportStreamingApiLogging(BigQueryOptions bigQueryOptions) {
        MetricsLogger processWideContainer = MetricsEnvironment.getProcessWideContainer();
        if (processWideContainer instanceof MetricsLogger) {
            processWideContainer.tryLoggingMetrics("BigQuery HTTP API Metrics: \n", this.metricFilter, bigQueryOptions.getBqStreamingApiLoggingFrequencySec().intValue() * 1000, true);
        }
    }
}
