package com.google.cloud.flink.bigquery.sink.writer;

import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.api.core.ApiFuture;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.protobuf.ByteString;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/flink/bigquery/sink/writer/BaseWriter.class */
/**
 * Base class for sink writers that buffer serialized rows into a {@link ProtoRows} batch and
 * hand the batch to a subclass-defined append call.
 *
 * <p>Rows are accumulated with {@link #addToAppendRequest(ByteString)} while the running size
 * estimate stays within {@link #MAX_APPEND_REQUEST_BYTES}; {@link #append()} then delegates to
 * {@link #sendAppendRequest(ProtoRows)} and resets the buffer. Entries placed on
 * {@link #appendResponseFuturesQueue} are later checked through
 * {@link #validateAppendResponses(boolean)}.
 *
 * @param <IN> type of the records accepted by the writer
 */
public abstract class BaseWriter<IN> implements SinkWriter<IN> {
    protected final Logger logger = LoggerFactory.getLogger(getClass());

    /** Size budget for one append request: 95% of the limit reported by {@link StreamWriter}. */
    private static final long MAX_APPEND_REQUEST_BYTES = (long) (StreamWriter.getApiMaxRequestBytes() * 0.95d);

    /**
     * Fixed number of bytes added to every row's serialized size when estimating request size.
     * NOTE(review): presumably accounts for per-row protobuf framing overhead - confirm.
     */
    private static final int PER_ROW_OVERHEAD_BYTES = 2;

    /** Estimated size, in bytes, of the rows currently buffered in {@link #protoRowsBuilder}. */
    private long appendRequestSizeBytes;
    protected final int subtaskId;
    private final String tablePath;
    private final BigQueryConnectOptions connectOptions;
    private final ProtoSchema protoSchema;
    private final BigQueryProtoSerializer serializer;
    private final ProtoRows.Builder protoRowsBuilder;
    /** Pending append requests, validated in insertion order. */
    final Queue<AppendInfo> appendResponseFuturesQueue;
    /** Created lazily by {@link #createStreamWriter(boolean)} or {@link #createWriteStream}. */
    BigQueryServices.StorageWriteClient writeClient;
    StreamWriter streamWriter;
    String streamName;
    long totalRecordsSeen;
    long totalRecordsWritten;
    Counter numberOfRecordsWrittenToBigQuery;
    Counter numberOfRecordsSeenByWriter;
    Counter numberOfRecordsSeenByWriterSinceCheckpoint;

    /** Immutable holder pairing an append response future with bookkeeping values. */
    public static class AppendInfo {
        private final ApiFuture<AppendRowsResponse> future;
        private final long expectedOffset;
        private final long recordsAppended;

        /**
         * @param future future of the append response
         * @param expectedOffset offset value stored alongside the future
         * @param recordsAppended record count stored alongside the future
         */
        public AppendInfo(ApiFuture<AppendRowsResponse> future, long expectedOffset, long recordsAppended) {
            this.future = future;
            this.expectedOffset = expectedOffset;
            this.recordsAppended = recordsAppended;
        }

        /** @return the append response future */
        public ApiFuture<AppendRowsResponse> getFuture() {
            return this.future;
        }

        /** @return the expected offset given at construction */
        public long getExpectedOffset() {
            return this.expectedOffset;
        }

        /** @return the appended record count given at construction */
        public long getRecordsAppended() {
            return this.recordsAppended;
        }
    }

    /**
     * Creates a writer with an empty row buffer and an empty pending-response queue.
     *
     * @param subtaskId identifier of the subtask, used in log messages
     * @param tablePath table path passed to the write client when creating a write stream
     * @param connectOptions options used to obtain the storage write client
     * @param schemaProvider source of the descriptor converted into the proto schema; also
     *     used to initialize the serializer
     * @param serializer serializer that turns incoming records into proto rows
     */
    public BaseWriter(int subtaskId, String tablePath, BigQueryConnectOptions connectOptions, BigQuerySchemaProvider schemaProvider, BigQueryProtoSerializer serializer) {
        this.subtaskId = subtaskId;
        this.tablePath = tablePath;
        this.connectOptions = connectOptions;
        this.protoSchema = getProtoSchema(schemaProvider);
        this.serializer = serializer;
        this.serializer.init(schemaProvider);
        this.appendRequestSizeBytes = 0L;
        this.appendResponseFuturesQueue = new LinkedList<>();
        this.protoRowsBuilder = ProtoRows.newBuilder();
    }

