package com.google.cloud.bigquery.connector.common;

import com.google.cloud.spark.bigquery.repackaged.com.google.api.client.util.Sleeper;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.ApiFuture;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.NanoClock;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.RetryAlgorithm;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.stub.readrows.ApiResultRetryAlgorithm;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Optional;
import com.google.cloud.spark.bigquery.repackaged.com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.class */
public class BigQueryDirectDataWriterHelper {
    private final BigQueryWriteClient writeClient;
    private final String tablePath;
    private final ProtoSchema protoSchema;
    private final RetrySettings retrySettings;
    private final Optional<String> traceId;
    private String writeStreamName;
    private StreamWriter streamWriter;
    private ProtoRows.Builder protoRows;
    final Logger logger = LoggerFactory.getLogger(BigQueryDirectDataWriterHelper.class);
    final long MAX_APPEND_ROWS_REQUEST_SIZE = (long) (StreamWriter.getApiMaxRequestBytes() * 0.95d);
    private long appendRequestRowCount = 0;
    private long appendRequestSizeBytes = 0;
    private long writeStreamRowCount = 0;

    public BigQueryDirectDataWriterHelper(BigQueryClientFactory bigQueryClientFactory, String str, ProtoSchema protoSchema, RetrySettings retrySettings, Optional<String> optional) {
        this.writeClient = bigQueryClientFactory.getBigQueryWriteClient();
        this.tablePath = str;
        this.protoSchema = protoSchema;
        this.retrySettings = retrySettings;
        this.traceId = optional;
        try {
            this.writeStreamName = retryCreateWriteStream();
            this.streamWriter = createStreamWriter(this.writeStreamName);
            this.protoRows = ProtoRows.newBuilder();
        } catch (InterruptedException | ExecutionException e) {
            throw new BigQueryConnectorException("Could not create write-stream after multiple retries", e);
        }
    }

    private String retryCreateWriteStream() throws ExecutionException, InterruptedException {
        return (String) retryCallable(() -> {
            return this.writeClient.createWriteStream(CreateWriteStreamRequest.newBuilder().setParent(this.tablePath).setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build()).build()).getName();
        });
    }

    private <V> V retryCallable(Callable<V> callable) throws ExecutionException, InterruptedException {
        DirectRetryingExecutor directRetryingExecutor = new DirectRetryingExecutor(new RetryAlgorithm(new ApiResultRetryAlgorithm(), new ExponentialRetryAlgorithm(this.retrySettings, NanoClock.getDefaultClock())));
        return directRetryingExecutor.submit(directRetryingExecutor.createFuture(callable)).get();
    }

    private StreamWriter createStreamWriter(String str) {
        try {
            StreamWriter.Builder writerSchema = StreamWriter.newBuilder(str, this.writeClient).setWriterSchema(this.protoSchema);
            if (this.traceId.isPresent()) {
                writerSchema.setTraceId(this.traceId.get());
            }
            return writerSchema.build();
        } catch (IOException e) {
            throw new BigQueryConnectorException("Could not build stream-writer", e);
        }
    }

    public void addRow(ByteString byteString) throws IOException {
        int size = byteString.size();
        if (this.appendRequestSizeBytes + size > this.MAX_APPEND_ROWS_REQUEST_SIZE) {
            if (size > this.MAX_APPEND_ROWS_REQUEST_SIZE) {
                throw new IOException(String.format("A single row of size %d bytes exceeded the maximum of %d bytes for an append-rows-request size", Integer.valueOf(size), Long.valueOf(this.MAX_APPEND_ROWS_REQUEST_SIZE)));
            }
            sendAppendRowsRequest();
        }
        this.protoRows.addSerializedRows(byteString);
        this.appendRequestSizeBytes += size;
        this.appendRequestRowCount++;
    }

    private void sendAppendRowsRequest() throws IOException {
        long j = this.writeStreamRowCount;
        validateAppendRowsResponse(this.streamWriter.append(this.protoRows.build(), j), j);
        clearProtoRows();
        this.writeStreamRowCount += this.appendRequestRowCount;
        this.appendRequestRowCount = 0L;
        this.appendRequestSizeBytes = 0L;
    }

    private void validateAppendRowsResponse(ApiFuture<AppendRowsResponse> apiFuture, long j) throws IOException {
        try {
            AppendRowsResponse appendRowsResponse = apiFuture.get();
            if (appendRowsResponse.hasError()) {
                throw new IOException("Append request failed with error: " + appendRowsResponse.getError().getMessage());
            }
            long value = appendRowsResponse.getAppendResult().getOffset().getValue();
            if (j != value) {
                throw new IOException(String.format("On stream %s append-rows response, offset %d did not match expected offset %d", this.writeStreamName, Long.valueOf(value), Long.valueOf(j)));
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new BigQueryConnectorException("Could not retrieve AppendRowsResponse", e);
        }
    }

    public long commit() throws IOException {
        if (this.protoRows.getSerializedRowsCount() != 0) {
            sendAppendRowsRequest();
        }
        waitBeforeFinalization();
        FinalizeWriteStreamResponse retryFinalizeWriteStream = retryFinalizeWriteStream(FinalizeWriteStreamRequest.newBuilder().setName(this.writeStreamName).build());
        long j = this.writeStreamRowCount;
        long rowCount = retryFinalizeWriteStream.getRowCount();
        if (rowCount != j) {
            throw new IOException(String.format("On stream %s finalization, expected finalized row count %d but received %d", this.writeStreamName, Long.valueOf(j), Long.valueOf(rowCount)));
        }
        this.logger.debug("Write-stream {} finalized with row-count {}", this.writeStreamName, Long.valueOf(rowCount));
        clean();
        return rowCount;
    }

    private FinalizeWriteStreamResponse retryFinalizeWriteStream(FinalizeWriteStreamRequest finalizeWriteStreamRequest) {
        try {
            return (FinalizeWriteStreamResponse) retryCallable(() -> {
                return this.writeClient.finalizeWriteStream(finalizeWriteStreamRequest);
            });
        } catch (InterruptedException | ExecutionException e) {
            throw new BigQueryConnectorException(String.format("Could not finalize stream %s.", this.writeStreamName), e);
        }
    }

    private void waitBeforeFinalization() {
        try {
            Sleeper.DEFAULT.sleep(500L);
        } catch (InterruptedException e) {
            throw new BigQueryConnectorException(String.format("Interrupted while sleeping before finalizing write-stream %s", this.writeStreamName), e);
        }
    }

    public void abort() {
        clean();
        this.protoRows = null;
        this.writeStreamName = null;
    }

    private void clean() {
        clearProtoRows();
        if (this.streamWriter != null) {
            this.streamWriter.close();
        }
    }

    private void clearProtoRows() {
        if (this.protoRows != null) {
            this.protoRows.clear();
        }
    }

    public String getWriteStreamName() {
        return this.writeStreamName;
    }
}
