package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.gax.rpc.ApiException;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.class */
public class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
    private final ReadSession readSession;
    private final List<ReadStream> streamBundle;
    private final String jsonTableSchema;
    private final SerializableFunction<SchemaAndRecord, T> parseFn;
    private final Coder<T> outputCoder;
    private final BigQueryServices bqServices;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource$BigQueryStorageStreamBundleReader.class */
    public static class BigQueryStorageStreamBundleReader<T> extends OffsetBasedSource.OffsetBasedReader<T> {
        private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageStreamBundleReader.class);
        private final BigQueryStorageReader reader;
        private final SerializableFunction<SchemaAndRecord, T> parseFn;
        private final BigQueryServices.StorageClient storageClient;
        private final TableSchema tableSchema;
        private BigQueryStorageStreamBundleSource<T> source;
        private BigQueryServices.BigQueryServerStream<ReadRowsResponse> responseStream;
        private Iterator<ReadRowsResponse> responseIterator;
        private T current;
        private int currentStreamBundleIndex;
        private long currentStreamOffset;
        private double fractionOfStreamBundleConsumed;
        private double progressAtResponseStart;
        private double progressAtResponseEnd;
        private long rowsConsumedFromCurrentResponse;
        private long totalRowsInCurrentResponse;
        private TableReference tableReference;
        private ServiceCallMetric serviceCallMetric;

        private BigQueryStorageStreamBundleReader(BigQueryStorageStreamBundleSource<T> bigQueryStorageStreamBundleSource, BigQueryOptions bigQueryOptions) throws IOException {
            super(bigQueryStorageStreamBundleSource);
            this.responseStream = null;
            this.responseIterator = null;
            this.current = null;
            this.source = bigQueryStorageStreamBundleSource;
            this.reader = BigQueryStorageReaderFactory.getReader(((BigQueryStorageStreamBundleSource) bigQueryStorageStreamBundleSource).readSession);
            this.parseFn = ((BigQueryStorageStreamBundleSource) bigQueryStorageStreamBundleSource).parseFn;
            this.storageClient = ((BigQueryStorageStreamBundleSource) bigQueryStorageStreamBundleSource).bqServices.getStorageClient(bigQueryOptions);
            this.tableSchema = (TableSchema) BigQueryHelpers.fromJsonString(((BigQueryStorageStreamBundleSource) bigQueryStorageStreamBundleSource).jsonTableSchema, TableSchema.class);
            this.currentStreamBundleIndex = 0;
            this.fractionOfStreamBundleConsumed = 0.0d;
            this.progressAtResponseStart = 0.0d;
            this.progressAtResponseEnd = 0.0d;
            this.rowsConsumedFromCurrentResponse = 0L;
            this.totalRowsInCurrentResponse = 0L;
        }

        public T getCurrent() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        protected long getCurrentOffset() throws NoSuchElementException {
            return this.currentStreamBundleIndex;
        }

        protected boolean isAtSplitPoint() throws NoSuchElementException {
            return this.currentStreamOffset == 0;
        }

        public boolean startImpl() throws IOException {
            return readNextStream();
        }

        public boolean advanceImpl() throws IOException {
            Preconditions.checkStateNotNull(this.responseIterator);
            this.currentStreamOffset += this.totalRowsInCurrentResponse;
            return readNextRecord();
        }

        private boolean readNextStream() throws IOException {
            BigQueryStorageStreamBundleSource<T> m51getCurrentSource = m51getCurrentSource();
            if (this.currentStreamBundleIndex == ((BigQueryStorageStreamBundleSource) m51getCurrentSource).streamBundle.size()) {
                this.fractionOfStreamBundleConsumed = 1.0d;
                return false;
            }
            ReadRowsRequest build = ReadRowsRequest.newBuilder().setReadStream(((ReadStream) ((BigQueryStorageStreamBundleSource) m51getCurrentSource).streamBundle.get(this.currentStreamBundleIndex)).getName()).build();
            this.tableReference = BigQueryUtils.toTableReference(((BigQueryStorageStreamBundleSource) m51getCurrentSource).readSession.getTable());
            this.serviceCallMetric = BigQueryUtils.readCallMetric(this.tableReference);
            LOG.info("Started BigQuery Storage API read from stream {}.", ((ReadStream) ((BigQueryStorageStreamBundleSource) m51getCurrentSource).streamBundle.get(this.currentStreamBundleIndex)).getName());
            this.responseStream = this.storageClient.readRows(build, ((BigQueryStorageStreamBundleSource) m51getCurrentSource).readSession.getTable());
            this.responseIterator = this.responseStream.iterator();
            return readNextRecord();
        }

        @RequiresNonNull({"responseIterator"})
        private boolean readNextRecord() throws IOException {
            Iterator<ReadRowsResponse> it = this.responseIterator;
            if (it == null) {
                LOG.info("Received null responseIterator for stream {}", Integer.valueOf(this.currentStreamBundleIndex));
                return false;
            }
            while (this.reader.readyForNextReadResponse()) {
                if (!it.hasNext()) {
                    synchronized (this) {
                        this.currentStreamOffset = 0L;
                        this.currentStreamBundleIndex++;
                    }
                    return readNextStream();
                }
                try {
                    ReadRowsResponse next = it.next();
                    if (this.serviceCallMetric != null) {
                        this.serviceCallMetric.call("ok");
                    }
                    this.progressAtResponseStart = next.getStats().getProgress().getAtResponseStart();
                    this.progressAtResponseEnd = next.getStats().getProgress().getAtResponseEnd();
                    this.totalRowsInCurrentResponse = next.getRowCount();
                    this.rowsConsumedFromCurrentResponse = 0L;
                    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(this.totalRowsInCurrentResponse >= 0, "Row count from current response (%s) must be non-negative.", this.totalRowsInCurrentResponse);
                    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(0.0d <= this.progressAtResponseStart && this.progressAtResponseStart <= 1.0d, "Progress at response start (%s) is not in the range [0.0, 1.0].", Double.valueOf(this.progressAtResponseStart));
                    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(0.0d <= this.progressAtResponseEnd && this.progressAtResponseEnd <= 1.0d, "Progress at response end (%s) is not in the range [0.0, 1.0].", Double.valueOf(this.progressAtResponseEnd));
                    this.reader.processReadRowsResponse(next);
                } catch (ApiException e) {
                    if (this.serviceCallMetric != null) {
                        this.serviceCallMetric.call(e.getStatusCode().getCode().name());
                    }
                    throw e;
                }
            }
            this.current = (T) this.parseFn.apply(new SchemaAndRecord(this.reader.readSingleRecord(), this.tableSchema));
            this.rowsConsumedFromCurrentResponse++;
            this.fractionOfStreamBundleConsumed = (this.currentStreamBundleIndex + (this.progressAtResponseStart + ((this.progressAtResponseEnd - this.progressAtResponseStart) * ((this.rowsConsumedFromCurrentResponse * 1.0d) / this.totalRowsInCurrentResponse)))) / ((BigQueryStorageStreamBundleSource) this.source).streamBundle.size();
            return true;
        }

        public synchronized void close() {
            Preconditions.checkStateNotNull(this.storageClient);
            Preconditions.checkStateNotNull(this.reader);
            this.storageClient.close();
            this.reader.close();
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public synchronized BigQueryStorageStreamBundleSource<T> m51getCurrentSource() {
            return this.source;
        }

        public synchronized Double getFractionConsumed() {
            return Double.valueOf(this.fractionOfStreamBundleConsumed);
        }
    }

    public static <T> BigQueryStorageStreamBundleSource<T> create(ReadSession readSession, List<ReadStream> list, TableSchema tableSchema, SerializableFunction<SchemaAndRecord, T> serializableFunction, Coder<T> coder, BigQueryServices bigQueryServices, long j) {
        return new BigQueryStorageStreamBundleSource<>(readSession, list, BigQueryHelpers.toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")), serializableFunction, coder, bigQueryServices, j);
    }

    public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> list) {
        return new BigQueryStorageStreamBundleSource<>(this.readSession, list, this.jsonTableSchema, this.parseFn, this.outputCoder, this.bqServices, getMinBundleSize());
    }

    private BigQueryStorageStreamBundleSource(ReadSession readSession, List<ReadStream> list, String str, SerializableFunction<SchemaAndRecord, T> serializableFunction, Coder<T> coder, BigQueryServices bigQueryServices, long j) {
        super(0L, list.size(), j);
        this.readSession = (ReadSession) Preconditions.checkArgumentNotNull(readSession, "readSession");
        this.streamBundle = (List) Preconditions.checkArgumentNotNull(list, "streams");
        this.jsonTableSchema = (String) Preconditions.checkArgumentNotNull(str, "jsonTableSchema");
        this.parseFn = (SerializableFunction) Preconditions.checkArgumentNotNull(serializableFunction, "parseFn");
        this.outputCoder = (Coder) Preconditions.checkArgumentNotNull(coder, "outputCoder");
        this.bqServices = (BigQueryServices) Preconditions.checkArgumentNotNull(bigQueryServices, "bqServices");
    }

    public Coder<T> getOutputCoder() {
        return this.outputCoder;
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("table", this.readSession.getTable()).withLabel("Table")).add(DisplayData.item("readSession", this.readSession.getName()).withLabel("Read session"));
        Iterator<ReadStream> it = this.streamBundle.iterator();
        while (it.hasNext()) {
            builder.add(DisplayData.item("stream", it.next().getName()).withLabel("Stream"));
        }
    }

    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
        return 0L;
    }

    public List<? extends OffsetBasedSource<T>> split(long j, PipelineOptions pipelineOptions) {
        return ImmutableList.of(this);
    }

    public long getMaxEndOffset(PipelineOptions pipelineOptions) throws Exception {
        return this.streamBundle.size();
    }

    public OffsetBasedSource<T> createSourceForSubrange(long j, long j2) {
        return fromExisting(this.streamBundle.subList((int) j, (int) j2));
    }

    /* renamed from: createReader, reason: merged with bridge method [inline-methods] */
    public BigQueryStorageStreamBundleReader<T> m47createReader(PipelineOptions pipelineOptions) throws IOException {
        return new BigQueryStorageStreamBundleReader<>((BigQueryOptions) pipelineOptions.as(BigQueryOptions.class));
    }
}
