package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.class */
public class StreamingWriteTables<ElementT> extends PTransform<PCollection<KV<TableDestination, ElementT>>, WriteResult> {
    private BigQueryServices bigQueryServices;
    private InsertRetryPolicy retryPolicy;
    private boolean extendedErrorInfo;
    private static final String FAILED_INSERTS_TAG_ID = "failedInserts";
    private final boolean skipInvalidRows;
    private final boolean ignoreUnknownValues;
    private final boolean ignoreInsertIds;
    private final boolean autoSharding;
    private final Coder<ElementT> elementCoder;
    private final SerializableFunction<ElementT, TableRow> toTableRow;
    private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
    private final SerializableFunction<ElementT, String> deterministicRecordIdFn;

    public StreamingWriteTables() {
        this(new BigQueryServicesImpl(), InsertRetryPolicy.alwaysRetry(), false, false, false, false, false, null, null, null, null);
    }

    private StreamingWriteTables(BigQueryServices bigQueryServices, InsertRetryPolicy insertRetryPolicy, boolean z, boolean z2, boolean z3, boolean z4, boolean z5, Coder<ElementT> coder, SerializableFunction<ElementT, TableRow> serializableFunction, SerializableFunction<ElementT, TableRow> serializableFunction2, SerializableFunction<ElementT, String> serializableFunction3) {
        this.bigQueryServices = bigQueryServices;
        this.retryPolicy = insertRetryPolicy;
        this.extendedErrorInfo = z;
        this.skipInvalidRows = z2;
        this.ignoreUnknownValues = z3;
        this.ignoreInsertIds = z4;
        this.autoSharding = z5;
        this.elementCoder = coder;
        this.toTableRow = serializableFunction;
        this.toFailsafeTableRow = serializableFunction2;
        this.deterministicRecordIdFn = serializableFunction3;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withTestServices(BigQueryServices bigQueryServices) {
        return new StreamingWriteTables<>(bigQueryServices, this.retryPolicy, this.extendedErrorInfo, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.autoSharding, this.elementCoder, this.toTableRow, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withInsertRetryPolicy(InsertRetryPolicy insertRetryPolicy) {
        return new StreamingWriteTables<>(this.bigQueryServices, insertRetryPolicy, this.extendedErrorInfo, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.autoSharding, this.elementCoder, this.toTableRow, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withExtendedErrorInfo(boolean z) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, z, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.autoSharding, this.elementCoder, this.toTableRow, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withSkipInvalidRows(boolean z) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, this.extendedErrorInfo, z, this.ignoreUnknownValues, this.ignoreInsertIds, this.autoSharding, this.elementCoder, this.toTableRow, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withIgnoreUnknownValues(boolean z) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, this.extendedErrorInfo, this.skipInvalidRows, z, this.ignoreInsertIds, this.autoSharding, this.elementCoder, this.toTableRow, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withIgnoreInsertIds(boolean z) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, this.extendedErrorInfo, this.skipInvalidRows, this.ignoreUnknownValues, z, this.autoSharding, this.elementCoder, this.toTableRow, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withAutoSharding(boolean z) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, this.extendedErrorInfo, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, z, this.elementCoder, this.toTableRow, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withElementCoder(Coder<ElementT> coder) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, this.extendedErrorInfo, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.autoSharding, coder, this.toTableRow, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withToTableRow(SerializableFunction<ElementT, TableRow> serializableFunction) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, this.extendedErrorInfo, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.autoSharding, this.elementCoder, serializableFunction, this.toFailsafeTableRow, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withToFailsafeTableRow(SerializableFunction<ElementT, TableRow> serializableFunction) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, this.extendedErrorInfo, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.autoSharding, this.elementCoder, this.toTableRow, serializableFunction, this.deterministicRecordIdFn);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingWriteTables<ElementT> withDeterministicRecordIdFn(SerializableFunction<ElementT, String> serializableFunction) {
        return new StreamingWriteTables<>(this.bigQueryServices, this.retryPolicy, this.extendedErrorInfo, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.autoSharding, this.elementCoder, this.toTableRow, this.toFailsafeTableRow, serializableFunction);
    }

    public WriteResult expand(PCollection<KV<TableDestination, ElementT>> pCollection) {
        if (this.extendedErrorInfo) {
            TupleTag<T> tupleTag = new TupleTag<>(FAILED_INSERTS_TAG_ID);
            PCollectionTuple writeAndGetErrors = writeAndGetErrors(pCollection, tupleTag, BigQueryInsertErrorCoder.of(), ErrorContainer.BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER);
            return WriteResult.withExtendedErrors(pCollection.getPipeline(), tupleTag, writeAndGetErrors.get(tupleTag), writeAndGetErrors.get(BatchedStreamingWrite.SUCCESSFUL_ROWS_TAG));
        }
        TupleTag<T> tupleTag2 = new TupleTag<>(FAILED_INSERTS_TAG_ID);
        PCollectionTuple writeAndGetErrors2 = writeAndGetErrors(pCollection, tupleTag2, TableRowJsonCoder.of(), ErrorContainer.TABLE_ROW_ERROR_CONTAINER);
        return WriteResult.in(pCollection.getPipeline(), tupleTag2, writeAndGetErrors2.get(tupleTag2), writeAndGetErrors2.get(BatchedStreamingWrite.SUCCESSFUL_ROWS_TAG));
    }

    private <T> PCollectionTuple writeAndGetErrors(PCollection<KV<TableDestination, ElementT>> pCollection, TupleTag<T> tupleTag, AtomicCoder<T> atomicCoder, ErrorContainer<T> errorContainer) {
        BigQueryOptions bigQueryOptions = (BigQueryOptions) pCollection.getPipeline().getOptions().as(BigQueryOptions.class);
        if (this.autoSharding && this.deterministicRecordIdFn == null) {
            return pCollection.apply("MapToTableSpec", MapElements.via(new SimpleFunction<KV<TableDestination, ElementT>, KV<String, ElementT>>() { // from class: org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteTables.1
                public KV<String, ElementT> apply(KV<TableDestination, ElementT> kv) {
                    return KV.of(((TableDestination) kv.getKey()).getTableSpec(), kv.getValue());
                }
            })).setCoder(KvCoder.of(StringUtf8Coder.of(), this.elementCoder)).apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds())).setCoder(KvCoder.of(StringUtf8Coder.of(), TableRowInfoCoder.of(this.elementCoder))).apply("StreamingWrite", new BatchedStreamingWrite(this.bigQueryServices, this.retryPolicy, tupleTag, atomicCoder, errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.toTableRow, this.toFailsafeTableRow).viaStateful());
        }
        PCollection coder = pCollection.apply("ShardTableWrites", ParDo.of(new GenerateShardedTable(bigQueryOptions.getNumStreamingKeys().intValue()))).setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), this.elementCoder)).apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds(this.deterministicRecordIdFn))).setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of(this.elementCoder)));
        if (this.deterministicRecordIdFn == null) {
            coder = (PCollection) coder.apply(Reshuffle.of());
        }
        return coder.apply("GlobalWindow", Window.into(new GlobalWindows()).triggering(DefaultTrigger.of()).discardingFiredPanes()).apply("StripShardId", MapElements.via(new SimpleFunction<KV<ShardedKey<String>, TableRowInfo<ElementT>>, KV<String, TableRowInfo<ElementT>>>() { // from class: org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteTables.2
            public KV<String, TableRowInfo<ElementT>> apply(KV<ShardedKey<String>, TableRowInfo<ElementT>> kv) {
                return KV.of((String) ((ShardedKey) kv.getKey()).getKey(), (TableRowInfo) kv.getValue());
            }
        })).setCoder(KvCoder.of(StringUtf8Coder.of(), TableRowInfoCoder.of(this.elementCoder))).apply("StreamingWrite", new BatchedStreamingWrite(this.bigQueryServices, this.retryPolicy, tupleTag, atomicCoder, errorContainer, this.skipInvalidRows, this.ignoreUnknownValues, this.ignoreInsertIds, this.toTableRow, this.toFailsafeTableRow).viaDoFnFinalization());
    }
}
