package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.class */
public class QueryChangeStreamAction {
    private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
    private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);
    private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";
    private final ChangeStreamDao changeStreamDao;
    private final PartitionMetadataDao partitionMetadataDao;
    private final ChangeStreamRecordMapper changeStreamRecordMapper;
    private final PartitionMetadataMapper partitionMetadataMapper;
    private final DataChangeRecordAction dataChangeRecordAction;
    private final HeartbeatRecordAction heartbeatRecordAction;
    private final ChildPartitionsRecordAction childPartitionsRecordAction;
    private final ChangeStreamMetrics metrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueryChangeStreamAction(ChangeStreamDao changeStreamDao, PartitionMetadataDao partitionMetadataDao, ChangeStreamRecordMapper changeStreamRecordMapper, PartitionMetadataMapper partitionMetadataMapper, DataChangeRecordAction dataChangeRecordAction, HeartbeatRecordAction heartbeatRecordAction, ChildPartitionsRecordAction childPartitionsRecordAction, ChangeStreamMetrics changeStreamMetrics) {
        this.changeStreamDao = changeStreamDao;
        this.partitionMetadataDao = partitionMetadataDao;
        this.changeStreamRecordMapper = changeStreamRecordMapper;
        this.partitionMetadataMapper = partitionMetadataMapper;
        this.dataChangeRecordAction = dataChangeRecordAction;
        this.heartbeatRecordAction = heartbeatRecordAction;
        this.childPartitionsRecordAction = childPartitionsRecordAction;
        this.metrics = changeStreamMetrics;
    }

    /* JADX WARN: Code restructure failed: missing block: B:18:0x012d, code lost:
    
        org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction.LOG.debug("[{}] Continuation present, returning {}", r0, r20);
        r12.afterBundleCommit(org.joda.time.Instant.now().plus(org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction.BUNDLE_FINALIZER_TIMEOUT), updateWatermarkCallback(r0, r11));
        r0 = r20.get();
     */
    /* JADX WARN: Code restructure failed: missing block: B:19:0x015f, code lost:
    
        if (r0 == null) goto L31;
     */
    /* JADX WARN: Code restructure failed: missing block: B:21:0x0164, code lost:
    
        if (0 == 0) goto L30;
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:0x017b, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x0167, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x016f, code lost:
    
        r24 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x0171, code lost:
    
        r0.addSuppressed(r24);
     */
    /* JADX WARN: Code restructure failed: missing block: B:43:0x0189, code lost:
    
        r12.afterBundleCommit(org.joda.time.Instant.now().plus(org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction.BUNDLE_FINALIZER_TIMEOUT), updateWatermarkCallback(r0, r11));
     */
    /* JADX WARN: Code restructure failed: missing block: B:44:0x01a3, code lost:
    
        if (r0 == null) goto L57;
     */
    /* JADX WARN: Code restructure failed: missing block: B:46:0x01a8, code lost:
    
        if (0 == 0) goto L42;
     */
    /* JADX WARN: Code restructure failed: missing block: B:47:0x01bf, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x01ab, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x01b3, code lost:
    
        r19 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:52:0x01b5, code lost:
    
        r0.addSuppressed(r19);
     */
    /* JADX WARN: Removed duplicated region for block: B:78:0x0206  */
    /* JADX WARN: Removed duplicated region for block: B:79:0x0226  */
    @org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public org.apache.beam.sdk.transforms.DoFn.ProcessContinuation run(org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata r8, org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker<org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange, com.google.cloud.Timestamp> r9, org.apache.beam.sdk.transforms.DoFn.OutputReceiver<org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord> r10, org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator<org.joda.time.Instant> r11, org.apache.beam.sdk.transforms.DoFn.BundleFinalizer r12) {
        /*
            Method dump skipped, instructions count: 619
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction.run(org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata, org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker, org.apache.beam.sdk.transforms.DoFn$OutputReceiver, org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator, org.apache.beam.sdk.transforms.DoFn$BundleFinalizer):org.apache.beam.sdk.transforms.DoFn$ProcessContinuation");
    }

    private DoFn.BundleFinalizer.Callback updateWatermarkCallback(String str, WatermarkEstimator<Instant> watermarkEstimator) {
        return () -> {
            Instant currentWatermark = watermarkEstimator.currentWatermark();
            LOG.debug("[{}] Updating current watermark to {}", str, currentWatermark);
            try {
                this.partitionMetadataDao.updateWatermark(str, Timestamp.ofTimeMicroseconds(currentWatermark.getMillis() * 1000));
            } catch (SpannerException e) {
                if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
                    LOG.debug("[{}] Unable to update the current watermark, partition NOT FOUND", str);
                } else {
                    LOG.error("[{}] Error updating the current watermark: {}", new Object[]{str, e.getMessage(), e});
                }
            }
        };
    }

    private boolean isTimestampOutOfRange(SpannerException spannerException) {
        return (spannerException.getErrorCode() == ErrorCode.INVALID_ARGUMENT || spannerException.getErrorCode() == ErrorCode.OUT_OF_RANGE) && spannerException.getMessage() != null && spannerException.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
    }
}
