package com.google.cloud.spring.bigquery.core;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;

/* loaded from: input_file:com/google/cloud/spring/bigquery/core/BigQueryTemplate.class */
/**
 * {@link BigQueryOperations} implementation that loads data into tables of a single, fixed
 * dataset.
 *
 * <p>Two write paths are offered: a load job fed through a {@link TableDataWriteChannel}
 * ({@code writeDataToTable}) and a line-delimited JSON path that appends batches through a
 * {@link BigQueryJsonDataWriter} and then commits the write stream ({@code writeJsonStream}).
 */
public class BigQueryTemplate implements BigQueryOperations {
    private final BigQuery bigQuery;
    private final String datasetName;
    private final TaskScheduler taskScheduler;
    private final BigQueryWriteClient bigQueryWriteClient;
    // Stays null unless setCreateDisposition is called; passed as-is to the write configuration.
    private JobInfo.CreateDisposition createDisposition;
    private static final int DEFAULT_JSON_STREAM_WRITER_BATCH_SIZE = 1000;
    private static final int MIN_JSON_STREAM_WRITER_BATCH_SIZE = 10;
    private final int jsonWriterBatchSize;
    private boolean autoDetectSchema = true;
    private JobInfo.WriteDisposition writeDisposition = JobInfo.WriteDisposition.WRITE_APPEND;
    private Duration jobPollInterval = Duration.ofSeconds(2);
    private final Logger logger = LoggerFactory.getLogger(BigQueryTemplate.class);

    /**
     * Creates a template bound to the dataset named by the {@code "DATASET_NAME"} entry of
     * {@code map}.
     *
     * @param bigQuery client used for load jobs and table management; must not be null
     * @param bigQueryWriteClient client used to commit JSON write streams; must not be null
     * @param map settings map; must contain a {@code String} under {@code "DATASET_NAME"} and may
     *     contain an {@code Integer} under {@code "JSON_WRITER_BATCH_SIZE"} (defaults to 1000)
     * @param taskScheduler scheduler used to poll load jobs; must not be null
     * @throws IllegalArgumentException if any required argument or the dataset name is null
     */
    public BigQueryTemplate(BigQuery bigQuery, BigQueryWriteClient bigQueryWriteClient, Map<String, Object> map, TaskScheduler taskScheduler) {
        // Validate the arguments before dereferencing the settings map so that a missing map
        // is reported with a clear message instead of a bare NullPointerException.
        Assert.notNull(map, "BigQuery settings map must not be null");
        Assert.notNull(bigQuery, "BigQuery client object must not be null.");
        String configuredDataset = (String) map.get("DATASET_NAME");
        Assert.notNull(configuredDataset, "Dataset name must not be null");
        Assert.notNull(taskScheduler, "TaskScheduler must not be null");
        Assert.notNull(bigQueryWriteClient, "BigQueryWriteClient must not be null");
        this.jsonWriterBatchSize = (Integer) map.getOrDefault("JSON_WRITER_BATCH_SIZE", DEFAULT_JSON_STREAM_WRITER_BATCH_SIZE);
        this.bigQuery = bigQuery;
        this.datasetName = configuredDataset;
        this.taskScheduler = taskScheduler;
        this.bigQueryWriteClient = bigQueryWriteClient;
    }

    /**
     * Sets whether load jobs are configured with schema auto-detection (enabled by default).
     *
     * @param z {@code true} to enable auto-detection
     */
    public void setAutoDetectSchema(boolean z) {
        this.autoDetectSchema = z;
    }

    /**
     * Sets the write disposition applied to load jobs (defaults to {@code WRITE_APPEND}).
     *
     * @param writeDisposition the disposition; must not be null
     */
    public void setWriteDisposition(JobInfo.WriteDisposition writeDisposition) {
        Assert.notNull(writeDisposition, "BigQuery write disposition must not be null.");
        this.writeDisposition = writeDisposition;
    }

    /**
     * Sets the create disposition applied to load jobs.
     *
     * @param createDisposition the disposition; must not be null
     */
    public void setCreateDisposition(JobInfo.CreateDisposition createDisposition) {
        Assert.notNull(createDisposition, "BigQuery create disposition must not be null.");
        this.createDisposition = createDisposition;
    }

    /**
     * Sets the interval at which load jobs are polled for completion (defaults to 2 seconds).
     *
     * @param duration the polling interval; must not be null
     */
    public void setJobPollInterval(Duration duration) {
        Assert.notNull(duration, "BigQuery job polling interval must not be null");
        this.jobPollInterval = duration;
    }

