package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Data access object for the change stream of a single Cloud Bigtable table.
 *
 * <p>Every request is issued through the {@link BigtableDataClient} and against the table id that
 * were supplied at construction time.
 */
@Internal
public class ChangeStreamDao {
    private final BigtableDataClient dataClient;
    private final String tableId;

    /**
     * Creates a DAO bound to one table.
     *
     * @param dataClient client used to issue all change stream requests
     * @param tableId id of the table whose change stream is accessed
     */
    public ChangeStreamDao(BigtableDataClient dataClient, String tableId) {
        this.dataClient = dataClient;
        this.tableId = tableId;
    }

    /**
     * Requests the initial change stream partitions of the table.
     *
     * @return the row ranges returned by the client's
     *     {@code generateInitialChangeStreamPartitionsCallable().all()} call for this table
     */
    public List<Range.ByteStringRange> generateInitialChangeStreamPartitions() {
        // The callable chain is already typed to List<ByteStringRange>; no raw cast is needed.
        return dataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
    }

    /**
     * Starts streaming the change stream of one partition.
     *
     * <p>The starting point of the stream is chosen from the first of these that is non-null:
     *
     * <ol>
     *   <li>the current continuation token held by {@code streamProgress};
     *   <li>the start time of {@code partition};
     *   <li>the continuation tokens of {@code partition}.
     * </ol>
     *
     * @param partition partition to stream; supplies the row range and the fallback starting points
     * @param streamProgress progress holder whose current token, when present, takes precedence
     * @param endTime time at which the stream should end, or {@code null} to set no end time
     * @param heartbeatDuration heartbeat interval to request, converted to the client's duration
     *     type with millisecond precision
     * @return the server stream of change stream records for the partition
     * @throws IOException if none of the three starting points is available
     */
    public ServerStream<ChangeStreamRecord> readChangeStreamPartition(
            PartitionRecord partition,
            StreamProgress streamProgress,
            @Nullable Instant endTime,
            Duration heartbeatDuration)
            throws IOException {
        ReadChangeStreamQuery query =
                ReadChangeStreamQuery.create(tableId).streamPartition(partition.getPartition());

        ChangeStreamContinuationToken currentToken = streamProgress.getCurrentToken();
        Instant startTime = partition.getStartTime();
        List<ChangeStreamContinuationToken> partitionTokens =
                partition.getChangeStreamContinuationTokens();

        // Exactly one starting point is applied, in strict priority order.
        if (currentToken != null) {
            query.continuationTokens(Collections.singletonList(currentToken));
        } else if (startTime != null) {
            query.startTime(TimestampConverter.toThreetenInstant(startTime));
        } else if (partitionTokens != null) {
            query.continuationTokens(partitionTokens);
        } else {
            throw new IOException(
                    "Cannot read change stream for partition "
                            + partition
                            + ": no current continuation token, start time, or partition"
                            + " continuation tokens are available to start the stream from");
        }

        if (endTime != null) {
            query.endTime(TimestampConverter.toThreetenInstant(endTime));
        }
        // The query takes a threeten Duration while callers pass a Joda Duration; bridge via millis.
        query.heartbeatDuration(org.threeten.bp.Duration.ofMillis(heartbeatDuration.getMillis()));
        return dataClient.readChangeStream(query);
    }
}
