package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.core.ApiFutures;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.DescriptorProtos;
import io.grpc.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager;
import org.apache.beam.sdk.io.gcp.bigquery.SplittingIterable;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.class */
public class StorageApiWritesShardedRecords<DestinationT, ElementT> extends PTransform<PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>>, PCollectionTuple> {
    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
    private final BigQueryIO.Write.CreateDisposition createDisposition;
    private final String kmsKey;
    private final BigQueryServices bqServices;
    private final Coder<DestinationT> destinationCoder;
    private final Coder<BigQueryStorageApiInsertError> failedRowsCoder;
    private final boolean autoUpdateSchema;
    private final boolean ignoreUnknownValues;
    private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation;
    private final TupleTag<BigQueryStorageApiInsertError> failedRowsTag;
    private final TupleTag<TableRow> successfulRowsTag;
    private final Coder<TableRow> succussfulRowsCoder;
    private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
    private static final Duration DEFAULT_STREAM_IDLE_TIME = Duration.standardHours(1);
    private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
    private static final Cache<ShardedKey<?>, AppendClientInfo> APPEND_CLIENTS = CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).removalListener(removalNotification -> {
        AppendClientInfo appendClientInfo = (AppendClientInfo) removalNotification.getValue();
        if (appendClientInfo != null) {
            appendClientInfo.close();
        }
    }).build();
    private final Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME;
    private final TupleTag<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> flushTag = new TupleTag<>("flushTag");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords$AppendRowsContext.class */
    public class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
        final ShardedKey<DestinationT> key;
        String streamName = ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
        BigQueryServices.StreamAppendClient client = null;
        long offset = -1;
        long numRows = 0;
        long tryIteration = 0;
        ProtoRows protoRows;
        List<Instant> timestamps;

        AppendRowsContext(ShardedKey<DestinationT> shardedKey, ProtoRows protoRows, List<Instant> list) {
            this.key = shardedKey;
            this.protoRows = protoRows;
            this.timestamps = list;
        }

        @SideEffectFree
        public String toString() {
            return "Context: key=" + this.key + " streamName=" + this.streamName + " offset=" + this.offset + " numRows=" + this.numRows + " tryIteration: " + this.tryIteration;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords$ThrowingRunnable.class */
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords$WriteRecordsDoFn.class */
    public class WriteRecordsDoFn extends DoFn<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>, KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> {
        private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
        private final Duration streamIdleTime;
        private final long splitSize;
        private final long maxRequestSize;
        private final Counter recordsAppended = Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
        private final Counter streamsCreated = Metrics.counter(WriteRecordsDoFn.class, "streamsCreated");
        private final Counter streamsIdle = Metrics.counter(WriteRecordsDoFn.class, "idleStreamsFinalized");
        private final Counter appendFailures = Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
        private final Counter appendOffsetFailures = Metrics.counter(WriteRecordsDoFn.class, "appendOffsetFailures");
        private final Counter flushesScheduled = Metrics.counter(WriteRecordsDoFn.class, "flushesScheduled");
        private final Distribution appendLatencyDistribution = Metrics.distribution(WriteRecordsDoFn.class, "appendLatencyDistributionMs");
        private final Distribution appendSizeDistribution = Metrics.distribution(WriteRecordsDoFn.class, "appendSizeDistribution");
        private final Distribution appendSplitDistribution = Metrics.distribution(WriteRecordsDoFn.class, "appendSplitDistribution");
        private final Counter rowsSentToFailedRowsCollection = Metrics.counter(WriteRecordsDoFn.class, "rowsSentToFailedRowsCollection");
        private Map<DestinationT, TableDestination> destinations = Maps.newHashMap();
        private transient BigQueryServices.DatasetService datasetServiceInternal = null;
        private transient BigQueryServices.WriteStreamService writeStreamServiceInternal = null;

        @DoFn.StateId("streamName")
        private final StateSpec<ValueState<String>> streamNameSpec = StateSpecs.value();

        @DoFn.StateId("streamOffset")
        private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value();

        @DoFn.StateId("updatedSchema")
        private final StateSpec<ValueState<TableSchema>> updatedSchema = StateSpecs.value(ProtoCoder.of(TableSchema.class));

        @DoFn.TimerId("idleTimer")
        private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        public WriteRecordsDoFn(String str, Duration duration, long j, long j2) {
            this.messageConverters = new TwoLevelMessageConverterCache<>(str);
            this.streamIdleTime = duration;
            this.splitSize = j;
            this.maxRequestSize = j2;
        }

        @DoFn.StartBundle
        public void startBundle() throws IOException {
            this.destinations = Maps.newHashMap();
        }

        String getOrCreateStream(String str, ValueState<String> valueState, ValueState<Long> valueState2, Timer timer, BigQueryServices.WriteStreamService writeStreamService, Callable<Boolean> callable) {
            try {
                String str2 = (String) valueState.read();
                AtomicReference atomicReference = new AtomicReference();
                if (str2 == null || ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME.equals(str2)) {
                    CreateTableHelpers.createTableWrapper(() -> {
                        atomicReference.set(writeStreamService.createWriteStream(str, WriteStream.Type.BUFFERED).getName());
                        return null;
                    }, callable);
                    valueState.write((String) atomicReference.get());
                    valueState2.write(0L);
                    this.streamsCreated.inc();
                } else {
                    atomicReference.set(str2);
                }
                timer.offset(this.streamIdleTime).withNoOutputTimestamp().setRelative();
                return (String) atomicReference.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private BigQueryServices.DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
            if (this.datasetServiceInternal == null) {
                this.datasetServiceInternal = StorageApiWritesShardedRecords.this.bqServices.getDatasetService((BigQueryOptions) pipelineOptions.as(BigQueryOptions.class));
            }
            return this.datasetServiceInternal;
        }

        private BigQueryServices.WriteStreamService getWriteStreamService(PipelineOptions pipelineOptions) throws IOException {
            if (this.writeStreamServiceInternal == null) {
                this.writeStreamServiceInternal = StorageApiWritesShardedRecords.this.bqServices.getWriteStreamService((BigQueryOptions) pipelineOptions.as(BigQueryOptions.class));
            }
            return this.writeStreamServiceInternal;
        }

        @DoFn.Teardown
        public void onTeardown() {
            try {
                if (this.writeStreamServiceInternal != null) {
                    this.writeStreamServiceInternal.close();
                    this.writeStreamServiceInternal = null;
                }
                if (this.datasetServiceInternal != null) {
                    this.datasetServiceInternal.close();
                    this.datasetServiceInternal = null;
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @DoFn.ProcessElement
        public void process(DoFn<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>, KV<String, StorageApiFlushAndFinalizeDoFn.Operation>>.ProcessContext processContext, PipelineOptions pipelineOptions, @DoFn.Element KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>> kv, @DoFn.Timestamp Instant instant, @DoFn.StateId("streamName") @DoFn.AlwaysFetched ValueState<String> valueState, @DoFn.StateId("streamOffset") @DoFn.AlwaysFetched ValueState<Long> valueState2, @DoFn.StateId("updatedSchema") ValueState<TableSchema> valueState3, @DoFn.TimerId("idleTimer") Timer timer, DoFn.MultiOutputReceiver multiOutputReceiver) throws Exception {
            BigQueryOptions bigQueryOptions = (BigQueryOptions) pipelineOptions.as(BigQueryOptions.class);
            if (StorageApiWritesShardedRecords.this.autoUpdateSchema) {
                valueState3.readLater();
            }
            StorageApiWritesShardedRecords.this.dynamicDestinations.setSideInputAccessorFromProcessContext(processContext);
            TableDestination tableDestination = (TableDestination) this.destinations.computeIfAbsent(((ShardedKey) kv.getKey()).getKey(), obj -> {
                TableDestination table = StorageApiWritesShardedRecords.this.dynamicDestinations.getTable(obj);
                Preconditions.checkArgument(table != null, "DynamicDestinations.getTable() may not return null, but %s returned null for destination %s", StorageApiWritesShardedRecords.this.dynamicDestinations, obj);
                return table;
            });
            String tableUrn = tableDestination.getTableUrn(bigQueryOptions);
            String shortTableUrn = tableDestination.getShortTableUrn();
            BigQueryServices.DatasetService datasetService = getDatasetService(pipelineOptions);
            BigQueryServices.WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
            Coder<DestinationT> destinationCoder = StorageApiWritesShardedRecords.this.dynamicDestinations.getDestinationCoder();
            Callable callable = () -> {
                Object key = ((ShardedKey) kv.getKey()).getKey();
                CreateTableHelpers.possiblyCreateTable((BigQueryOptions) processContext.getPipelineOptions().as(BigQueryOptions.class), tableDestination, () -> {
                    return StorageApiWritesShardedRecords.this.dynamicDestinations.getSchema(key);
                }, () -> {
                    return StorageApiWritesShardedRecords.this.dynamicDestinations.getTableConstraints(key);
                }, StorageApiWritesShardedRecords.this.createDisposition, destinationCoder, StorageApiWritesShardedRecords.this.kmsKey, StorageApiWritesShardedRecords.this.bqServices);
                return true;
            };
            Supplier supplier = () -> {
                return getOrCreateStream(tableUrn, valueState, valueState2, timer, writeStreamService, callable);
            };
            Callable callable2 = () -> {
                TableSchema tableSchema;
                DescriptorProtos.DescriptorProto descriptor;
                TableSchema tableSchema2 = (TableSchema) valueState3.read();
                if (!StorageApiWritesShardedRecords.this.autoUpdateSchema || tableSchema2 == null) {
                    StorageApiDynamicDestinations.MessageConverter messageConverter = this.messageConverters.get(((ShardedKey) kv.getKey()).getKey(), StorageApiWritesShardedRecords.this.dynamicDestinations, datasetService);
                    tableSchema = messageConverter.getTableSchema();
                    descriptor = messageConverter.getDescriptor(false);
                } else {
                    tableSchema = tableSchema2;
                    descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema(tableSchema, true, false);
                }
                AppendClientInfo withAppendClient = AppendClientInfo.of((TableSchema) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(tableSchema), descriptor, (Consumer<BigQueryServices.StreamAppendClient>) streamAppendClient -> {
                    StorageApiWritesShardedRecords.runAsyncIgnoreFailure(StorageApiWritesShardedRecords.closeWriterExecutor, () -> {
                        streamAppendClient.unpin();
                        streamAppendClient.close();
                    });
                }).withAppendClient(writeStreamService, supplier, false, StorageApiWritesShardedRecords.this.defaultMissingValueInterpretation);
                ((BigQueryServices.StreamAppendClient) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(withAppendClient.getStreamAppendClient())).pin();
                return withAppendClient;
            };
            AtomicReference atomicReference = new AtomicReference((AppendClientInfo) StorageApiWritesShardedRecords.APPEND_CLIENTS.get((ShardedKey) kv.getKey(), callable2));
            if (!((String) supplier.get()).equals(((AppendClientInfo) atomicReference.get()).getStreamName())) {
                StorageApiWritesShardedRecords.APPEND_CLIENTS.invalidate(kv.getKey());
                atomicReference.set((AppendClientInfo) StorageApiWritesShardedRecords.APPEND_CLIENTS.get((ShardedKey) kv.getKey(), callable2));
            }
            TableSchema tableSchema = (TableSchema) valueState3.read();
            if (StorageApiWritesShardedRecords.this.autoUpdateSchema && tableSchema != null && ((AppendClientInfo) atomicReference.get()).hasSchemaChanged(tableSchema)) {
                atomicReference.set(AppendClientInfo.of(tableSchema, ((AppendClientInfo) atomicReference.get()).getCloseAppendClient(), false));
                StorageApiWritesShardedRecords.APPEND_CLIENTS.invalidate(kv.getKey());
                StorageApiWritesShardedRecords.APPEND_CLIENTS.put((ShardedKey) kv.getKey(), (AppendClientInfo) atomicReference.get());
            }
            SplittingIterable splittingIterable = new SplittingIterable((Iterable) kv.getValue(), this.splitSize, (tableRow, z) -> {
                return ((AppendClientInfo) atomicReference.get()).encodeUnknownFields(tableRow, z);
            }, byteString -> {
                return ((AppendClientInfo) atomicReference.get()).toTableRow(byteString);
            }, (timestampedValue, str) -> {
                multiOutputReceiver.get(StorageApiWritesShardedRecords.this.failedRowsTag).outputWithTimestamp(new BigQueryStorageApiInsertError((TableRow) timestampedValue.getValue(), str), timestampedValue.getTimestamp());
                this.rowsSentToFailedRowsCollection.inc();
                BigQuerySinkMetrics.appendRowsRowStatusCounter(BigQuerySinkMetrics.RowStatus.FAILED, BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, shortTableUrn).inc(1L);
            }, StorageApiWritesShardedRecords.this.autoUpdateSchema, StorageApiWritesShardedRecords.this.ignoreUnknownValues, instant);
            BiConsumer biConsumer = (iterable, bool) -> {
                try {
                    if (bool.booleanValue()) {
                        valueState.write(ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME);
                    }
                    atomicReference.set(((AppendClientInfo) atomicReference.get()).withAppendClient(writeStreamService, supplier, false, StorageApiWritesShardedRecords.this.defaultMissingValueInterpretation));
                    BigQueryServices.StreamAppendClient streamAppendClient = (BigQueryServices.StreamAppendClient) org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(((AppendClientInfo) atomicReference.get()).getStreamAppendClient());
                    String str2 = (String) org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull((String) valueState.read());
                    long longValue = ((Long) org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull((Long) valueState2.read())).longValue();
                    Iterator it = iterable.iterator();
                    while (it.hasNext()) {
                        AppendRowsContext appendRowsContext = (AppendRowsContext) it.next();
                        appendRowsContext.streamName = str2;
                        streamAppendClient.pin();
                        appendRowsContext.client = ((AppendClientInfo) atomicReference.get()).getStreamAppendClient();
                        appendRowsContext.offset = longValue;
                        appendRowsContext.tryIteration++;
                        longValue = appendRowsContext.offset + appendRowsContext.protoRows.getSerializedRowsCount();
                    }
                    valueState2.write(Long.valueOf(longValue));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            };
            Consumer consumer = iterable2 -> {
                StorageApiWritesShardedRecords.APPEND_CLIENTS.invalidate(kv.getKey());
                atomicReference.set(((AppendClientInfo) atomicReference.get()).withNoAppendClient());
                StorageApiWritesShardedRecords.APPEND_CLIENTS.put((ShardedKey) kv.getKey(), (AppendClientInfo) atomicReference.get());
                Iterator it = iterable2.iterator();
                while (it.hasNext()) {
                    AppendRowsContext appendRowsContext = (AppendRowsContext) it.next();
                    if (appendRowsContext.client != null) {
                        ExecutorService executorService = StorageApiWritesShardedRecords.closeWriterExecutor;
                        BigQueryServices.StreamAppendClient streamAppendClient = appendRowsContext.client;
                        Objects.requireNonNull(streamAppendClient);
                        StorageApiWritesShardedRecords.runAsyncIgnoreFailure(executorService, streamAppendClient::unpin);
                        appendRowsContext.client = null;
                    }
                }
            };
            Function function = appendRowsContext -> {
                if (appendRowsContext.protoRows.getSerializedRowsCount() == 0) {
                    return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
                }
                try {
                    atomicReference.set(((AppendClientInfo) atomicReference.get()).withAppendClient(writeStreamService, supplier, false, StorageApiWritesShardedRecords.this.defaultMissingValueInterpretation));
                    return ((BigQueryServices.StreamAppendClient) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(((AppendClientInfo) atomicReference.get()).getStreamAppendClient())).appendRows(appendRowsContext.offset, appendRowsContext.protoRows);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            };
            Function function2 = iterable3 -> {
                AppendRowsContext appendRowsContext2 = (AppendRowsContext) org.apache.beam.sdk.util.Preconditions.checkStateNotNull((AppendRowsContext) Iterables.getFirst(iterable3, (Object) null));
                BigQuerySinkMetrics.reportFailedRPCMetrics(appendRowsContext2, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS);
                String throwableToGRPCCodeString = BigQuerySinkMetrics.throwableToGRPCCodeString(appendRowsContext2.getError());
                if (appendRowsContext2.getError() == null || !(appendRowsContext2.getError() instanceof Exceptions.AppendSerializtionError)) {
                    Status.Code code = Status.fromThrowable((Throwable) org.apache.beam.sdk.util.Preconditions.checkStateNotNull(appendRowsContext2.getError())).getCode();
                    boolean z2 = code.equals(Status.Code.OUT_OF_RANGE) || code.equals(Status.Code.ALREADY_EXISTS);
                    boolean equals = code.equals(Status.Code.RESOURCE_EXHAUSTED);
                    if (!z2) {
                        StorageApiWritesShardedRecords.LOG.error("Got error " + appendRowsContext2.getError() + " closing " + appendRowsContext2.streamName);
                    }
                    try {
                        callable.call();
                        if (!equals) {
                            consumer.accept(iterable3);
                        }
                        this.appendFailures.inc();
                        BigQuerySinkMetrics.appendRowsRowStatusCounter(BigQuerySinkMetrics.RowStatus.RETRIED, throwableToGRPCCodeString, shortTableUrn).inc(appendRowsContext2.protoRows.getSerializedRowsCount());
                        boolean z3 = (appendRowsContext2.getError() instanceof Exceptions.StreamFinalizedException) || code.equals(Status.Code.INVALID_ARGUMENT) || code.equals(Status.Code.NOT_FOUND) || code.equals(Status.Code.FAILED_PRECONDITION);
                        if (!z2 && !z3) {
                            return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
                        }
                        this.appendOffsetFailures.inc();
                        StorageApiWritesShardedRecords.LOG.warn("Append to " + appendRowsContext2 + " failed with " + appendRowsContext2.getError() + " Will retry with a new stream");
                        multiOutputReceiver.get(StorageApiWritesShardedRecords.this.flushTag).output(KV.of(appendRowsContext2.streamName, new StorageApiFlushAndFinalizeDoFn.Operation(appendRowsContext2.offset - 1, true)));
                        biConsumer.accept(iterable3, true);
                        return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                Exceptions.AppendSerializtionError appendSerializtionError = (Exceptions.AppendSerializtionError) org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(appendRowsContext2.getError());
                Set keySet = appendSerializtionError.getRowIndexToErrorMessage().keySet();
                Iterator it = keySet.iterator();
                while (it.hasNext()) {
                    int intValue = ((Integer) it.next()).intValue();
                    multiOutputReceiver.get(StorageApiWritesShardedRecords.this.failedRowsTag).outputWithTimestamp(new BigQueryStorageApiInsertError(((AppendClientInfo) atomicReference.get()).toTableRow(appendRowsContext2.protoRows.getSerializedRows(intValue)), (String) appendSerializtionError.getRowIndexToErrorMessage().get(Integer.valueOf(intValue))), appendRowsContext2.timestamps.get(intValue));
                }
                int size = keySet.size();
                this.rowsSentToFailedRowsCollection.inc(size);
                BigQuerySinkMetrics.appendRowsRowStatusCounter(BigQuerySinkMetrics.RowStatus.FAILED, throwableToGRPCCodeString, shortTableUrn).inc(size);
                ProtoRows.Builder newBuilder = ProtoRows.newBuilder();
                ArrayList newArrayList = Lists.newArrayList();
                for (int i = 0; i < appendRowsContext2.protoRows.getSerializedRowsCount(); i++) {
                    if (!keySet.contains(Integer.valueOf(i))) {
                        newBuilder.addSerializedRows(appendRowsContext2.protoRows.getSerializedRows(i));
                        newArrayList.add(appendRowsContext2.timestamps.get(i));
                    }
                }
                appendRowsContext2.protoRows = newBuilder.build();
                appendRowsContext2.timestamps = newArrayList;
                BigQuerySinkMetrics.appendRowsRowStatusCounter(BigQuerySinkMetrics.RowStatus.RETRIED, throwableToGRPCCodeString, shortTableUrn).inc(appendRowsContext2.protoRows.getSerializedRowsCount());
                long j = appendRowsContext2.offset;
                Iterator it2 = iterable3.iterator();
                while (it2.hasNext()) {
                    ((AppendRowsContext) it2.next()).offset = j;
                    j += r0.protoRows.getSerializedRowsCount();
                }
                valueState2.write(Long.valueOf(j));
                return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
            };
            Consumer consumer2 = appendRowsContext2 -> {
                multiOutputReceiver.get(StorageApiWritesShardedRecords.this.flushTag).output(KV.of(appendRowsContext2.streamName, new StorageApiFlushAndFinalizeDoFn.Operation((appendRowsContext2.offset + appendRowsContext2.protoRows.getSerializedRowsCount()) - 1, false)));
                this.flushesScheduled.inc(appendRowsContext2.protoRows.getSerializedRowsCount());
                BigQuerySinkMetrics.reportSuccessfulRpcMetrics(appendRowsContext2, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableUrn);
                BigQuerySinkMetrics.appendRowsRowStatusCounter(BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableUrn);
                if (StorageApiWritesShardedRecords.this.successfulRowsTag != null) {
                    for (int i = 0; i < appendRowsContext2.protoRows.getSerializedRowsCount(); i++) {
                        multiOutputReceiver.get(StorageApiWritesShardedRecords.this.successfulRowsTag).outputWithTimestamp(((AppendClientInfo) atomicReference.get()).toTableRow(appendRowsContext2.protoRows.getSerializedRows(i)), appendRowsContext2.timestamps.get(i));
                    }
                }
            };
            java.time.Instant now = java.time.Instant.now();
            ArrayList<AppendRowsContext> newArrayList = Lists.newArrayList();
            RetryManager retryManager = new RetryManager(Duration.standardSeconds(1L), Duration.standardSeconds(10L), 1000, BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
            int i = 0;
            for (SplittingIterable.Value value : splittingIterable) {
                if (value.getProtoRows().getSerializedSize() >= this.maxRequestSize) {
                    if (value.getProtoRows().getSerializedRowsCount() > 1) {
                        StorageApiWritesShardedRecords.LOG.error("A request containing more than one row is over the request size limit of " + this.maxRequestSize + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection.");
                    }
                    for (int i2 = 0; i2 < value.getProtoRows().getSerializedRowsCount(); i2++) {
                        multiOutputReceiver.get(StorageApiWritesShardedRecords.this.failedRowsTag).outputWithTimestamp(new BigQueryStorageApiInsertError(((AppendClientInfo) atomicReference.get()).toTableRow(value.getProtoRows().getSerializedRows(i2)), "Row payload too large. Maximum size " + this.maxRequestSize), value.getTimestamps().get(i2));
                    }
                    int serializedRowsCount = value.getProtoRows().getSerializedRowsCount();
                    this.rowsSentToFailedRowsCollection.inc(serializedRowsCount);
                    BigQuerySinkMetrics.appendRowsRowStatusCounter(BigQuerySinkMetrics.RowStatus.FAILED, BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, shortTableUrn).inc(serializedRowsCount);
                } else {
                    i++;
                    AppendRowsContext appendRowsContext3 = new AppendRowsContext((ShardedKey) kv.getKey(), value.getProtoRows(), value.getTimestamps());
                    newArrayList.add(appendRowsContext3);
                    retryManager.addOperation(function, function2, consumer2, appendRowsContext3);
                    this.recordsAppended.inc(value.getProtoRows().getSerializedRowsCount());
                    this.appendSizeDistribution.update(appendRowsContext3.protoRows.getSerializedRowsCount());
                }
            }
            if (i > 0) {
                biConsumer.accept(newArrayList, false);
                try {
                    retryManager.run(true);
                    for (AppendRowsContext appendRowsContext4 : newArrayList) {
                        if (appendRowsContext4.client != null) {
                            ExecutorService executorService = StorageApiWritesShardedRecords.closeWriterExecutor;
                            BigQueryServices.StreamAppendClient streamAppendClient = appendRowsContext4.client;
                            Objects.requireNonNull(streamAppendClient);
                            StorageApiWritesShardedRecords.runAsyncIgnoreFailure(executorService, streamAppendClient::unpin);
                        }
                    }
                    this.appendSplitDistribution.update(i);
                    if (StorageApiWritesShardedRecords.this.autoUpdateSchema) {
                        BigQueryServices.StreamAppendClient streamAppendClient2 = ((AppendClientInfo) atomicReference.get()).getStreamAppendClient();
                        TableSchema tableSchema2 = ((AppendClientInfo) atomicReference.get()).getTableSchema();
                        TableSchema updatedSchema = streamAppendClient2 != null ? streamAppendClient2.getUpdatedSchema() : null;
                        if (updatedSchema != null) {
                            Optional<TableSchema> updatedSchema2 = TableSchemaUpdateUtils.getUpdatedSchema(tableSchema2, updatedSchema);
                            if (updatedSchema2.isPresent()) {
                                atomicReference.set(AppendClientInfo.of(updatedSchema2.get(), ((AppendClientInfo) atomicReference.get()).getCloseAppendClient(), false));
                                StorageApiWritesShardedRecords.APPEND_CLIENTS.invalidate(kv.getKey());
                                StorageApiWritesShardedRecords.APPEND_CLIENTS.put((ShardedKey) kv.getKey(), (AppendClientInfo) atomicReference.get());
                                StorageApiWritesShardedRecords.LOG.debug("Fetched updated schema for table {}:\n\t{}", tableUrn, updatedSchema);
                                valueState3.write(updatedSchema2.get());
                            }
                        }
                    }
                    this.appendLatencyDistribution.update(java.time.Duration.between(now, java.time.Instant.now()).toMillis());
                } catch (Throwable th) {
                    for (AppendRowsContext appendRowsContext5 : newArrayList) {
                        if (appendRowsContext5.client != null) {
                            ExecutorService executorService2 = StorageApiWritesShardedRecords.closeWriterExecutor;
                            BigQueryServices.StreamAppendClient streamAppendClient3 = appendRowsContext5.client;
                            Objects.requireNonNull(streamAppendClient3);
                            StorageApiWritesShardedRecords.runAsyncIgnoreFailure(executorService2, streamAppendClient3::unpin);
                        }
                    }
                    throw th;
                }
            }
            timer.offset(this.streamIdleTime).withNoOutputTimestamp().setRelative();
        }

        private void finalizeStream(@DoFn.StateId("streamName") @DoFn.AlwaysFetched ValueState<String> valueState, @DoFn.StateId("streamOffset") @DoFn.AlwaysFetched ValueState<Long> valueState2, ShardedKey<DestinationT> shardedKey, DoFn.MultiOutputReceiver multiOutputReceiver, Instant instant) {
            String str = (String) MoreObjects.firstNonNull((String) valueState.read(), ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME);
            if (Strings.isNullOrEmpty(str)) {
                return;
            }
            multiOutputReceiver.get(StorageApiWritesShardedRecords.this.flushTag).outputWithTimestamp(KV.of(str, new StorageApiFlushAndFinalizeDoFn.Operation(((Long) MoreObjects.firstNonNull((Long) valueState2.read(), 0L)).longValue() - 1, true)), instant);
            valueState.clear();
            valueState2.clear();
            StorageApiWritesShardedRecords.APPEND_CLIENTS.invalidate(shardedKey);
        }

        @DoFn.OnTimer("idleTimer")
        public void onTimer(@DoFn.Key ShardedKey<DestinationT> shardedKey, @DoFn.StateId("streamName") @DoFn.AlwaysFetched ValueState<String> valueState, @DoFn.StateId("streamOffset") @DoFn.AlwaysFetched ValueState<Long> valueState2, DoFn.MultiOutputReceiver multiOutputReceiver, BoundedWindow boundedWindow) {
            finalizeStream(valueState, valueState2, shardedKey, multiOutputReceiver, boundedWindow.maxTimestamp());
            this.streamsIdle.inc();
        }

        @DoFn.OnWindowExpiration
        public void onWindowExpiration(@DoFn.Key ShardedKey<DestinationT> shardedKey, @DoFn.StateId("streamName") @DoFn.AlwaysFetched ValueState<String> valueState, @DoFn.StateId("streamOffset") @DoFn.AlwaysFetched ValueState<Long> valueState2, DoFn.MultiOutputReceiver multiOutputReceiver, BoundedWindow boundedWindow) {
            finalizeStream(valueState, valueState2, shardedKey, multiOutputReceiver, boundedWindow.maxTimestamp());
        }

        public Duration getAllowedTimestampSkew() {
            return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void clearCache() {
        APPEND_CLIENTS.invalidateAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void runAsyncIgnoreFailure(ExecutorService executorService, ThrowingRunnable throwingRunnable) {
        executorService.submit(() -> {
            try {
                throwingRunnable.run();
            } catch (Exception e) {
                System.err.println("Exception happened while executing async task. Ignoring: " + (e.toString() + "\n" + ((String) Arrays.stream(e.getStackTrace()).map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.joining("\n")))));
            }
        });
    }

    public StorageApiWritesShardedRecords(StorageApiDynamicDestinations<ElementT, DestinationT> storageApiDynamicDestinations, BigQueryIO.Write.CreateDisposition createDisposition, String str, BigQueryServices bigQueryServices, Coder<DestinationT> coder, Coder<BigQueryStorageApiInsertError> coder2, Coder<TableRow> coder3, TupleTag<BigQueryStorageApiInsertError> tupleTag, TupleTag<TableRow> tupleTag2, boolean z, boolean z2, AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) {
        this.dynamicDestinations = storageApiDynamicDestinations;
        this.createDisposition = createDisposition;
        this.kmsKey = str;
        this.bqServices = bigQueryServices;
        this.destinationCoder = coder;
        this.failedRowsCoder = coder2;
        this.failedRowsTag = tupleTag;
        this.successfulRowsTag = tupleTag2;
        this.succussfulRowsCoder = coder3;
        this.autoUpdateSchema = z;
        this.ignoreUnknownValues = z2;
        this.defaultMissingValueInterpretation = missingValueInterpretation;
    }

    public PCollectionTuple expand(PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>> pCollection) {
        BigQueryOptions bigQueryOptions = (BigQueryOptions) pCollection.getPipeline().getOptions().as(BigQueryOptions.class);
        long intValue = bigQueryOptions.getStorageApiAppendThresholdBytes().intValue();
        long longValue = bigQueryOptions.getStorageWriteApiMaxRequestSize().longValue();
        String str = pCollection.getName() + "/" + getName();
        TupleTagList of = TupleTagList.of(this.failedRowsTag);
        if (this.successfulRowsTag != null) {
            of = of.and(this.successfulRowsTag);
        }
        PCollectionTuple apply = pCollection.apply("Write Records", ParDo.of(new WriteRecordsDoFn(str, this.streamIdleTime, intValue, longValue)).withSideInputs(this.dynamicDestinations.getSideInputs()).withOutputTags(this.flushTag, of));
        try {
            SchemaRegistry schemaRegistry = pCollection.getPipeline().getSchemaRegistry();
            apply.get(this.flushTag).setCoder(KvCoder.of(StringUtf8Coder.of(), SchemaCoder.of(schemaRegistry.getSchema(StorageApiFlushAndFinalizeDoFn.Operation.class), TypeDescriptor.of(StorageApiFlushAndFinalizeDoFn.Operation.class), schemaRegistry.getToRowFunction(StorageApiFlushAndFinalizeDoFn.Operation.class), schemaRegistry.getFromRowFunction(StorageApiFlushAndFinalizeDoFn.Operation.class)))).apply(Window.configure().triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1L)))).discardingFiredPanes()).apply("maxFlushPosition", Combine.perKey(Max.naturalOrder(new StorageApiFlushAndFinalizeDoFn.Operation(-1L, false)))).apply("Flush and finalize writes", ParDo.of(new StorageApiFlushAndFinalizeDoFn(this.bqServices)));
            apply.get(this.failedRowsTag).setCoder(this.failedRowsCoder);
            if (this.successfulRowsTag != null) {
                apply.get(this.successfulRowsTag).setCoder(this.succussfulRowsCoder);
            }
            return apply;
        } catch (NoSuchSchemaException e) {
            throw new RuntimeException((Throwable) e);
        }
    }
}
