package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import java.util.Iterator;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.class */
/**
 * Handles a {@link ChildPartitionsRecord} received while reading a change stream partition.
 *
 * <p>For each record this action tries to claim the record's start timestamp on the restriction
 * tracker, moves the watermark to that timestamp, and then inserts one metadata row per child
 * partition whose token is not stored yet. Split/merge counters are incremented only for rows that
 * were actually inserted.
 */
public class ChildPartitionsRecordAction {
    private static final Logger LOG = LoggerFactory.getLogger(ChildPartitionsRecordAction.class);
    private final PartitionMetadataDao partitionMetadataDao;
    private final ChangeStreamMetrics metrics;

    /**
     * Creates the action.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was changed
     * from package-private; it is kept public here so existing callers keep compiling.
     *
     * @param partitionMetadataDao DAO used to look up and insert child partition metadata rows
     * @param changeStreamMetrics metrics sink for the split / merge counters
     */
    public ChildPartitionsRecordAction(PartitionMetadataDao partitionMetadataDao, ChangeStreamMetrics changeStreamMetrics) {
        this.partitionMetadataDao = partitionMetadataDao;
        this.metrics = changeStreamMetrics;
    }

    /**
     * Processes one child partitions record for the given parent partition.
     *
     * @param partitionMetadata metadata of the partition the record was read from
     * @param childPartitionsRecord the record listing the child partitions
     * @param restrictionTracker tracker on which the record's start timestamp is claimed
     * @param manualWatermarkEstimator estimator whose watermark is set to the record's start
     *     timestamp (at millisecond precision) once the claim succeeds
     * @return {@code Optional.of(ProcessContinuation.stop())} when the start timestamp could not be
     *     claimed, otherwise {@code Optional.empty()} after all child partitions were processed
     */
    @VisibleForTesting
    public Optional<DoFn.ProcessContinuation> run(PartitionMetadata partitionMetadata, ChildPartitionsRecord childPartitionsRecord, RestrictionTracker<TimestampRange, Timestamp> restrictionTracker, ManualWatermarkEstimator<Instant> manualWatermarkEstimator) {
        final String partitionToken = partitionMetadata.getPartitionToken();
        LOG.debug("[{}] Processing child partition record {}", partitionToken, childPartitionsRecord);

        final Timestamp startTimestamp = childPartitionsRecord.getStartTimestamp();
        // Joda Instant is millisecond based, so the conversion goes through epoch millis.
        final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
        if (!restrictionTracker.tryClaim(startTimestamp)) {
            LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", partitionToken, startTimestamp);
            return Optional.of(DoFn.ProcessContinuation.stop());
        }
        // The watermark is only moved after the claim succeeded.
        manualWatermarkEstimator.setWatermark(startInstant);

        for (ChildPartition childPartition : childPartitionsRecord.getChildPartitions()) {
            processChildPartition(partitionMetadata, childPartitionsRecord, childPartition);
        }

        LOG.debug("[{}] Child partitions action completed successfully", partitionToken);
        return Optional.empty();
    }

    /**
     * Inserts the metadata row for a single child partition unless a row with the same token already
     * exists, and updates the split / merge counter when a row was inserted.
     */
    private void processChildPartition(PartitionMetadata partitionMetadata, ChildPartitionsRecord childPartitionsRecord, ChildPartition childPartition) {
        final String partitionToken = partitionMetadata.getPartitionToken();
        final String childPartitionToken = childPartition.getToken();
        final boolean isSplit = isSplit(childPartition);
        LOG.debug("[{}] Processing child partition {} event", partitionToken, isSplit ? "split" : "merge");

        final PartitionMetadata row = toPartitionMetadata(childPartitionsRecord.getStartTimestamp(), partitionMetadata.getEndTimestamp(), partitionMetadata.getHeartbeatMillis(), childPartition);
        LOG.debug("[{}] Inserting child partition token {}", partitionToken, childPartitionToken);

        // The existence check and the insert run inside the same transaction callback; the callback
        // reports whether it inserted the row.
        final boolean insertedRow = (Boolean) this.partitionMetadataDao.runInTransaction(transaction -> {
            if (transaction.getPartition(childPartitionToken) != null) {
                return false;
            }
            transaction.insert(row);
            return true;
        }).getResult();

        if (!insertedRow) {
            LOG.debug("[{}] Child token {} already exists, skipping...", partitionToken, childPartitionToken);
            return;
        }
        if (isSplit) {
            this.metrics.incPartitionRecordSplitCount();
        } else {
            this.metrics.incPartitionRecordMergeCount();
        }
    }

    /**
     * Returns {@code true} when the child partition has exactly one parent token; any other parent
     * count is handled by the caller as a merge.
     */
    private boolean isSplit(ChildPartition childPartition) {
        return childPartition.getParentTokens().size() == 1;
    }

    /**
     * Builds the metadata row for a child partition in state {@code CREATED}.
     *
     * @param startTimestamp used both as the row's start timestamp and as its initial watermark
     * @param endTimestamp end timestamp to store on the row (the caller passes the parent's)
     * @param heartbeatMillis heartbeat interval to store on the row (the caller passes the parent's)
     * @param childPartition source of the row's partition token and parent tokens
     */
    private PartitionMetadata toPartitionMetadata(Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis, ChildPartition childPartition) {
        return PartitionMetadata.newBuilder()
                .setPartitionToken(childPartition.getToken())
                .setParentTokens(childPartition.getParentTokens())
                .setStartTimestamp(startTimestamp)
                .setEndTimestamp(endTimestamp)
                .setHeartbeatMillis(heartbeatMillis)
                .setState(PartitionMetadata.State.CREATED)
                .setWatermark(startTimestamp)
                .build();
    }
}