    /**
     * Loads the stream into the named table without an explicit schema.
     *
     * @param str name of the target table inside this template's dataset
     * @param inputStream data to load
     * @param formatOptions format of the data in {@code inputStream}
     * @return a future completed with the finished job, or exceptionally if the job fails
     */
    @Override
    public CompletableFuture<Job> writeDataToTable(String str, InputStream inputStream, FormatOptions formatOptions) {
        return writeDataToTable(str, inputStream, formatOptions, null);
    }

    /**
     * Loads the stream into the named table through a BigQuery write channel.
     *
     * @param str name of the target table inside this template's dataset
     * @param inputStream data to load; it is copied in full but not closed by this method's own
     *     code
     * @param formatOptions format of the data in {@code inputStream}
     * @param schema schema to apply to the load, or {@code null} to set none
     * @return a future completed with the finished job, or exceptionally if the job fails
     * @throws BigQueryException if copying the data fails or no job was created by the channel
     */
    @Override
    public CompletableFuture<Job> writeDataToTable(String str, InputStream inputStream, FormatOptions formatOptions, Schema schema) {
        WriteChannelConfiguration.Builder configBuilder = WriteChannelConfiguration.newBuilder(TableId.of(this.datasetName, str))
                .setFormatOptions(formatOptions)
                .setWriteDisposition(this.writeDisposition)
                .setCreateDisposition(this.createDisposition)
                .setAutodetect(this.autoDetectSchema);
        if (schema != null) {
            configBuilder.setSchema(schema);
        }
        TableDataWriteChannel writer = this.bigQuery.writer(configBuilder.build());

        // try-with-resources guarantees the sink is closed even when the copy fails; closing it
        // before getJob() is consulted mirrors the original ordering.
        try (OutputStream sink = Channels.newOutputStream((WritableByteChannel) writer)) {
            StreamUtils.copy(inputStream, sink);
        } catch (IOException e) {
            throw new BigQueryException("Failed to write data to BigQuery tables.", e);
        }

        if (writer.getJob() == null) {
            throw new BigQueryException("Failed to initialize the BigQuery write job.");
        }
        return createJobFuture(writer.getJob());
    }

    /**
     * Ensures the table exists (creating it with {@code schema} if needed) and then streams the
     * line-delimited JSON into it.
     *
     * @param str name of the target table inside this template's dataset
     * @param inputStream line-delimited JSON, one object per line
     * @param schema schema used if the table has to be created
     * @return a future completed with the outcome of the write
     */
    @Override
    public CompletableFuture<WriteApiResponse> writeJsonStream(String str, InputStream inputStream, Schema schema) {
        createTable(str, schema);
        return writeJsonStream(str, inputStream);
    }

    /**
     * Creates the table with a standard table definition unless it already exists.
     *
     * @param str name of the table inside this template's dataset
     * @param schema schema of the table to create
     * @return the newly created table, or {@code null} when the table already existed
     */
    @VisibleForTesting
    public Table createTable(String str, Schema schema) {
        TableId tableId = TableId.of(this.datasetName, str);
        Table existing = this.bigQuery.getTable(tableId);
        if (existing != null && existing.exists()) {
            return null;
        }
        return this.bigQuery.create(TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build());
    }

    /**
     * Streams line-delimited JSON into the named table on a dedicated thread.
     *
     * @param str name of the target table inside this template's dataset
     * @param inputStream line-delimited JSON, one object per line
     * @return a future completed with the write outcome, or exceptionally with the failure;
     *     completing it exceptionally (including from outside) interrupts the worker thread
     */
    @Override
    public CompletableFuture<WriteApiResponse> writeJsonStream(String str, InputStream inputStream) {
        CompletableFuture<WriteApiResponse> responseFuture = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                responseFuture.complete(getWriteApiResponse(str, inputStream));
            } catch (Descriptors.DescriptorValidationException | IOException e) {
                // The specific clause must precede the generic one below, otherwise it is
                // unreachable and the class does not compile.
                responseFuture.completeExceptionally(e);
                this.logger.warn(String.format("Error: %s %n", e.getMessage()), e);
            } catch (Exception e) {
                responseFuture.completeExceptionally(e);
                // Restore the interrupt flag in case the failure was an InterruptedException.
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        responseFuture.whenComplete((response, failure) -> {
            if (failure == null) {
                this.logger.info("Data successfully written");
            } else {
                worker.interrupt();
                this.logger.info("asyncTask interrupted");
            }
        });
        return responseFuture;
    }

