package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler.PartitionReconciler;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Looks for new change stream partitions recorded in the metadata table and emits a
 * {@link PartitionRecord} for each one that is ready to be streamed.
 *
 * <p>Each call to {@link #run} performs, in order:
 *
 * <ol>
 *   <li>On the very first restriction (offset 0), delegates to
 *       {@link GenerateInitialPartitionsAction} and returns its result.
 *   <li>Otherwise, on every restriction whose start offset is a multiple of 10, recomputes the
 *       watermark from the stream partition rows in the metadata table.
 *   <li>Claims the current offset, outputs every new partition whose parents cover it, and
 *       outputs the partitions reported by the {@link PartitionReconciler}.
 *   <li>Registers a bundle-finalization callback that deletes the processed metadata rows.
 * </ol>
 */
@Internal
public class DetectNewPartitionsAction {
    private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsAction.class);

    /** A partition whose watermark is older than this (relative to now) is logged as lagging. */
    private static final Duration DEBUG_WATERMARK_DELAY = Duration.standardMinutes(5);

    /** How far behind the current watermark a reconciled partition is restarted. */
    private static final Duration RECONCILE_LOOKBACK = Duration.standardMinutes(60L);

    private final ChangeStreamMetrics metrics;
    private final MetadataTableDao metadataTableDao;
    private final GenerateInitialPartitionsAction generateInitialPartitionsAction;

    /** Re-created at the start of every non-initial {@link #run} invocation. */
    private transient PartitionReconciler partitionReconciler;

    /**
     * @param changeStreamMetrics counters incremented for partition splits and merges
     * @param metadataTableDao access to the metadata table rows read and deleted by this action
     * @param generateInitialPartitionsAction action that handles the first restriction (offset 0)
     */
    public DetectNewPartitionsAction(ChangeStreamMetrics changeStreamMetrics, MetadataTableDao metadataTableDao, GenerateInitialPartitionsAction generateInitialPartitionsAction) {
        this.metrics = changeStreamMetrics;
        this.metadataTableDao = metadataTableDao;
        this.generateInitialPartitionsAction = generateInitialPartitionsAction;
    }

    /**
     * Recomputes the watermark as the minimum watermark over all stream partition rows, but only
     * when those partitions cover the key space without gaps or overlaps.
     *
     * <p>Does nothing unless the restriction's start offset is a multiple of 10. Partitions found
     * to be missing or overlapping are handed to the {@link PartitionReconciler}.
     *
     * @param tracker tracker whose current restriction decides whether this run updates the watermark
     * @param watermarkEstimator estimator that receives the new watermark
     * @throws InvalidProtocolBufferException if a row key cannot be converted to a partition
     */
    private void advanceWatermark(RestrictionTracker<OffsetRange, Long> tracker, ManualWatermarkEstimator<Instant> watermarkEstimator) throws InvalidProtocolBufferException {
        if (tracker.currentRestriction().getFrom() % 10 != 0) {
            return;
        }

        ServerStream<Row> streamPartitionRows = this.metadataTableDao.readFromMdTableStreamPartitionsWithWatermark();
        List<Range.ByteStringRange> partitionsWithWatermark = new ArrayList<>();
        Map<Range.ByteStringRange, Instant> laggingPartitions = new HashMap<>();
        Instant lowWatermark = Instant.ofEpochMilli(Long.MAX_VALUE);

        for (Row row : streamPartitionRows) {
            Instant watermark = MetadataTableEncoder.parseWatermarkFromRow(row);
            if (watermark == null) {
                // Rows without a watermark do not take part in the computation.
                continue;
            }
            if (watermark.compareTo(lowWatermark) < 0) {
                lowWatermark = watermark;
            }
            Range.ByteStringRange partition = this.metadataTableDao.convertStreamPartitionRowKeyToPartition(row.getKey());
            partitionsWithWatermark.add(partition);
            if (watermark.plus(DEBUG_WATERMARK_DELAY).isBeforeNow()) {
                laggingPartitions.put(partition, watermark);
            }
        }

        List<Range.ByteStringRange> missingAndOverlapping = ByteStringRangeHelper.getMissingAndOverlappingPartitionsFromKeySpace(partitionsWithWatermark);
        if (missingAndOverlapping.isEmpty()) {
            watermarkEstimator.setWatermark(lowWatermark);
            LOG.info("DNP: Updating watermark: {}", watermarkEstimator.currentWatermark());
        } else {
            LOG.warn("DNP: Could not update watermark because missing {}", ByteStringRangeHelper.partitionsToString(missingAndOverlapping));
        }

        if (!laggingPartitions.isEmpty()) {
            String lagging = laggingPartitions.entrySet().stream()
                    .map(entry -> ByteStringRangeHelper.formatByteStringRange(entry.getKey()) + " => " + entry.getValue())
                    .collect(Collectors.joining(", ", "{", "}"));
            LOG.warn("DNP: Watermark is being held back by the following partitions: {}", lagging);
        }

        this.partitionReconciler.addMissingPartitions(missingAndOverlapping);
    }

    /**
     * Handles a single new-partition row: registers it with the reconciler and, if its recorded
     * parent partitions form a superset of the new partition, outputs a {@link PartitionRecord}
     * and schedules the row for deletion.
     *
     * @param row new-partition row read from the metadata table
     * @param receiver receiver the resulting record is output to
     * @param rowKeysToDelete accumulator of row keys to delete once the bundle commits
     * @throws Exception if a row key, parent partition or continuation token cannot be parsed
     */
    private void processNewPartition(Row row, DoFn.OutputReceiver<PartitionRecord> receiver, List<ByteString> rowKeysToDelete) throws Exception {
        Range.ByteStringRange partition = this.metadataTableDao.convertNewPartitionRowKeyToPartition(row.getKey());
        this.partitionReconciler.addNewPartition(partition, row.getKey());

        List<Range.ByteStringRange> parentPartitions = new ArrayList<>();
        for (RowCell cell : row.getCells(MetadataTableAdminDao.CF_PARENT_PARTITIONS)) {
            parentPartitions.add(Range.ByteStringRange.toByteStringRange(cell.getQualifier()));
        }
        if (!ByteStringRangeHelper.isSuperset(parentPartitions, partition)) {
            // The row is left in place so that it is looked at again on a later run.
            LOG.warn("DNP: New partition: {} does not have all the parents {}", ByteStringRangeHelper.formatByteStringRange(partition), ByteStringRangeHelper.partitionsToString(parentPartitions));
            return;
        }

        ImmutableList.Builder<ChangeStreamContinuationToken> tokensBuilder = ImmutableList.builder();
        for (RowCell cell : row.getCells(MetadataTableAdminDao.CF_INITIAL_TOKEN)) {
            tokensBuilder.add(ChangeStreamContinuationToken.fromByteString(cell.getQualifier()));
        }
        List<ChangeStreamContinuationToken> continuationTokens = tokensBuilder.build();
        if (parentPartitions.size() != continuationTokens.size()) {
            // Only reported; processing continues with whatever tokens were found.
            LOG.warn("DNP: New partition {} parent partitions count {} != continuation token count {}", ByteStringRangeHelper.formatByteStringRange(partition), parentPartitions.size(), continuationTokens.size());
        }

        List<Long> parentLowWatermarks = new ArrayList<>();
        for (RowCell cell : row.getCells(MetadataTableAdminDao.CF_PARENT_LOW_WATERMARKS)) {
            parentLowWatermarks.add(Longs.fromByteArray(cell.getValue().toByteArray()));
        }
        // Collections.min throws NoSuchElementException if the row has no parent low watermark cells.
        Instant lowWatermark = Instant.ofEpochMilli(Collections.min(parentLowWatermarks));

        PartitionRecord partitionRecord = new PartitionRecord(partition, continuationTokens, UniqueIdGenerator.getNextId(), lowWatermark);
        if (parentPartitions.size() > 1) {
            this.metrics.incPartitionMergeCount();
        } else {
            this.metrics.incPartitionSplitCount();
        }

        String parents = parentPartitions.stream()
                .map(ByteStringRangeHelper::formatByteStringRange)
                .collect(Collectors.joining(",", "{", "}"));
        LOG.info("DNP: Split/Merge {} into {}", parents, ByteStringRangeHelper.formatByteStringRange(partition));

        receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);
        rowKeysToDelete.add(row.getKey());
    }

    /**
     * Outputs a {@link PartitionRecord} for every partition the reconciler reports and schedules
     * the associated new-partition rows for deletion.
     *
     * <p>Each reconciled record uses the current watermark minus 60 minutes, raised to
     * {@code lowerBound} if it would otherwise be earlier.
     *
     * @param receiver receiver the reconciled records are output to
     * @param watermarkEstimator source of the current watermark
     * @param lowerBound earliest instant a reconciled record may use
     * @param rowKeysToDelete accumulator of row keys to delete once the bundle commits
     */
    private void processReconcilerPartitions(DoFn.OutputReceiver<PartitionRecord> receiver, ManualWatermarkEstimator<Instant> watermarkEstimator, Instant lowerBound, List<ByteString> rowKeysToDelete) {
        for (Map.Entry<Range.ByteStringRange, Set<ByteString>> entry : this.partitionReconciler.getPartitionsToReconcile().entrySet()) {
            String uid = UniqueIdGenerator.getNextId();
            Instant reconcileTime = watermarkEstimator.currentWatermark().minus(RECONCILE_LOOKBACK);
            if (reconcileTime.compareTo(lowerBound) < 0) {
                reconcileTime = lowerBound;
            }

            PartitionRecord partitionRecord = new PartitionRecord(entry.getKey(), reconcileTime, uid, reconcileTime);
            receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);
            rowKeysToDelete.addAll(entry.getValue());

            String cleanedUpRows = entry.getValue().stream()
                    .map(this::describeNewPartitionRowKey)
                    .collect(Collectors.joining(", ", "{", "}"));
            LOG.warn("DNP: Reconciling missing partition: {} and cleaning up rows {}", partitionRecord, cleanedUpRows);
        }
    }

    /**
     * Renders a new-partition row key for logging, falling back to the raw key decoded as UTF-8
     * when it cannot be converted to a partition.
     */
    private String describeNewPartitionRowKey(ByteString rowKey) {
        try {
            return ByteStringRangeHelper.formatByteStringRange(this.metadataTableDao.convertNewPartitionRowKeyToPartition(rowKey));
        } catch (InvalidProtocolBufferException e) {
            // Logging only: an unparseable key is still worth showing in raw form.
            return rowKey.toStringUtf8();
        }
    }

    /**
     * Registers a callback that deletes the given metadata table rows after the bundle commits.
     *
     * @param bundleFinalizer finalizer the callback is registered with
     * @param rowKeys keys of the rows to delete; read when the callback runs
     */
    private void cleanUpAfterCommit(DoFn.BundleFinalizer bundleFinalizer, List<ByteString> rowKeys) {
        bundleFinalizer.afterBundleCommit(Instant.ofEpochMilli(Long.MAX_VALUE), () -> {
            for (ByteString rowKey : rowKeys) {
                this.metadataTableDao.deleteRowKey(rowKey);
            }
        });
    }

    /**
     * Runs one round of new-partition detection.
     *
     * @param restrictionTracker tracker for the current offset restriction
     * @param outputReceiver receiver for the partition records produced
     * @param manualWatermarkEstimator estimator whose watermark may be advanced by this call
     * @param bundleFinalizer finalizer used to delete processed rows after the bundle commits
     * @param instant passed to the initial partition generation, and used as the earliest time a
     *     reconciled partition may use
     * @return the result of {@link GenerateInitialPartitionsAction} on the first restriction;
     *     {@code stop()} if the current offset cannot be claimed; otherwise {@code resume()} with
     *     a one second delay
     * @throws Exception if reading or parsing metadata table rows fails
     */
    @VisibleForTesting
    public DoFn.ProcessContinuation run(RestrictionTracker<OffsetRange, Long> restrictionTracker, DoFn.OutputReceiver<PartitionRecord> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator, DoFn.BundleFinalizer bundleFinalizer, Instant instant) throws Exception {
        LOG.debug("DNP: Watermark: {}", manualWatermarkEstimator.getState());
        LOG.debug("DNP: CurrentTracker: {}", restrictionTracker.currentRestriction().getFrom());

        if (restrictionTracker.currentRestriction().getFrom() == 0) {
            return this.generateInitialPartitionsAction.run(outputReceiver, restrictionTracker, manualWatermarkEstimator, instant);
        }

        // A fresh reconciler per run: it is filled by advanceWatermark and processNewPartition below.
        this.partitionReconciler = new PartitionReconciler(this.metadataTableDao);
        advanceWatermark(restrictionTracker, manualWatermarkEstimator);

        if (!restrictionTracker.tryClaim(restrictionTracker.currentRestriction().getFrom())) {
            LOG.error("DNP: Couldn't continue because we failed to claim tracker: {}", restrictionTracker.currentRestriction());
            return DoFn.ProcessContinuation.stop();
        }

        List<ByteString> rowKeysToDelete = new ArrayList<>();
        for (Row newPartitionRow : this.metadataTableDao.readNewPartitions()) {
            processNewPartition(newPartitionRow, outputReceiver, rowKeysToDelete);
        }
        processReconcilerPartitions(outputReceiver, manualWatermarkEstimator, instant, rowKeysToDelete);
        cleanUpAfterCommit(bundleFinalizer, rowKeysToDelete);

        return DoFn.ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1L));
    }
}
