package com.google.cloud.flink.bigquery.source.split.reader;

import com.codahale.metrics.SlidingWindowReservoir;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.flink.bigquery.examples.shaded.com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.source.config.BigQueryReadOptions;
import com.google.cloud.flink.bigquery.source.reader.BigQuerySourceReaderContext;
import com.google.cloud.flink.bigquery.source.split.BigQuerySourceSplit;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReader.class */
public class BigQuerySourceSplitReader implements SplitReader<GenericRecord, BigQuerySourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceSplitReader.class);
    private final BigQueryReadOptions readOptions;
    private final BigQuerySourceReaderContext readerContext;
    private final transient Optional<Histogram> readSplitTimeMetric;
    private final Configuration configuration;
    private Long splitStartFetch;
    private final Queue<BigQuerySourceSplit> assignedSplits = new ArrayDeque();
    private Boolean closed = false;
    private Schema avroSchema = null;
    private Long readSoFar = 0L;
    private Iterator<ReadRowsResponse> readStreamIterator = null;

    /* loaded from: input_file:com/google/cloud/flink/bigquery/source/split/reader/BigQuerySourceSplitReader$GenericRecordReader.class */
    static class GenericRecordReader {
        private final Schema schema;

        private GenericRecordReader(Schema schema) {
            Preconditions.checkNotNull(schema, "The provided avro schema reference is null.");
            this.schema = schema;
        }

        public static GenericRecordReader create(Schema schema) {
            return new GenericRecordReader(schema);
        }

        public List<GenericRecord> processRows(AvroRows avroRows) throws IOException {
            BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), (BinaryDecoder) null);
            GenericDatumReader genericDatumReader = new GenericDatumReader(this.schema);
            ArrayList arrayList = new ArrayList();
            while (!binaryDecoder.isEnd()) {
                arrayList.add((GenericRecord) genericDatumReader.read(null, binaryDecoder));
            }
            return arrayList;
        }
    }

    public BigQuerySourceSplitReader(BigQueryReadOptions bigQueryReadOptions, BigQuerySourceReaderContext bigQuerySourceReaderContext) {
        this.readOptions = bigQueryReadOptions;
        this.readerContext = bigQuerySourceReaderContext;
        this.configuration = bigQuerySourceReaderContext.getConfiguration();
        this.readSplitTimeMetric = Optional.ofNullable(bigQuerySourceReaderContext.metricGroup()).map(sourceReaderMetricGroup -> {
            return (DropwizardHistogramWrapper) sourceReaderMetricGroup.histogram("bq.split.read.time.ms", new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
        });
    }

    Long offsetToFetch(BigQuerySourceSplit bigQuerySourceSplit) {
        if (bigQuerySourceSplit.getOffset().longValue() > 0) {
            this.readSoFar = bigQuerySourceSplit.getOffset();
            this.splitStartFetch = Long.valueOf(System.currentTimeMillis());
        } else if (this.readSoFar.longValue() == 0) {
            this.splitStartFetch = Long.valueOf(System.currentTimeMillis());
        }
        LOG.debug("[subtask #{}] Offset to fetch from {} for stream {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readSoFar, bigQuerySourceSplit.getStreamName()});
        return this.readSoFar;
    }

    BigQueryServices.BigQueryServerStream<ReadRowsResponse> retrieveReadStream(BigQuerySourceSplit bigQuerySourceSplit) throws IOException {
        try {
            BigQueryServices.StorageReadClient storageRead = BigQueryServicesFactory.instance(this.readOptions.getBigQueryConnectOptions()).storageRead();
            Throwable th = null;
            try {
                BigQueryServices.BigQueryServerStream<ReadRowsResponse> readRows = storageRead.readRows(ReadRowsRequest.newBuilder().setReadStream(bigQuerySourceSplit.getStreamName()).setOffset(offsetToFetch(bigQuerySourceSplit).longValue()).build());
                if (storageRead != null) {
                    if (0 != 0) {
                        try {
                            storageRead.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        storageRead.close();
                    }
                }
                return readRows;
            } finally {
            }
        } catch (Exception e) {
            throw new IOException(String.format("[subtask #%d][hostname %s] Problems while opening the stream %s from BigQuery with connection info %s. Current split offset %d, reader offset %d.", Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName(), Optional.ofNullable(bigQuerySourceSplit.getStreamName()).orElse("NA"), this.readOptions.toString(), bigQuerySourceSplit.getOffset(), this.readSoFar), e);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:48:0x024f, code lost:
    
        r10.readSoFar = java.lang.Long.valueOf(r10.readSoFar.longValue() + r14);
     */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x0266, code lost:
    
        if (r16.booleanValue() != false) goto L48;
     */
    /* JADX WARN: Code restructure failed: missing block: B:50:0x0269, code lost:
    
        r10.readerContext.updateReadCount(r10.readSoFar);
        r0 = java.lang.Long.valueOf(java.lang.System.currentTimeMillis() - r10.splitStartFetch.longValue());
        r10.readSplitTimeMetric.ifPresent((v1) -> { // java.util.function.Consumer.accept(java.lang.Object):void
            lambda$fetch$2(r1, v1);
        });
        com.google.cloud.flink.bigquery.source.split.reader.BigQuerySourceSplitReader.LOG.info("[subtask #{}][hostname {}] Completed reading split, {} records in {}ms on stream {}.", new java.lang.Object[]{java.lang.Integer.valueOf(r10.readerContext.getIndexOfSubtask()), r10.readerContext.getLocalHostName(), r10.readSoFar, r0, r0.splitId()});
        r10.readSoFar = 0L;
        r10.assignedSplits.poll();
        r10.readStreamIterator = null;
        r0.addFinishedSplit(r0.splitId());
     */
    /* JADX WARN: Code restructure failed: missing block: B:52:0x0339, code lost:
    
        return r0.build();
     */
    /* JADX WARN: Code restructure failed: missing block: B:53:0x02ee, code lost:
    
        com.google.cloud.flink.bigquery.source.split.reader.BigQuerySourceSplitReader.LOG.debug("[subtask #{}][hostname {}] Completed a partial fetch in {}ms, so far read {} from stream {}.", new java.lang.Object[]{java.lang.Integer.valueOf(r10.readerContext.getIndexOfSubtask()), r10.readerContext.getLocalHostName(), java.lang.Long.valueOf(java.lang.System.currentTimeMillis() - r0.longValue()), r10.readSoFar, r0.getStreamName()});
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public org.apache.flink.connector.base.source.reader.RecordsWithSplitIds<org.apache.avro.generic.GenericRecord> fetch() throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 946
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.google.cloud.flink.bigquery.source.split.reader.BigQuerySourceSplitReader.fetch():org.apache.flink.connector.base.source.reader.RecordsWithSplitIds");
    }

    public void handleSplitsChanges(SplitsChange<BigQuerySourceSplit> splitsChange) {
        LOG.debug("Handle split changes {}.", splitsChange);
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        this.assignedSplits.addAll(splitsChange.splits());
    }

    public void wakeUp() {
        LOG.debug("[subtask #{}][hostname %{}] Wake up called.", Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName());
    }

    public void close() throws Exception {
        LOG.debug("[subtask #{}][hostname {}] Close called, assigned splits {}.", new Object[]{Integer.valueOf(this.readerContext.getIndexOfSubtask()), this.readerContext.getLocalHostName(), this.assignedSplits.toString()});
        if (this.closed.booleanValue()) {
            return;
        }
        this.closed = true;
        this.readSoFar = 0L;
        this.readStreamIterator = null;
    }
}