    /**
     * Sends any buffered rows and then validates every pending append response.
     *
     * @param endOfInput not used by this implementation
     */
    public void flush(boolean endOfInput) {
        if (this.appendRequestSizeBytes > 0) {
            append();
        }
        this.logger.info("Validating all pending append responses in subtask {}", this.subtaskId);
        validateAppendResponses(true);
    }

    /**
     * Clears the buffered rows and pending responses, then closes the stream writer and the
     * write client if they were created.
     */
    public void close() {
        this.logger.debug("Closing writer in subtask {}", this.subtaskId);
        if (this.protoRowsBuilder != null) {
            this.protoRowsBuilder.clear();
        }
        if (this.appendResponseFuturesQueue != null) {
            this.appendResponseFuturesQueue.clear();
        }
        try {
            if (this.streamWriter != null) {
                this.streamWriter.close();
            }
        } finally {
            // Release the client even when closing the stream writer fails.
            if (this.writeClient != null) {
                this.writeClient.close();
            }
        }
    }

    /** Sends the given batch of rows; invoked by {@link #append()}. */
    abstract void sendAppendRequest(ProtoRows protoRows);

    /** Validates one pending append; invoked by {@link #validateAppendResponses(boolean)}. */
    abstract void validateAppendResponse(AppendInfo appendInfo);

    /**
     * Adds a serialized row to the buffered batch and updates the size estimate.
     *
     * @param protoRow serialized row to buffer
     */
    public void addToAppendRequest(ByteString protoRow) {
        this.protoRowsBuilder.addSerializedRows(protoRow);
        this.appendRequestSizeBytes += getProtoRowBytes(protoRow);
    }

    /** Sends the buffered rows via {@link #sendAppendRequest} and resets the buffer. */
    public void append() {
        sendAppendRequest(this.protoRowsBuilder.build());
        this.protoRowsBuilder.clear();
        this.appendRequestSizeBytes = 0L;
    }

    /**
     * Creates the {@link StreamWriter} for {@link #streamName}, obtaining the write client first
     * if needed.
     *
     * @param enableConnectionPool forwarded unchanged as the last argument of the write client's
     *     {@code createStreamWriter}; NOTE(review): presumably toggles connection pooling -
     *     confirm against that API
     * @throws BigQueryConnectorException if an {@link IOException} occurs
     */
    public void createStreamWriter(boolean enableConnectionPool) {
        try {
            ensureWriteClient();
            this.logger.info("Creating BigQuery StreamWriter for write stream {} in subtask {}", this.streamName, this.subtaskId);
            this.streamWriter = this.writeClient.createStreamWriter(this.streamName, this.protoSchema, enableConnectionPool);
        } catch (IOException e) {
            this.logger.error(String.format("Unable to create StreamWriter for stream %s in subtask %d", this.streamName, this.subtaskId), e);
            throw new BigQueryConnectorException("Unable to connect to BigQuery", e);
        }
    }

    /**
     * Creates a write stream of the given type for the table path and stores its name in
     * {@link #streamName}, obtaining the write client first if needed.
     *
     * @param type type of write stream to create
     * @throws BigQueryConnectorException if an {@link IOException} occurs
     */
    public void createWriteStream(WriteStream.Type type) {
        try {
            ensureWriteClient();
            this.logger.info("Creating BigQuery write stream in subtask {}", this.subtaskId);
            this.streamName = this.writeClient.createWriteStream(this.tablePath, type).getName();
        } catch (IOException e) {
            this.logger.error(String.format("Unable to create write stream in subtask %d", this.subtaskId), e);
            throw new BigQueryConnectorException("Unable to connect to BigQuery", e);
        }
    }

    /** Obtains the storage write client on first use; later calls are no-ops. */
    private void ensureWriteClient() throws IOException {
        if (this.writeClient == null) {
            this.writeClient = BigQueryServicesFactory.instance(this.connectOptions).storageWrite();
        }
    }

    /**
     * Tells whether the row can be added to the current batch without the size estimate
     * exceeding {@link #MAX_APPEND_REQUEST_BYTES}.
     *
     * @param protoRow serialized row under consideration
     * @return {@code true} if the row fits in the current append request
     */
    public boolean fitsInAppendRequest(ByteString protoRow) {
        return this.appendRequestSizeBytes + ((long) getProtoRowBytes(protoRow)) <= MAX_APPEND_REQUEST_BYTES;
    }