    /**
     * Creates the JSON writer for the given table; a seam that tests can override.
     *
     * @param tableName fully qualified target table
     * @return a new writer backed by this template's write client
     */
    @VisibleForTesting
    public BigQueryJsonDataWriter getBigQueryJsonDataWriter(TableName tableName) throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
        return new BigQueryJsonDataWriter(tableName, this.bigQueryWriteClient);
    }

    /**
     * Reads the stream line by line, appends the parsed JSON objects in batches, finalizes the
     * write stream and commits it.
     *
     * @param str name of the target table inside this template's dataset
     * @param inputStream UTF-8 encoded, line-delimited JSON, one object per line
     * @return the response, carrying any stream errors reported by the commit; it is marked
     *     successful when no errors were collected
     * @throws BigQueryException if reading, parsing, appending or committing fails; the original
     *     exception is attached as the cause
     */
    public WriteApiResponse getWriteApiResponse(String str, InputStream inputStream) throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
        WriteApiResponse response = new WriteApiResponse();
        TableName parentTable = TableName.of(this.bigQuery.getOptions().getProjectId(), this.datasetName, str);
        BigQueryJsonDataWriter writer = getBigQueryJsonDataWriter(parentTable);
        try {
            final int batchSize = getBatchSize();
            // Offset of the first row of the next batch within the write stream.
            long offset = 0;
            // JSON is decoded as UTF-8 explicitly rather than with the platform default charset.
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
            JSONArray batch = new JSONArray();
            String line;
            while ((line = reader.readLine()) != null) {
                batch.put(new JSONObject(line));
                if (batch.length() == batchSize) {
                    writer.append(batch, offset);
                    offset += batch.length();
                    batch = new JSONArray();
                }
            }
            // Flush the trailing, partially filled batch.
            if (batch.length() != 0) {
                writer.append(batch, offset);
            }
            writer.finalizeWriteStream();
            BatchCommitWriteStreamsResponse commitResponse = getCommitResponse(parentTable, writer);
            if (!commitResponse.hasCommitTime()) {
                for (StorageError error : commitResponse.getStreamErrorsList()) {
                    response.addError(error);
                }
            }
            // NOTE(review): a commit without a commit time and without stream errors is also
            // reported as successful here - confirm that this is intended.
            if (response.getErrors().isEmpty()) {
                response.setSuccessful(true);
            }
            return response;
        } catch (Exception e) {
            throw new BigQueryException("Failed to append records. \n" + e, e);
        }
    }

    /**
     * Commits the writer's stream against the given table; a seam that tests can override.
     *
     * @param tableName fully qualified target table, used as the commit parent
     * @param bigQueryJsonDataWriter writer whose stream is committed
     * @return the commit response returned by the write client
     */
    @VisibleForTesting
    public BatchCommitWriteStreamsResponse getCommitResponse(TableName tableName, BigQueryJsonDataWriter bigQueryJsonDataWriter) {
        BatchCommitWriteStreamsRequest request = BatchCommitWriteStreamsRequest.newBuilder()
                .setParent(tableName.toString())
                .addWriteStreams(bigQueryJsonDataWriter.getStreamName())
                .build();
        return this.bigQueryWriteClient.batchCommitWriteStreams(request);
    }

    /**
     * Returns the effective batch size: the configured size when it is strictly greater than
     * the minimum (10), otherwise the default (1000).
     */
    private int getBatchSize() {
        return this.jsonWriterBatchSize > MIN_JSON_STREAM_WRITER_BATCH_SIZE ? this.jsonWriterBatchSize : DEFAULT_JSON_STREAM_WRITER_BATCH_SIZE;
    }

    /** Returns the name of the dataset this template writes to. */
    public String getDatasetName() {
        return this.datasetName;
    }

    /** Returns the configured JSON writer batch size, before the minimum check is applied. */
    public int getJsonWriterBatchSize() {
        return this.jsonWriterBatchSize;
    }

    /**
     * Polls the job at the configured interval until it is done and exposes the outcome as a
     * future.
     *
     * @param job the job to track
     * @return a future completed with the reloaded job, or exceptionally with a
     *     {@link BigQueryException} if the job reports an error or polling fails
     */
    private CompletableFuture<Job> createJobFuture(Job job) {
        CompletableFuture<Job> jobFuture = new CompletableFuture<>();
        ScheduledFuture<?> pollTask = this.taskScheduler.scheduleAtFixedRate(() -> {
            try {
                Job latest = job.reload();
                if (JobStatus.State.DONE.equals(latest.getStatus().getState())) {
                    if (latest.getStatus().getError() != null) {
                        jobFuture.completeExceptionally(new BigQueryException(latest.getStatus().getError().getMessage()));
                    } else {
                        jobFuture.complete(latest);
                    }
                }
            } catch (Exception e) {
                // Keep the original exception as the cause so the failure stays diagnosable.
                jobFuture.completeExceptionally(new BigQueryException(e.getMessage(), e));
            }
        }, this.jobPollInterval);
        jobFuture.whenComplete((finishedJob, failure) -> {
            try {
                if (failure != null) {
                    job.cancel();
                }
            } finally {
                // Always stop polling, even if cancelling the remote job throws.
                pollTask.cancel(true);
            }
        });
        return jobFuture;
    }
}
