package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Status;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.class */
public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> extends PTransform<PCollection<KV<DestinationT, StorageApiWritePayload>>, PCollectionTuple> {
    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
    private final BigQueryServices bqServices;
    private final TupleTag<BigQueryStorageApiInsertError> failedRowsTag;
    private final TupleTag<TableRow> successfulRowsTag;
    // Tag with the fixed id "finalizeTag" carrying KV<String, String> elements.
    private final TupleTag<KV<String, String>> finalizeTag = new TupleTag<>("finalizeTag");
    private final Coder<BigQueryStorageApiInsertError> failedRowsCoder;
    private final Coder<TableRow> successfulRowsCoder;
    private final boolean autoUpdateSchema;
    private final boolean ignoreUnknownValues;
    private static final Logger LOG = LoggerFactory.getLogger(StorageApiWriteUnshardedRecords.class);
    // Shared cached thread pool; within this file it is only handed to runAsyncIgnoreFailure together
    // with client unpin/close actions.
    private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
    // Process-wide cache of append clients keyed by a stream/client string. Entries expire 15 minutes
    // after their last access; the removal listener logs the key and closes the removed
    // AppendClientInfo. The cache object itself is also used as the monitor for the synchronized
    // blocks that read or modify it.
    private static final Cache<String, AppendClientInfo> APPEND_CLIENTS = CacheBuilder.newBuilder().expireAfterAccess(15, TimeUnit.MINUTES).removalListener(removalNotification -> {
        LOG.info("Expiring append client for " + ((String) removalNotification.getKey()));
        AppendClientInfo appendClientInfo = (AppendClientInfo) removalNotification.getValue();
        // The notification value is null-checked before closing.
        if (appendClientInfo != null) {
            appendClientInfo.close();
        }
    }).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords$ThrowingRunnable.class */
    /**
     * A {@link Runnable}-like action whose body is allowed to throw checked exceptions.
     *
     * <p>Instances are supplied as lambdas and method references (for example the client
     * unpin/close actions in this file), so the interface is declared as a functional interface
     * to make the single-abstract-method contract compiler-checked.
     */
    @FunctionalInterface
    public interface ThrowingRunnable {
        /**
         * Runs the action.
         *
         * @throws Exception if the action fails
         */
        void run() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords$WriteRecordsDoFn.class */
    public static class WriteRecordsDoFn<DestinationT, ElementT> extends DoFn<KV<DestinationT, StorageApiWritePayload>, KV<String, String>> {
        // Output tags; all three are supplied through the constructor.
        private final TupleTag<KV<String, String>> finalizeTag;
        private final TupleTag<BigQueryStorageApiInsertError> failedRowsTag;
        // May be null: process() only requests a successful-rows receiver when this tag is non-null.
        private final TupleTag<TableRow> successfulRowsTag;
        private final boolean autoUpdateSchema;
        private final boolean ignoreUnknownValues;
        // Consulted in createDestinationState() to obtain the message converter for a destination.
        private final TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
        // Transient; lazily populated by initializeDatasetService().
        private transient BigQueryServices.DatasetService maybeDatasetService;
        // Thresholds compared against the pending byte/record counters in shouldFlush().
        private final int flushThresholdBytes;
        private final int flushThresholdCount;
        private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
        private final BigQueryServices bqServices;
        private final boolean useDefaultStream;
        // Passed to every DestinationState, which uses it as the exclusive upper bound when picking
        // its random client number.
        private int streamAppendClientCount;
        // Incremented by flushIfNecessary() each time a threshold-triggered flush happens.
        private final Counter forcedFlushes = Metrics.counter(WriteRecordsDoFn.class, "forcedFlushes");
        // Per-destination write state; replaced with a fresh map in startBundle() and filled in process().
        private Map<DestinationT, WriteRecordsDoFn<DestinationT, ElementT>.DestinationState> destinations = Maps.newHashMap();
        // Counters of buffered work; incremented in process() and reset in startBundle() and flushAll().
        private int numPendingRecords = 0;
        private int numPendingRecordBytes = 0;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords$WriteRecordsDoFn$AppendRowsContext.class */
        /**
         * Retry context for a single append operation: the rows being appended, the timestamp of
         * each row (parallel to the rows by index), the stream offset the append targets, and the
         * number of failed attempts recorded so far.
         */
        public static class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
            /** Offset passed to {@code appendRows}; rewritten by the flush error handler. */
            long offset;
            /** Serialized rows of this append; may be replaced when failed rows are dropped. */
            ProtoRows protoRows;
            /** Element timestamps, indexed like the serialized rows. */
            List<Instant> timestamps;
            /** Number of failed attempts counted by the flush error handler. */
            int failureCount = 0;

            /**
             * @param j the stream offset for this append
             * @param protoRows the serialized rows to append
             * @param list the timestamps of the rows, in row order
             */
            public AppendRowsContext(long j, ProtoRows protoRows, List<Instant> list) {
                this.timestamps = list;
                this.protoRows = protoRows;
                this.offset = j;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.class */
        public class DestinationState {
            private final String tableUrn;
            private transient BigQueryServices.DatasetService maybeDatasetService;
            private final boolean useDefaultStream;
            // Schema reported by the message converter at construction time; used unless a newer
            // schema is found on the write stream (see getCurrentTableSchema).
            private TableSchema initialTableSchema;
            // Randomly chosen in the constructor; part of the cache key for default-stream clients.
            private final int clientNumber;
            private final boolean usingMultiplexing;
            // Batches whose serialized size reaches this value are routed to the failed-rows output.
            private final long maxRequestSize;
            // Empty until getOrCreateStreamName() resolves it. The empty string is spelled out here
            // instead of borrowing an unrelated constant from the Spanner change-streams package,
            // because getOrCreateStreamName() relies on the name being empty to trigger creation.
            private String streamName = "";
            // Client currently held by this state; null when none is held.
            private AppendClientInfo appendClientInfo = null;
            // Next offset to use on a non-default stream.
            private long currentOffset = 0;
            private final Counter recordsAppended = Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
            private final Counter appendFailures = Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
            private final Distribution inflightWaitSecondsDistribution = Metrics.distribution(WriteRecordsDoFn.class, "streamWriterWaitSeconds");
            // NOTE(review): this counter is registered under the *sharded* DoFn's class; presumably
            // intentional so both write paths share one metric -- confirm.
            private final Counter rowsSentToFailedRowsCollection = Metrics.counter(StorageApiWritesShardedRecords.WriteRecordsDoFn.class, "rowsSentToFailedRowsCollection");
            // Earliest time at which maybeTickleCache() touches the cache entry again.
            private java.time.Instant nextCacheTickle = java.time.Instant.MAX;
            // Buffered serialized rows and their timestamps (parallel lists), drained by flush().
            private List<ByteString> pendingMessages = Lists.newArrayList();
            private List<Instant> pendingTimestamps = Lists.newArrayList();

            public DestinationState(String str, StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter, BigQueryServices.DatasetService datasetService, boolean z, int i, boolean z2, long j) throws Exception {
                this.tableUrn = str;
                this.maybeDatasetService = datasetService;
                this.useDefaultStream = z;
                this.initialTableSchema = messageConverter.getTableSchema();
                this.clientNumber = new Random().nextInt(i);
                this.usingMultiplexing = z2;
                this.maxRequestSize = j;
            }

            /**
             * Releases the client held by this state, if any.
             *
             * <p>Touches the cache entry first (subject to the tickle interval), then hands the
             * client's {@code unpin} to {@code runAsyncIgnoreFailure} with the shared close
             * executor and forgets the client.
             */
            void teardown() {
                maybeTickleCache();
                if (this.appendClientInfo == null) {
                    return;
                }
                BigQueryServices.StreamAppendClient heldClient = this.appendClientInfo.getStreamAppendClient();
                if (heldClient != null) {
                    StorageApiWriteUnshardedRecords.runAsyncIgnoreFailure(StorageApiWriteUnshardedRecords.closeWriterExecutor, heldClient::unpin);
                }
                this.appendClientInfo = null;
            }

            /**
             * Returns the name of the table's default stream: the table URN with its partition
             * decorator stripped, followed by {@code /streams/_default}.
             */
            String getDefaultStreamName() {
                String undecoratedTable = BigQueryHelpers.stripPartitionDecorator(this.tableUrn);
                return undecoratedTable + "/streams/_default";
            }

            /**
             * Returns the key under which this state's client is stored in the shared client cache.
             *
             * <p>For a dedicated stream the key is the stream name. For the default stream the key
             * is the default stream name suffixed with this state's client number.
             */
            String getStreamAppendClientCacheEntryKey() {
                if (!this.useDefaultStream) {
                    return this.streamName;
                }
                return getDefaultStreamName() + "-client" + this.clientNumber;
            }

            /**
             * Returns the stream name, resolving it on first use.
             *
             * <p>When the default stream is used the name is computed locally. Otherwise a
             * {@code PENDING} write stream is created through the dataset service and the current
             * offset is reset to zero.
             *
             * @throws RuntimeException wrapping any exception raised while resolving the name
             */
            String getOrCreateStreamName() {
                if (!Strings.isNullOrEmpty(this.streamName)) {
                    return this.streamName;
                }
                try {
                    if (!this.useDefaultStream) {
                        BigQueryServices.DatasetService datasetService = Preconditions.checkStateNotNull(this.maybeDatasetService);
                        WriteStream createdStream = datasetService.createWriteStream(this.tableUrn, WriteStream.Type.PENDING);
                        this.streamName = createdStream.getName();
                        this.currentOffset = 0L;
                    } else {
                        this.streamName = getDefaultStreamName();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                return this.streamName;
            }

            /**
             * Builds a new {@code AppendClientInfo} with an attached stream append client and pins
             * that client once before returning it.
             *
             * <p>The close callback given to {@code AppendClientInfo.of} unpins and closes the
             * client while holding the cache monitor, and is submitted through
             * {@code runAsyncIgnoreFailure} with the shared close executor.
             *
             * @param tableSchema schema to use, or null to look up the current schema for this stream
             * @return the new client info, with its client pinned once
             * @throws Exception if the client cannot be created
             */
            AppendClientInfo generateClient(TableSchema tableSchema) throws Exception {
                TableSchema schemaForClient = tableSchema;
                if (schemaForClient == null) {
                    schemaForClient = getCurrentTableSchema(this.streamName);
                }
                AppendClientInfo clientInfo = AppendClientInfo.of(schemaForClient, client ->
                        StorageApiWriteUnshardedRecords.runAsyncIgnoreFailure(StorageApiWriteUnshardedRecords.closeWriterExecutor, () -> {
                            synchronized (StorageApiWriteUnshardedRecords.APPEND_CLIENTS) {
                                client.unpin();
                                client.close();
                            }
                        })).withAppendClient(Preconditions.checkStateNotNull(this.maybeDatasetService), () -> this.streamName, this.usingMultiplexing);
                Preconditions.checkStateNotNull(clientInfo.getStreamAppendClient()).pin();
                return clientInfo;
            }

            /**
             * Returns the schema to use for the given stream.
             *
             * <p>Without schema auto-update this is always the initial schema. With auto-update the
             * write stream is looked up and, if it exists and carries a table schema, that schema
             * is returned instead.
             *
             * @param str name of the write stream to inspect; this parameter is now honoured
             *     (previously the method ignored it and always read the {@code streamName} field,
             *     which only worked because the sole caller passes that same field)
             * @return the stream's table schema when available, otherwise the initial schema
             */
            TableSchema getCurrentTableSchema(String str) {
                if (!WriteRecordsDoFn.this.autoUpdateSchema) {
                    return this.initialTableSchema;
                }
                WriteStream writeStream = Preconditions.checkStateNotNull(this.maybeDatasetService).getWriteStream(str);
                if (writeStream != null && writeStream.hasTableSchema()) {
                    return writeStream.getTableSchema();
                }
                return this.initialTableSchema;
            }

            /**
             * Returns the client info held by this state, acquiring one first if none is held.
             *
             * <p>Acquisition resolves the stream name and then, under the cache monitor, either
             * loads the entry through the shared cache (creating it on a miss) or creates a fresh
             * client and overwrites the cache entry. The acquired client is pinned once more here,
             * and the next cache tickle is scheduled one minute ahead.
             *
             * @param z true to go through the cache; false to create a client and replace the entry
             * @param tableSchema schema forwarded to {@code generateClient}; may be null
             * @return the held client info, never null
             * @throws RuntimeException wrapping any exception raised during acquisition
             */
            AppendClientInfo getAppendClientInfo(boolean z, TableSchema tableSchema) {
                try {
                    if (this.appendClientInfo == null) {
                        getOrCreateStreamName();
                        AppendClientInfo acquired;
                        synchronized (StorageApiWriteUnshardedRecords.APPEND_CLIENTS) {
                            String cacheKey = getStreamAppendClientCacheEntryKey();
                            if (!z) {
                                acquired = generateClient(tableSchema);
                                StorageApiWriteUnshardedRecords.APPEND_CLIENTS.put(cacheKey, acquired);
                            } else {
                                acquired = StorageApiWriteUnshardedRecords.APPEND_CLIENTS.get(cacheKey, () -> generateClient(tableSchema));
                            }
                            Preconditions.checkStateNotNull(acquired.getStreamAppendClient()).pin();
                        }
                        this.nextCacheTickle = java.time.Instant.now().plus(Duration.ofMinutes(1L));
                        this.appendClientInfo = acquired;
                    }
                    return Preconditions.checkStateNotNull(this.appendClientInfo);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }

            /**
             * Reads this state's cache entry at most once per minute while a client is held.
             *
             * <p>Does nothing when no client is held or when the scheduled tickle time has not
             * passed yet. Otherwise the entry is looked up under the cache monitor (the result is
             * discarded) and the next tickle is scheduled one minute ahead.
             */
            void maybeTickleCache() {
                boolean tickleDue = this.appendClientInfo != null && java.time.Instant.now().isAfter(this.nextCacheTickle);
                if (tickleDue) {
                    String cacheKey;
                    synchronized (StorageApiWriteUnshardedRecords.APPEND_CLIENTS) {
                        cacheKey = getStreamAppendClientCacheEntryKey();
                        StorageApiWriteUnshardedRecords.APPEND_CLIENTS.getIfPresent(cacheKey);
                    }
                    this.nextCacheTickle = java.time.Instant.now().plus(Duration.ofMinutes(1L));
                }
            }

            /**
             * Drops the client held by this state and, if the shared cache still maps this state's
             * key to that very same client info, invalidates the cache entry as well.
             *
             * <p>The identity test is a plain reference comparison. The previous comparison of
             * {@code System.identityHashCode} values could treat two distinct objects as identical,
             * because identity hash codes are not guaranteed to be unique, and would then evict a
             * cache entry owned by someone else.
             */
            @SuppressWarnings("ReferenceEquality") // Object identity is exactly what is being tested.
            void invalidateWriteStream() {
                if (this.appendClientInfo == null) {
                    return;
                }
                synchronized (StorageApiWriteUnshardedRecords.APPEND_CLIENTS) {
                    BigQueryServices.StreamAppendClient streamAppendClient = this.appendClientInfo.getStreamAppendClient();
                    if (streamAppendClient != null) {
                        // Release the pin taken when this state acquired the client.
                        StorageApiWriteUnshardedRecords.runAsyncIgnoreFailure(StorageApiWriteUnshardedRecords.closeWriterExecutor, streamAppendClient::unpin);
                    }
                    String cacheKey = getStreamAppendClientCacheEntryKey();
                    AppendClientInfo cachedClientInfo = StorageApiWriteUnshardedRecords.APPEND_CLIENTS.getIfPresent(cacheKey);
                    // Only evict the entry when it is the instance this state holds; a different
                    // instance under the same key was put there by someone else.
                    if (cachedClientInfo == this.appendClientInfo) {
                        StorageApiWriteUnshardedRecords.APPEND_CLIENTS.invalidate(cacheKey);
                    }
                }
                this.appendClientInfo = null;
            }

            /**
             * Buffers one row for the next flush.
             *
             * <p>With schema auto-update enabled, any unknown fields on the payload are encoded
             * against the current client's schema and appended to the row bytes. If that encoding
             * fails with a schema conversion error, the row is emitted to the failed-rows receiver
             * instead of being buffered, and is now also counted in
             * {@code rowsSentToFailedRowsCollection} (previously this path skipped the counter,
             * unlike the failure path in {@code flush}).
             *
             * @param storageApiWritePayload the row payload
             * @param instant fallback timestamp used when the payload carries none
             * @param outputReceiver receiver for rows that cannot be converted
             * @throws Exception if converting a failed row back to a {@code TableRow} fails
             */
            void addMessage(StorageApiWritePayload storageApiWritePayload, Instant instant, DoFn.OutputReceiver<BigQueryStorageApiInsertError> outputReceiver) throws Exception {
                maybeTickleCache();
                ByteString rowBytes = ByteString.copyFrom(storageApiWritePayload.getPayload());
                Instant payloadTimestamp = storageApiWritePayload.getTimestamp();
                Instant rowTimestamp = payloadTimestamp != null ? payloadTimestamp : instant;
                if (WriteRecordsDoFn.this.autoUpdateSchema) {
                    // Returns the held client, acquiring (and storing) one if none is held yet.
                    AppendClientInfo clientInfo = getAppendClientInfo(true, null);
                    TableRow unknownFields = storageApiWritePayload.getUnknownFields();
                    if (unknownFields != null) {
                        try {
                            rowBytes = rowBytes.concat(clientInfo.encodeUnknownFields(unknownFields, WriteRecordsDoFn.this.ignoreUnknownValues));
                        } catch (TableRowToStorageApiProto.SchemaConversionException e) {
                            // rowBytes still holds the payload without the unknown fields here.
                            TableRow failedRow = clientInfo.toTableRow(rowBytes);
                            this.rowsSentToFailedRowsCollection.inc();
                            outputReceiver.outputWithTimestamp(new BigQueryStorageApiInsertError(failedRow, e.toString()), rowTimestamp);
                            return;
                        }
                    }
                }
                this.pendingMessages.add(rowBytes);
                this.pendingTimestamps.add(rowTimestamp);
            }

            /**
             * Drains the buffered rows into a single append operation registered on the given
             * retry manager.
             *
             * <p>If the serialized batch reaches {@code maxRequestSize}, no append is registered:
             * every row is emitted to the failed-rows receiver, counted in
             * {@code rowsSentToFailedRowsCollection}, and 0 is returned. Otherwise an operation is
             * registered with three callbacks: the append itself, an error handler, and a success
             * handler that counts the rows and optionally emits them to the successful-rows
             * receiver.
             *
             * @param retryManager manager the append operation is added to; it is not run here
             * @param outputReceiver receiver for rows that fail permanently
             * @param outputReceiver2 receiver for successfully written rows; may be null
             * @return the number of rows handed to the retry manager (0 if nothing was buffered or
             *     the batch was rejected as too large)
             * @throws Exception if an oversized row cannot be converted back to a {@code TableRow}
             */
            long flush(RetryManager<AppendRowsResponse, AppendRowsContext> retryManager, DoFn.OutputReceiver<BigQueryStorageApiInsertError> outputReceiver, DoFn.OutputReceiver<TableRow> outputReceiver2) throws Exception {
                if (this.pendingMessages.isEmpty()) {
                    return 0L;
                }
                // Snapshot the buffers and reset them so new rows can accumulate immediately.
                ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
                rowsBuilder.addAllSerializedRows(this.pendingMessages);
                this.pendingMessages.clear();
                ProtoRows rows = rowsBuilder.build();
                List<Instant> timestamps = this.pendingTimestamps;
                this.pendingTimestamps = Lists.newArrayList();
                int rowCount = rows.getSerializedRowsCount();

                if (rows.getSerializedSize() >= this.maxRequestSize) {
                    if (rowCount > 1) {
                        StorageApiWriteUnshardedRecords.LOG.error("A request containing more than one row is over the request size limit of " + this.maxRequestSize + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection.");
                    }
                    for (int i = 0; i < rowCount; i++) {
                        TableRow failedRow = TableRowToStorageApiProto.tableRowFromMessage(DynamicMessage.parseFrom(getAppendClientInfo(true, null).getDescriptor(), rows.getSerializedRows(i)));
                        outputReceiver.outputWithTimestamp(new BigQueryStorageApiInsertError(failedRow, "Row payload too large. Maximum size " + this.maxRequestSize), timestamps.get(i));
                    }
                    // Keep the failed-rows metric in step with what was actually emitted.
                    this.rowsSentToFailedRowsCollection.inc(rowCount);
                    return 0L;
                }

                // -1 is passed as the offset when the default stream is in use.
                long offset = -1;
                if (!this.useDefaultStream) {
                    getOrCreateStreamName();
                    offset = this.currentOffset;
                    this.currentOffset += rowCount;
                }

                retryManager.addOperation(context -> {
                    // All rows may have been dropped by the error handler; nothing left to send.
                    if (context.protoRows.getSerializedRowsCount() == 0) {
                        return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
                    }
                    try {
                        BigQueryServices.StreamAppendClient streamAppendClient = Preconditions.checkStateNotNull(getAppendClientInfo(true, null).getStreamAppendClient());
                        ApiFuture<AppendRowsResponse> response = streamAppendClient.appendRows(context.offset, context.protoRows);
                        this.inflightWaitSecondsDistribution.update(streamAppendClient.getInflightWaitSeconds());
                        if (!this.usingMultiplexing && streamAppendClient.getInflightWaitSeconds() > 5) {
                            StorageApiWriteUnshardedRecords.LOG.warn("Storage Api write delay more than {} seconds.", streamAppendClient.getInflightWaitSeconds());
                        }
                        return response;
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }, failedContexts -> {
                    AppendRowsContext failedContext = Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null));
                    Throwable error = failedContext.getError();
                    if (error instanceof Exceptions.AppendSerializtionError) {
                        return handleSerializationFailure(failedContexts, failedContext, (Exceptions.AppendSerializtionError) error, outputReceiver);
                    }
                    return handleAppendFailure(failedContexts, failedContext);
                }, context -> {
                    this.recordsAppended.inc(context.protoRows.getSerializedRowsCount());
                    if (outputReceiver2 == null) {
                        return;
                    }
                    for (int i = 0; i < context.protoRows.getSerializedRowsCount(); i++) {
                        try {
                            outputReceiver2.outputWithTimestamp(TableRowToStorageApiProto.tableRowFromMessage(DynamicMessage.parseFrom(Preconditions.checkStateNotNull(this.appendClientInfo).getDescriptor(), context.protoRows.getSerializedRowsList().get(i))), context.timestamps.get(i));
                        } catch (InvalidProtocolBufferException e) {
                            StorageApiWriteUnshardedRecords.LOG.warn("Failure parsing TableRow: " + e);
                        }
                    }
                }, new AppendRowsContext(offset, rows, timestamps));
                maybeTickleCache();
                return rowCount;
            }

            /**
             * Handles an append failure that is not a per-row serialization error: logs it, counts
             * the attempt, aborts on unrecoverable conditions, and otherwise invalidates the held
             * client so the retry acquires a new one.
             *
             * @param failedContexts all contexts reported as failed (used for the log message)
             * @param failedContext the first failed context, whose error drives the decision
             * @return {@code RETRY_ALL_OPERATIONS} when the append should be retried
             * @throws RuntimeException after more than 5 failures, on an invalid offset
             *     (OUT_OF_RANGE / ALREADY_EXISTS), or when the stream is finalized or missing; the
             *     underlying error is attached as the cause
             */
            private RetryManager.RetryType handleAppendFailure(Iterable<AppendRowsContext> failedContexts, AppendRowsContext failedContext) {
                Throwable error = failedContext.getError();
                StorageApiWriteUnshardedRecords.LOG.warn("Append to stream {} by client #{} failed with error, operations will be retried.\n{}", this.streamName, this.clientNumber, retrieveErrorDetails(failedContexts));
                failedContext.failureCount++;
                if (failedContext.failureCount > 5) {
                    throw new RuntimeException("More than 5 attempts to call AppendRows failed.", error);
                }
                Status.Code code = Status.fromThrowable(Preconditions.checkStateNotNull(error)).getCode();
                if (code.equals(Status.Code.OUT_OF_RANGE) || code.equals(Status.Code.ALREADY_EXISTS)) {
                    throw new RuntimeException("Append to stream " + this.streamName + " failed with invalid offset of " + failedContext.offset, error);
                }
                if ((error instanceof Exceptions.StreamFinalizedException) || code.equals(Status.Code.INVALID_ARGUMENT) || code.equals(Status.Code.NOT_FOUND) || code.equals(Status.Code.FAILED_PRECONDITION)) {
                    throw new RuntimeException("Append to stream " + this.streamName + " failed with stream doesn't exist", error);
                }
                invalidateWriteStream();
                this.appendFailures.inc();
                return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
            }

            /**
             * Handles a per-row serialization error: emits each offending row to the failed-rows
             * receiver, removes those rows from the failed context, and renumbers the offsets of
             * all failed contexts so the remaining rows are retried contiguously.
             *
             * @param failedContexts all contexts reported as failed; their offsets are rewritten
             * @param failedContext the first failed context, which carries the offending rows
             * @param error the serialization error with its row-index-to-message map
             * @param outputReceiver receiver for the offending rows
             * @return always {@code RETRY_ALL_OPERATIONS}
             */
            private RetryManager.RetryType handleSerializationFailure(Iterable<AppendRowsContext> failedContexts, AppendRowsContext failedContext, Exceptions.AppendSerializtionError error, DoFn.OutputReceiver<BigQueryStorageApiInsertError> outputReceiver) {
                Map<Integer, String> rowErrors = error.getRowIndexToErrorMessage();
                Set<Integer> failedRowIndices = rowErrors.keySet();
                for (int failedIndex : failedRowIndices) {
                    try {
                        TableRow failedRow = TableRowToStorageApiProto.tableRowFromMessage(DynamicMessage.parseFrom(Preconditions.checkStateNotNull(this.appendClientInfo).getDescriptor(), failedContext.protoRows.getSerializedRows(failedIndex)));
                        outputReceiver.outputWithTimestamp(new BigQueryStorageApiInsertError(failedRow, rowErrors.get(failedIndex)), failedContext.timestamps.get(failedIndex));
                    } catch (InvalidProtocolBufferException e) {
                        // Log the cause too; the row itself cannot be reported.
                        StorageApiWriteUnshardedRecords.LOG.error("Failed to insert row and could not parse the result!", e);
                    }
                }
                this.rowsSentToFailedRowsCollection.inc(failedRowIndices.size());

                // Rebuild the batch without the offending rows.
                ProtoRows.Builder retryRows = ProtoRows.newBuilder();
                List<Instant> retryTimestamps = Lists.newArrayList();
                for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); i++) {
                    if (!failedRowIndices.contains(i)) {
                        retryRows.addSerializedRows(failedContext.protoRows.getSerializedRows(i));
                        retryTimestamps.add(failedContext.timestamps.get(i));
                    }
                }
                failedContext.protoRows = retryRows.build();
                failedContext.timestamps = retryTimestamps;

                // Rows were removed, so later contexts must start at lower offsets.
                long nextOffset = failedContext.offset;
                for (AppendRowsContext context : failedContexts) {
                    context.offset = nextOffset;
                    nextOffset += context.protoRows.getSerializedRowsCount();
                }
                this.currentOffset = nextOffset;
                return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
            }

            /**
             * Renders the errors of the given contexts as text for logging.
             *
             * <p>Contexts without an error are skipped. Each remaining error contributes its
             * {@code toString()} followed by its stack frames, one per line; entries are separated
             * by a newline.
             *
             * @param iterable the contexts whose errors should be rendered
             * @return the rendered text; empty when no context carries an error
             */
            String retrieveErrorDetails(Iterable<AppendRowsContext> iterable) {
                List<String> renderedErrors = new ArrayList<>();
                for (AppendRowsContext context : iterable) {
                    Throwable error = context.getError();
                    if (error == null) {
                        continue;
                    }
                    List<String> frames = new ArrayList<>();
                    for (StackTraceElement frame : error.getStackTrace()) {
                        frames.add(frame.toString());
                    }
                    renderedErrors.add(error.toString() + "\n" + String.join("\n", frames));
                }
                return String.join("\n", renderedErrors);
            }

            /**
             * Switches to a client built for the updated schema, if the current client reports one.
             *
             * <p>Does nothing when no client is held, when the held info has no stream client, or
             * when the client reports no updated schema. Otherwise the held client is invalidated
             * and a new one is created for the updated schema, bypassing the cache lookup.
             */
            void postFlush() {
                if (this.appendClientInfo == null) {
                    return;
                }
                BigQueryServices.StreamAppendClient currentClient = this.appendClientInfo.getStreamAppendClient();
                if (currentClient == null) {
                    return;
                }
                TableSchema newerSchema = currentClient.getUpdatedSchema();
                if (newerSchema == null) {
                    return;
                }
                invalidateWriteStream();
                this.appendClientInfo = Preconditions.checkStateNotNull(getAppendClientInfo(false, newerSchema));
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates the DoFn.
         *
         * @param str argument forwarded to the {@code TwoLevelMessageConverterCache} constructor
         * @param storageApiDynamicDestinations destination resolver
         * @param bigQueryServices factory for the dataset service
         * @param z whether the default stream is used
         * @param i byte threshold that triggers a flush
         * @param i2 record-count threshold that triggers a flush
         * @param i3 number of stream append clients to spread destinations over
         * @param tupleTag tag stored as {@code finalizeTag}
         * @param tupleTag2 tag of the failed-rows output
         * @param tupleTag3 tag of the successful-rows output; may be null
         * @param z2 whether schema auto-update is enabled
         * @param z3 whether unknown values are ignored
         */
        public WriteRecordsDoFn(String str, StorageApiDynamicDestinations<ElementT, DestinationT> storageApiDynamicDestinations, BigQueryServices bigQueryServices, boolean z, int i, int i2, int i3, TupleTag<KV<String, String>> tupleTag, TupleTag<BigQueryStorageApiInsertError> tupleTag2, TupleTag<TableRow> tupleTag3, boolean z2, boolean z3) {
            // Collaborators.
            this.dynamicDestinations = storageApiDynamicDestinations;
            this.bqServices = bigQueryServices;
            this.messageConverters = new TwoLevelMessageConverterCache<>(str);
            // Output tags.
            this.finalizeTag = tupleTag;
            this.failedRowsTag = tupleTag2;
            this.successfulRowsTag = tupleTag3;
            // Write behaviour.
            this.useDefaultStream = z;
            this.autoUpdateSchema = z2;
            this.ignoreUnknownValues = z3;
            // Flush thresholds and client fan-out.
            this.flushThresholdBytes = i;
            this.flushThresholdCount = i2;
            this.streamAppendClientCount = i3;
        }

        /**
         * Reports whether the buffered work exceeds either flush threshold.
         *
         * @return true when the pending record count or the pending byte count is strictly above
         *     its threshold
         */
        boolean shouldFlush() {
            if (this.numPendingRecords > this.flushThresholdCount) {
                return true;
            }
            return this.numPendingRecordBytes > this.flushThresholdBytes;
        }

        /**
         * Flushes every destination when a flush threshold has been exceeded, counting the event
         * in the {@code forcedFlushes} metric.
         *
         * @param outputReceiver receiver for failed rows
         * @param outputReceiver2 receiver for successful rows; may be null
         * @throws Exception if the flush fails
         */
        void flushIfNecessary(DoFn.OutputReceiver<BigQueryStorageApiInsertError> outputReceiver, DoFn.OutputReceiver<TableRow> outputReceiver2) throws Exception {
            if (!shouldFlush()) {
                return;
            }
            this.forcedFlushes.inc();
            flushAll(outputReceiver, outputReceiver2);
        }

        /**
         * Flushes the buffered rows of every destination and resets the pending counters.
         *
         * <p>Each destination gets its own retry manager, which is started without waiting. If
         * any rows were handed to a retry manager, all managers are then awaited in creation
         * order. Finally every destination runs its post-flush step.
         *
         * <p>The collections are fully typed here; the previous raw {@code Map}/{@code Iterator}
         * casts discarded the element types that the loops depend on.
         *
         * @param outputReceiver receiver for failed rows
         * @param outputReceiver2 receiver for successful rows; may be null
         * @throws Exception if flushing or awaiting an append fails
         */
        void flushAll(DoFn.OutputReceiver<BigQueryStorageApiInsertError> outputReceiver, DoFn.OutputReceiver<TableRow> outputReceiver2) throws Exception {
            Map<DestinationT, WriteRecordsDoFn<DestinationT, ElementT>.DestinationState> states = Preconditions.checkStateNotNull(this.destinations);
            List<RetryManager<AppendRowsResponse, AppendRowsContext>> retryManagers = Lists.newArrayListWithCapacity(states.size());
            long rowsQueued = 0;
            for (DestinationState destinationState : states.values()) {
                RetryManager<AppendRowsResponse, AppendRowsContext> retryManager = new RetryManager<>(org.joda.time.Duration.standardSeconds(1L), org.joda.time.Duration.standardSeconds(10L), 1000);
                retryManagers.add(retryManager);
                rowsQueued += destinationState.flush(retryManager, outputReceiver, outputReceiver2);
                // Start the append without blocking so destinations proceed concurrently.
                retryManager.run(false);
            }
            if (rowsQueued > 0) {
                for (RetryManager<AppendRowsResponse, AppendRowsContext> retryManager : retryManagers) {
                    retryManager.await();
                }
            }
            for (DestinationState destinationState : states.values()) {
                destinationState.postFlush();
            }
            this.numPendingRecords = 0;
            this.numPendingRecordBytes = 0;
        }

        /**
         * Returns the dataset service, creating it from the pipeline options on first use and
         * remembering it for later calls.
         *
         * @param pipelineOptions options viewed as {@code BigQueryOptions} when the service is created
         * @return the dataset service
         */
        private BigQueryServices.DatasetService initializeDatasetService(PipelineOptions pipelineOptions) {
            BigQueryServices.DatasetService service = this.maybeDatasetService;
            if (service == null) {
                BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
                service = this.bqServices.getDatasetService(bigQueryOptions);
                this.maybeDatasetService = service;
            }
            return service;
        }

        /**
         * Starts a bundle with zeroed pending counters and an empty destination map.
         *
         * @throws IOException declared by the signature; nothing in the body throws it
         */
        @DoFn.StartBundle
        public void startBundle() throws IOException {
            this.numPendingRecordBytes = 0;
            this.numPendingRecords = 0;
            this.destinations = Maps.newHashMap();
        }

        /**
         * Builds the write state for a destination that has not been seen in this bundle yet.
         *
         * @param processContext the current process context (not used by this method)
         * @param destinationt the destination to create state for
         * @param datasetService dataset service handed to the converter lookup and the new state
         * @param bigQueryOptions options supplying the table URN, the connection-pool flag and the
         *     maximum request size
         * @return the new destination state
         * @throws IllegalArgumentException if the dynamic destinations return no table
         * @throws RuntimeException wrapping any exception raised while building the state
         */
        WriteRecordsDoFn<DestinationT, ElementT>.DestinationState createDestinationState(DoFn<KV<DestinationT, StorageApiWritePayload>, KV<String, String>>.ProcessContext processContext, DestinationT destinationt, BigQueryServices.DatasetService datasetService, BigQueryOptions bigQueryOptions) {
            TableDestination table = this.dynamicDestinations.getTable(destinationt);
            boolean tableResolved = table != null;
            org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(tableResolved, "DynamicDestinations.getTable() may not return null, but %s returned null for destination %s", this.dynamicDestinations, destinationt);
            try {
                String tableUrn = table.getTableUrn(bigQueryOptions);
                StorageApiDynamicDestinations.MessageConverter<ElementT> converter = this.messageConverters.get(destinationt, this.dynamicDestinations, datasetService);
                boolean usesConnectionPool = bigQueryOptions.getUseStorageApiConnectionPool();
                long maxRequestSize = bigQueryOptions.getStorageWriteApiMaxRequestSize();
                return new DestinationState(tableUrn, converter, datasetService, this.useDefaultStream, this.streamAppendClientCount, usesConnectionPool, maxRequestSize);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Buffers one element into the state of its destination.
         *
         * <p>The destination state is created on first sight of a key. Before the new payload is
         * added, {@code flushIfNecessary} is given the chance to flush what is already pending;
         * afterwards the pending record and byte counters are advanced.
         *
         * @param processContext context used to wire side-input access into {@code dynamicDestinations}
         * @param pipelineOptions options for this pipeline, viewed as {@link BigQueryOptions} where needed
         * @param kv the destination key and the payload to write
         * @param instant the element timestamp, forwarded with the payload
         * @param multiOutputReceiver source of the failed-rows and (optional) successful-rows receivers
         */
        @DoFn.ProcessElement
        public void process(DoFn<KV<DestinationT, StorageApiWritePayload>, KV<String, String>>.ProcessContext processContext, PipelineOptions pipelineOptions, @DoFn.Element KV<DestinationT, StorageApiWritePayload> kv, @DoFn.Timestamp Instant instant, DoFn.MultiOutputReceiver multiOutputReceiver) throws Exception {
            BigQueryServices.DatasetService datasetService = initializeDatasetService(pipelineOptions);
            this.dynamicDestinations.setSideInputAccessorFromProcessContext(processContext);
            Preconditions.checkStateNotNull(this.destinations);
            DestinationState state = (DestinationState) this.destinations.computeIfAbsent(
                    kv.getKey(),
                    destination -> createDestinationState(
                            processContext,
                            destination,
                            datasetService,
                            (BigQueryOptions) pipelineOptions.as(BigQueryOptions.class)));

            DoFn.OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver = multiOutputReceiver.get(this.failedRowsTag);
            // The successful-rows output is optional; a null receiver is passed when no tag is set.
            DoFn.OutputReceiver<TableRow> successfulRowsReceiver = null;
            if (this.successfulRowsTag != null) {
                successfulRowsReceiver = multiOutputReceiver.get(this.successfulRowsTag);
            }
            flushIfNecessary(failedRowsReceiver, successfulRowsReceiver);

            state.addMessage((StorageApiWritePayload) kv.getValue(), instant, failedRowsReceiver);
            this.numPendingRecords++;
            this.numPendingRecordBytes += ((StorageApiWritePayload) kv.getValue()).getPayload().length;
        }

        /**
         * Flushes everything still pending and releases all per-bundle destination state.
         *
         * <p>Both receivers handed to {@code flushAll} emit through {@code finishBundleContext} in
         * the global window; their untimestamped {@code output} uses the global window's maximum
         * timestamp. The successful-rows receiver is null when no successful-rows tag is set.
         *
         * <p>After the flush, when the default stream is not in use, each destination that has a
         * non-empty stream name emits a {@code (tableUrn, streamName)} pair to {@code finalizeTag}.
         * Every destination state is then torn down, the map is cleared and the field is reset to
         * null.
         *
         * @param finishBundleContext context used to emit failed rows, successful rows and
         *     finalize requests
         * @throws Exception if flushing or emitting fails
         */
        @DoFn.FinishBundle
        public void finishBundle(final DoFn<KV<DestinationT, StorageApiWritePayload>, KV<String, String>>.FinishBundleContext finishBundleContext) throws Exception {
            final DoFn.OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver = new DoFn.OutputReceiver<BigQueryStorageApiInsertError>() {
                @Override
                public void output(BigQueryStorageApiInsertError bigQueryStorageApiInsertError) {
                    outputWithTimestamp(bigQueryStorageApiInsertError, GlobalWindow.INSTANCE.maxTimestamp());
                }

                @Override
                public void outputWithTimestamp(BigQueryStorageApiInsertError bigQueryStorageApiInsertError, Instant instant) {
                    finishBundleContext.output(WriteRecordsDoFn.this.failedRowsTag, bigQueryStorageApiInsertError, instant, GlobalWindow.INSTANCE);
                }
            };
            final DoFn.OutputReceiver<TableRow> successfulRowsReceiver = this.successfulRowsTag != null ? new DoFn.OutputReceiver<TableRow>() {
                @Override
                public void output(TableRow tableRow) {
                    outputWithTimestamp(tableRow, GlobalWindow.INSTANCE.maxTimestamp());
                }

                @Override
                public void outputWithTimestamp(TableRow tableRow, Instant instant) {
                    finishBundleContext.output(WriteRecordsDoFn.this.successfulRowsTag, tableRow, instant, GlobalWindow.INSTANCE);
                }
            } : null;
            flushAll(failedRowsReceiver, successfulRowsReceiver);

            Map<?, ?> bundleDestinations = (Map<?, ?>) Preconditions.checkStateNotNull(this.destinations);
            // Iterate as Object and cast per element: a raw/wildcard map yields Object values, so a
            // DestinationState-typed loop variable would not compile.
            for (Object value : bundleDestinations.values()) {
                DestinationState destinationState = (DestinationState) value;
                if (!this.useDefaultStream && !Strings.isNullOrEmpty(destinationState.streamName)) {
                    finishBundleContext.output(this.finalizeTag, KV.of(destinationState.tableUrn, destinationState.streamName), GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
                }
                destinationState.teardown();
            }
            bundleDestinations.clear();
            this.destinations = null;
        }

        /**
         * Drops the destination map and closes the cached dataset service, if one was created.
         *
         * <p>The service field is reset to null only after a successful close.
         *
         * @throws RuntimeException wrapping any exception raised while closing the service
         */
        @DoFn.Teardown
        public void teardown() {
            this.destinations = null;
            if (this.maybeDatasetService == null) {
                return;
            }
            try {
                this.maybeDatasetService.close();
                this.maybeDatasetService = null;
            } catch (Exception closeFailure) {
                throw new RuntimeException(closeFailure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Invalidates every entry currently held in {@code APPEND_CLIENTS}.
     *
     * <p>NOTE(review): {@code APPEND_CLIENTS} is declared outside this view; presumably a shared
     * cache of append clients — confirm what invalidation does to clients that are still in use.
     */
    public static void clearCache() {
        APPEND_CLIENTS.invalidateAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits a task to the given executor and deliberately ignores any {@link Exception} it throws.
     *
     * <p>A failing task is reported on {@code System.err} (exception summary followed by its stack
     * trace, one frame per line) and is otherwise dropped. The future returned by the executor is
     * discarded, so callers cannot observe completion or failure.
     *
     * @param executorService executor that runs the task
     * @param throwingRunnable the best-effort task to run
     */
    public static void runAsyncIgnoreFailure(ExecutorService executorService, ThrowingRunnable throwingRunnable) {
        executorService.submit(() -> {
            try {
                throwingRunnable.run();
            } catch (Exception failure) {
                String summary = failure.toString();
                String frames = Arrays.stream(failure.getStackTrace())
                        .map(StackTraceElement::toString)
                        .collect(Collectors.joining("\n"));
                System.err.println("Exception happened while executing async task. Ignoring: " + (summary + "\n" + frames));
            }
        });
    }

    /**
     * Creates the transform; every argument is stored as-is for use by {@code expand}.
     *
     * @param storageApiDynamicDestinations resolves destinations and supplies side inputs
     * @param bigQueryServices factory for the BigQuery services used when writing
     * @param tupleTag tag of the failed-rows output
     * @param tupleTag2 tag of the successful-rows output; may be null, in which case that output
     *     is not produced
     * @param coder coder applied to the failed-rows output
     * @param coder2 coder applied to the successful-rows output when it exists
     * @param z stored as {@code autoUpdateSchema}
     * @param z2 stored as {@code ignoreUnknownValues}
     */
    public StorageApiWriteUnshardedRecords(StorageApiDynamicDestinations<ElementT, DestinationT> storageApiDynamicDestinations, BigQueryServices bigQueryServices, TupleTag<BigQueryStorageApiInsertError> tupleTag, TupleTag<TableRow> tupleTag2, Coder<BigQueryStorageApiInsertError> coder, Coder<TableRow> coder2, boolean z, boolean z2) {
        // Collaborators.
        this.bqServices = bigQueryServices;
        this.dynamicDestinations = storageApiDynamicDestinations;

        // Each output tag next to the coder used for it.
        this.failedRowsTag = tupleTag;
        this.failedRowsCoder = coder;
        this.successfulRowsTag = tupleTag2;
        this.successfulRowsCoder = coder2;

        // Behaviour flags.
        this.ignoreUnknownValues = z2;
        this.autoUpdateSchema = z;
    }

    /**
     * Wires the write pipeline: a "Write Records" ParDo over the input, followed by a reshuffle
     * and a "Finalize writes" ParDo over the finalize output.
     *
     * <p>Append thresholds and the append-client count are read from {@link BigQueryOptions}.
     * Coders are set on the finalize output, the failed-rows output and, when its tag is
     * present, the successful-rows output.
     *
     * @param pCollection destination-keyed payloads to write
     * @return the tuple produced by the "Write Records" step
     * @throws IllegalArgumentException if {@code useStorageApiConnectionPool} is enabled
     */
    public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePayload>> pCollection) {
        String operationName = pCollection.getName() + "/" + getName();
        BigQueryOptions options = (BigQueryOptions) pCollection.getPipeline().getOptions().as(BigQueryOptions.class);
        boolean connectionPoolRequested = options.getUseStorageApiConnectionPool().booleanValue();
        org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(
                !connectionPoolRequested,
                "useStorageApiConnectionPool only supported when using STORAGE_API_AT_LEAST_ONCE");

        // Failed rows are always an additional output; successful rows only when a tag was supplied.
        TupleTagList additionalOutputs = this.successfulRowsTag == null
                ? TupleTagList.of(this.failedRowsTag)
                : TupleTagList.of(this.failedRowsTag).and(this.successfulRowsTag);

        // NOTE(review): the literal `false` is the fourth constructor argument; presumably the
        // use-default-stream flag — confirm against the WriteRecordsDoFn constructor.
        PCollectionTuple writeResults = pCollection.apply(
                "Write Records",
                ParDo.of(new WriteRecordsDoFn(
                                operationName,
                                this.dynamicDestinations,
                                this.bqServices,
                                false,
                                options.getStorageApiAppendThresholdBytes().intValue(),
                                options.getStorageApiAppendThresholdRecordCount().intValue(),
                                options.getNumStorageWriteApiStreamAppendClients().intValue(),
                                this.finalizeTag,
                                this.failedRowsTag,
                                this.successfulRowsTag,
                                this.autoUpdateSchema,
                                this.ignoreUnknownValues))
                        .withOutputTags(this.finalizeTag, additionalOutputs)
                        .withSideInputs(this.dynamicDestinations.getSideInputs()));

        writeResults.get(this.finalizeTag)
                .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
                .apply("Reshuffle", Reshuffle.of())
                .apply("Finalize writes", ParDo.of(new StorageApiFinalizeWritesDoFn(this.bqServices)));

        writeResults.get(this.failedRowsTag).setCoder(this.failedRowsCoder);
        if (this.successfulRowsTag != null) {
            writeResults.get(this.successfulRowsTag).setCoder(this.successfulRowsCoder);
        }
        return writeResults;
    }
}