    /**
     * Serializes a record and rejects it when a single row exceeds the append request budget.
     *
     * @param element record to serialize
     * @return the serialized row
     * @throws BigQuerySerializationException if the row's estimated size is larger than
     *     {@link #MAX_APPEND_REQUEST_BYTES}, or as thrown by the serializer
     */
    public ByteString getProtoRow(IN element) throws BigQuerySerializationException {
        ByteString protoRow = this.serializer.serialize(element);
        int protoRowBytes = getProtoRowBytes(protoRow);
        if (protoRowBytes > MAX_APPEND_REQUEST_BYTES) {
            // SLF4J substitutes "{}" placeholders; printf-style "%d" would be logged verbatim.
            this.logger.error("A single row of size {} bytes exceeded the allowed maximum of {} bytes for an append request", protoRowBytes, MAX_APPEND_REQUEST_BYTES);
            throw new BigQuerySerializationException("Record size exceeds BigQuery append request limit");
        }
        return protoRow;
    }

    /** Converts the schema provider's descriptor into a {@link ProtoSchema}. */
    private static ProtoSchema getProtoSchema(BigQuerySchemaProvider schemaProvider) {
        return ProtoSchemaConverter.convert(schemaProvider.getDescriptor());
    }

    /** Returns the row's serialized size plus {@link #PER_ROW_OVERHEAD_BYTES}. */
    private static int getProtoRowBytes(ByteString protoRow) {
        return protoRow.size() + PER_ROW_OVERHEAD_BYTES;
    }

    /**
     * Drains the pending-response queue from the head, validating each entry in order.
     *
     * @param waitForResponse when {@code false}, draining stops at the first entry whose future
     *     is not yet done; when {@code true}, every queued entry is removed and passed to
     *     {@link #validateAppendResponse(AppendInfo)} regardless of completion state
     */
    public void validateAppendResponses(boolean waitForResponse) {
        while (!this.appendResponseFuturesQueue.isEmpty()) {
            AppendInfo head = this.appendResponseFuturesQueue.peek();
            if (!waitForResponse && !head.getFuture().isDone()) {
                return;
            }
            // Remove before validating so a throwing validation does not leave the entry queued.
            this.appendResponseFuturesQueue.poll();
            validateAppendResponse(head);
        }
    }

    /**
     * Logs the failure and rethrows it wrapped in a {@link BigQueryConnectorException}.
     *
     * @param cause the underlying failure
     * @throws BigQueryConnectorException always
     */
    public void logAndThrowFatalException(Throwable cause) {
        this.logger.error(String.format("AppendRows request failed in subtask %d", this.subtaskId), cause);
        throw new BigQueryConnectorException("Error while writing to BigQuery", cause);
    }

    /**
     * Logs the failure description and throws a {@link BigQueryConnectorException} carrying it.
     *
     * @param errorMessage description of the failure
     * @throws BigQueryConnectorException always
     */
    public void logAndThrowFatalException(String errorMessage) {
        this.logger.error(String.format("AppendRows request failed in subtask %d\n%s", this.subtaskId, errorMessage));
        throw new BigQueryConnectorException(String.format("Error while writing to BigQuery\n%s", errorMessage));
    }

    /**
     * Registers the writer's counters on the given metric group.
     *
     * @param sinkWriterMetricGroup metric group on which the counters are created
     */
    public void initializeMetrics(SinkWriterMetricGroup sinkWriterMetricGroup) {
        this.numberOfRecordsSeenByWriter = sinkWriterMetricGroup.counter("numberOfRecordsSeenByWriter");
        this.numberOfRecordsWrittenToBigQuery = sinkWriterMetricGroup.counter("numberOfRecordsWrittenToBigQuery");
        this.numberOfRecordsSeenByWriterSinceCheckpoint = sinkWriterMetricGroup.counter("numberOfRecordsSeenByWriterSinceCheckpoint");
    }

    /** @return the current size estimate of the buffered append request, in bytes */
    @Internal
    long getAppendRequestSizeBytes() {
        return this.appendRequestSizeBytes;
    }

    /**
     * Returns a copy of the pending-response queue.
     *
     * <p>NOTE(review): the declared element type is {@code ApiFuture}, but the copied elements
     * are {@link AppendInfo} instances; reading an element as {@code ApiFuture} would fail with
     * a {@link ClassCastException}. The signature is kept as-is for existing callers - confirm
     * whether it should become {@code Queue<AppendInfo>}.
     */
    @Internal
    Queue<ApiFuture> getAppendResponseFuturesQueue() {
        return new LinkedList(this.appendResponseFuturesQueue);
    }

    /** @return a {@link ProtoRows} built from the rows currently buffered */
    @Internal
    ProtoRows getProtoRows() {
        return this.protoRowsBuilder.build();
    }
}
