package com.google.cloud.flink.bigquery.sink.writer;

import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.api.core.ApiFuture;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.protobuf.ByteString;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import com.google.cloud.flink.bigquery.sink.writer.BaseWriter;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

/* loaded from: input_file:com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.class */
/**
 * Sink writer that appends serialized records to the {@code _default} write stream of a
 * BigQuery table.
 *
 * <p>Incoming elements are serialized to proto rows and accumulated into an append request.
 * When the next row no longer fits, pending append responses are validated and the current
 * request is sent. Request batching, stream-writer creation and the shared counters are
 * provided by {@link BaseWriter}.
 *
 * <p>NOTE(review): the metric initializer's name suggests this writer backs the at-least-once
 * delivery mode; confirm against the sink that constructs it.
 *
 * @param <IN> type of the records accepted by this writer
 */
public class BigQueryDefaultWriter<IN> extends BaseWriter<IN> {
    /**
     * Counts records acknowledged by BigQuery; reset to zero at the end of every
     * {@link #flush(boolean)}.
     */
    Counter numberOfRecordsWrittenToBigQuerySinceCheckpoint;

    /**
     * Creates a writer bound to the {@code _default} stream under the given table path.
     *
     * @param str table path used as the prefix of the stream name
     *     ({@code <str>/streams/_default})
     * @param bigQueryConnectOptions connection options forwarded to {@link BaseWriter}
     * @param bigQuerySchemaProvider schema provider forwarded to {@link BaseWriter}
     * @param bigQueryProtoSerializer serializer forwarded to {@link BaseWriter}
     * @param initContext Flink sink context supplying the subtask id and the metric group
     */
    public BigQueryDefaultWriter(String str, BigQueryConnectOptions bigQueryConnectOptions, BigQuerySchemaProvider bigQuerySchemaProvider, BigQueryProtoSerializer bigQueryProtoSerializer, Sink.InitContext initContext) {
        super(initContext.getSubtaskId(), str, bigQueryConnectOptions, bigQuerySchemaProvider, bigQueryProtoSerializer);
        this.streamName = String.format("%s/streams/_default", str);
        this.totalRecordsSeen = 0L;
        this.totalRecordsWritten = 0L;
        initializeAtleastOnceFlinkMetrics(initContext);
    }

    /**
     * Serializes one element and adds it to the pending append request.
     *
     * <p>If the serialized row does not fit into the current request, outstanding append
     * responses are validated and the current request is sent before the row is added.
     * Elements that fail serialization are logged and dropped; they still count towards the
     * "seen" counters, which are incremented before serialization is attempted.
     *
     * @param in the element to write
     * @param context Flink writer context (unused)
     */
    public void write(IN in, SinkWriter.Context context) {
        this.totalRecordsSeen++;
        this.numberOfRecordsSeenByWriter.inc();
        this.numberOfRecordsSeenByWriterSinceCheckpoint.inc();
        try {
            ByteString protoRow = getProtoRow(in);
            if (!fitsInAppendRequest(protoRow)) {
                // Make room: check earlier responses, then ship the full request.
                validateAppendResponses(false);
                append();
            }
            addToAppendRequest(protoRow);
        } catch (BigQuerySerializationException e) {
            this.logger.error(String.format("Unable to serialize record %s. Dropping it!", in), e);
        }
    }

    /**
     * Delegates to {@link BaseWriter#flush(boolean)} and then resets both "since checkpoint"
     * counters to zero.
     *
     * @param z flag passed through unchanged to the base implementation
     */
    @Override // com.google.cloud.flink.bigquery.sink.writer.BaseWriter
    public void flush(boolean z) {
        super.flush(z);
        // Counter has no reset(); decrementing by the current count brings it back to zero.
        this.numberOfRecordsSeenByWriterSinceCheckpoint.dec(this.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
        this.numberOfRecordsWrittenToBigQuerySinceCheckpoint.dec(this.numberOfRecordsWrittenToBigQuerySinceCheckpoint.getCount());
    }

    /**
     * Sends the given rows on the stream writer, creating the writer first if none exists, and
     * queues the resulting future for later validation.
     *
     * @param protoRows the batch of serialized rows to append
     */
    @Override // com.google.cloud.flink.bigquery.sink.writer.BaseWriter
    void sendAppendRequest(ProtoRows protoRows) {
        if (this.streamWriter == null) {
            // NOTE(review): the meaning of the boolean argument is defined in BaseWriter.
            createStreamWriter(true);
        }
        ApiFuture<AppendRowsResponse> pendingResponse = this.streamWriter.append(protoRows);
        // Plain primitive widening; no need to box the count into a Long and unbox it again.
        long rowCount = protoRows.getSerializedRowsCount();
        // NOTE(review): -1L is presumably a "no explicit offset" marker; confirm in AppendInfo.
        this.appendResponseFuturesQueue.add(new BaseWriter.AppendInfo(pendingResponse, -1L, rowCount));
    }

    /**
     * Blocks until the append completes, escalates any failure, and on success adds the
     * appended row count to the written-record totals and counters.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored before the
     * failure is escalated so that callers further up the stack can still observe it.
     *
     * @param appendInfo the pending append to validate
     */
    @Override // com.google.cloud.flink.bigquery.sink.writer.BaseWriter
    void validateAppendResponse(BaseWriter.AppendInfo appendInfo) {
        ApiFuture<AppendRowsResponse> future = appendInfo.getFuture();
        long recordsAppended = appendInfo.getRecordsAppended();
        try {
            AppendRowsResponse appendRowsResponse = future.get();
            if (appendRowsResponse.hasError()) {
                logAndThrowFatalException(appendRowsResponse.getError().getMessage());
            }
            this.totalRecordsWritten += recordsAppended;
            this.numberOfRecordsWrittenToBigQuery.inc(recordsAppended);
            this.numberOfRecordsWrittenToBigQuerySinceCheckpoint.inc(recordsAppended);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again before escalating.
            Thread.currentThread().interrupt();
            logAndThrowFatalException(e);
        } catch (ExecutionException e) {
            logAndThrowFatalException(e);
        }
    }

    /**
     * Registers the base-class metrics and this writer's own "written since checkpoint"
     * counter on the sink's metric group.
     *
     * @param initContext Flink sink context providing the metric group
     */
    private void initializeAtleastOnceFlinkMetrics(Sink.InitContext initContext) {
        SinkWriterMetricGroup metricGroup = initContext.metricGroup();
        initializeMetrics(metricGroup);
        this.numberOfRecordsWrittenToBigQuerySinceCheckpoint = metricGroup.counter("numberOfRecordsWrittenToBigQuerySinceCheckpoint");
    }

    /** Closes this writer by delegating to {@link BaseWriter#close()}. */
    @Override // com.google.cloud.flink.bigquery.sink.writer.BaseWriter
    public void close() {
        super.close();
    }
}
