package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.Objects;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.class */
public class QueryChangeStreamAction {
    private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
    private static final Tracer TRACER = Tracing.getTracer();
    private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);
    private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";
    private final ChangeStreamDao changeStreamDao;
    private final PartitionMetadataDao partitionMetadataDao;
    private final ChangeStreamRecordMapper changeStreamRecordMapper;
    private final PartitionMetadataMapper partitionMetadataMapper;
    private final DataChangeRecordAction dataChangeRecordAction;
    private final HeartbeatRecordAction heartbeatRecordAction;
    private final ChildPartitionsRecordAction childPartitionsRecordAction;

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueryChangeStreamAction(ChangeStreamDao changeStreamDao, PartitionMetadataDao partitionMetadataDao, ChangeStreamRecordMapper changeStreamRecordMapper, PartitionMetadataMapper partitionMetadataMapper, DataChangeRecordAction dataChangeRecordAction, HeartbeatRecordAction heartbeatRecordAction, ChildPartitionsRecordAction childPartitionsRecordAction) {
        this.changeStreamDao = changeStreamDao;
        this.partitionMetadataDao = partitionMetadataDao;
        this.changeStreamRecordMapper = changeStreamRecordMapper;
        this.partitionMetadataMapper = partitionMetadataMapper;
        this.dataChangeRecordAction = dataChangeRecordAction;
        this.heartbeatRecordAction = heartbeatRecordAction;
        this.childPartitionsRecordAction = childPartitionsRecordAction;
    }

    @VisibleForTesting
    public DoFn.ProcessContinuation run(PartitionMetadata partitionMetadata, RestrictionTracker<OffsetRange, Long> restrictionTracker, DoFn.OutputReceiver<DataChangeRecord> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator, DoFn.BundleFinalizer bundleFinalizer) {
        Optional<DoFn.ProcessContinuation> run;
        String partitionToken = partitionMetadata.getPartitionToken();
        Timestamp endTimestamp = partitionMetadata.getEndTimestamp();
        Timestamp ofTimeMicroseconds = Timestamp.ofTimeMicroseconds(((OffsetRange) restrictionTracker.currentRestriction()).getFrom());
        Timestamp ofTimeMicroseconds2 = ofTimeMicroseconds.compareTo(partitionMetadata.getStartTimestamp()) == 0 ? ofTimeMicroseconds : Timestamp.ofTimeMicroseconds(TimestampConverter.timestampToMicros(ofTimeMicroseconds) - 1);
        Scope startScopedSpan = TRACER.spanBuilder("QueryChangeStreamAction").setRecordEvents(true).startScopedSpan();
        try {
            TRACER.getCurrentSpan().putAttribute(ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(partitionToken));
            Optional ofNullable = Optional.ofNullable(this.partitionMetadataDao.getPartition(partitionToken));
            PartitionMetadataMapper partitionMetadataMapper = this.partitionMetadataMapper;
            Objects.requireNonNull(partitionMetadataMapper);
            PartitionMetadata partitionMetadata2 = (PartitionMetadata) ofNullable.map(partitionMetadataMapper::from).orElseThrow(() -> {
                return new IllegalStateException("Partition " + partitionToken + " not found in metadata table");
            });
            try {
                ChangeStreamResultSet changeStreamQuery = this.changeStreamDao.changeStreamQuery(partitionToken, ofTimeMicroseconds2, endTimestamp, partitionMetadata.getHeartbeatMillis());
                while (changeStreamQuery.next()) {
                    try {
                        for (ChangeStreamRecord changeStreamRecord : this.changeStreamRecordMapper.toChangeStreamRecords(partitionMetadata2, changeStreamQuery.getCurrentRowAsStruct(), changeStreamQuery.getMetadata())) {
                            if (changeStreamRecord.getRecordTimestamp().compareTo(ofTimeMicroseconds) >= 0) {
                                if (changeStreamRecord instanceof DataChangeRecord) {
                                    run = this.dataChangeRecordAction.run(partitionMetadata2, (DataChangeRecord) changeStreamRecord, restrictionTracker, outputReceiver, manualWatermarkEstimator);
                                } else if (changeStreamRecord instanceof HeartbeatRecord) {
                                    run = this.heartbeatRecordAction.run(partitionMetadata2, (HeartbeatRecord) changeStreamRecord, restrictionTracker, manualWatermarkEstimator);
                                } else {
                                    if (!(changeStreamRecord instanceof ChildPartitionsRecord)) {
                                        LOG.error("[" + partitionToken + "] Unknown record type " + changeStreamRecord.getClass());
                                        throw new IllegalArgumentException("Unknown record type " + changeStreamRecord.getClass());
                                    }
                                    run = this.childPartitionsRecordAction.run(partitionMetadata2, (ChildPartitionsRecord) changeStreamRecord, restrictionTracker, manualWatermarkEstimator);
                                }
                                if (run.isPresent()) {
                                    LOG.debug("[" + partitionToken + "] Continuation present, returning " + run);
                                    bundleFinalizer.afterBundleCommit(Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT), updateWatermarkCallback(partitionToken, manualWatermarkEstimator));
                                    DoFn.ProcessContinuation processContinuation = run.get();
                                    if (changeStreamQuery != null) {
                                        $closeResource(null, changeStreamQuery);
                                    }
                                    return processContinuation;
                                }
                            }
                        }
                    } catch (Throwable th) {
                        if (changeStreamQuery != null) {
                            $closeResource(null, changeStreamQuery);
                        }
                        throw th;
                    }
                }
                bundleFinalizer.afterBundleCommit(Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT), updateWatermarkCallback(partitionToken, manualWatermarkEstimator));
                if (changeStreamQuery != null) {
                    $closeResource(null, changeStreamQuery);
                }
            } catch (SpannerException e) {
                if (!isTimestampOutOfRange(e)) {
                    throw e;
                }
                LOG.debug("[" + partitionToken + "] query change stream is out of range for " + ofTimeMicroseconds2 + " to " + endTimestamp + ", finishing stream");
            }
            if (startScopedSpan != null) {
                $closeResource(null, startScopedSpan);
            }
            long timestampToMicros = TimestampConverter.timestampToMicros(endTimestamp);
            LOG.debug("[" + partitionToken + "] change stream completed successfully");
            if (restrictionTracker.tryClaim(Long.valueOf(timestampToMicros))) {
                LOG.debug("[" + partitionToken + "] Finishing partition");
                this.partitionMetadataDao.updateToFinished(partitionToken);
                LOG.info("[" + partitionToken + "] Partition finished");
            }
            return DoFn.ProcessContinuation.stop();
        } finally {
            if (startScopedSpan != null) {
                $closeResource(null, startScopedSpan);
            }
        }
    }

    private DoFn.BundleFinalizer.Callback updateWatermarkCallback(String str, WatermarkEstimator<Instant> watermarkEstimator) {
        return () -> {
            Instant currentWatermark = watermarkEstimator.currentWatermark();
            LOG.debug("[" + str + "] Updating current watermark to " + currentWatermark);
            try {
                this.partitionMetadataDao.updateWatermark(str, TimestampConverter.timestampFromMillis(currentWatermark.getMillis()));
            } catch (SpannerException e) {
                if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
                    LOG.debug("[" + str + "] Unable to update the current watermark, partition NOT FOUND");
                } else {
                    LOG.error("[" + str + "] Error updating the current watermark: " + e.getMessage(), e);
                }
            }
        };
    }

    private boolean isTimestampOutOfRange(SpannerException spannerException) {
        return (spannerException.getErrorCode() == ErrorCode.INVALID_ARGUMENT || spannerException.getErrorCode() == ErrorCode.OUT_OF_RANGE) && spannerException.getMessage() != null && spannerException.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
