package org.apache.beam.sdk.io.gcp.bigtable.changestreams.reconciler;

import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconciler.class */
public class PartitionReconciler {
    private final MetadataTableDao metadataTableDao;
    private final ChangeStreamMetrics metrics;
    private static final Logger LOG = LoggerFactory.getLogger(PartitionReconciler.class);
    private static final Duration MISSING_PARTITION_SHORT_DELAY = Duration.standardMinutes(2);
    private static final Duration MISSING_PARTITION_LONG_DELAY = Duration.standardMinutes(20);
    private HashMap<Range.ByteStringRange, Instant> missingPartitionDurations = new HashMap<>();
    private final List<NewPartition> newPartitions = new ArrayList();
    private boolean hasAddedMissingPartitions = false;

    public PartitionReconciler(MetadataTableDao metadataTableDao, ChangeStreamMetrics changeStreamMetrics) {
        this.metadataTableDao = metadataTableDao;
        this.metrics = changeStreamMetrics;
    }

    public void addMissingPartitions(List<Range.ByteStringRange> list) {
        this.hasAddedMissingPartitions = true;
        HashMap<Range.ByteStringRange, Instant> readDetectNewPartitionMissingPartitions = this.metadataTableDao.readDetectNewPartitionMissingPartitions();
        this.missingPartitionDurations = new HashMap<>();
        Instant now = Instant.now();
        for (Range.ByteStringRange byteStringRange : list) {
            this.missingPartitionDurations.put(byteStringRange, readDetectNewPartitionMissingPartitions.getOrDefault(byteStringRange, now));
        }
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(this.missingPartitionDurations);
    }

    public void addIncompleteNewPartitions(NewPartition newPartition) {
        this.newPartitions.add(newPartition);
    }

    private List<NewPartition> findOverlappingNewPartitions(Range.ByteStringRange byteStringRange) {
        NewPartition singleTokenNewPartition;
        ArrayList arrayList = new ArrayList();
        for (NewPartition newPartition : this.newPartitions) {
            for (Range.ByteStringRange byteStringRange2 : newPartition.getParentPartitions()) {
                if (ByteStringRangeHelper.doPartitionsOverlap(byteStringRange2, byteStringRange) && (singleTokenNewPartition = newPartition.getSingleTokenNewPartition(byteStringRange2)) != null) {
                    arrayList.add(singleTokenNewPartition);
                }
            }
        }
        return arrayList;
    }

    public List<PartitionRecord> getPartitionsToReconcile(Instant instant, Instant instant2) {
        if (!this.hasAddedMissingPartitions) {
            return Collections.emptyList();
        }
        this.hasAddedMissingPartitions = false;
        Instant minus = instant.minus(Duration.standardMinutes(60L));
        if (minus.compareTo(instant2) < 0) {
            minus = instant2;
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (Map.Entry<Range.ByteStringRange, Instant> entry : this.missingPartitionDurations.entrySet()) {
            if (entry.getValue().plus(MISSING_PARTITION_SHORT_DELAY).isBeforeNow()) {
                Range.ByteStringRange key = entry.getKey();
                List<NewPartition> findOverlappingNewPartitions = findOverlappingNewPartitions(key);
                ArrayList arrayList3 = new ArrayList();
                Iterator<NewPartition> it = findOverlappingNewPartitions.iterator();
                while (it.hasNext()) {
                    arrayList3.add(it.next().getParentPartitions().get(0));
                }
                if (ByteStringRangeHelper.coverSameKeySpace(arrayList3, key)) {
                    ArrayList arrayList4 = new ArrayList();
                    Iterator<NewPartition> it2 = findOverlappingNewPartitions.iterator();
                    while (it2.hasNext()) {
                        arrayList4.add(it2.next().getChangeStreamContinuationTokens().get(0));
                    }
                    this.metrics.incPartitionReconciledWithTokenCount();
                    PartitionRecord partitionRecord = new PartitionRecord(key, arrayList4, instant, findOverlappingNewPartitions);
                    arrayList2.add(key);
                    arrayList.add(partitionRecord);
                } else if (entry.getValue().plus(MISSING_PARTITION_LONG_DELAY).isBeforeNow()) {
                    for (NewPartition newPartition : findOverlappingNewPartitions) {
                        this.metrics.incPartitionReconciledWithTokenCount();
                        arrayList.add(new PartitionRecord(newPartition.getChangeStreamContinuationTokens().get(0).getPartition(), newPartition.getChangeStreamContinuationTokens(), instant, (List<NewPartition>) Collections.singletonList(newPartition)));
                    }
                    for (Range.ByteStringRange byteStringRange : ByteStringRangeHelper.getMissingPartitionsFrom(arrayList3, (ByteString) key.getStart(), (ByteString) key.getEnd())) {
                        this.metrics.incPartitionReconciledWithoutTokenCount();
                        PartitionRecord partitionRecord2 = new PartitionRecord(byteStringRange, minus, instant, (List<NewPartition>) Collections.emptyList());
                        arrayList.add(partitionRecord2);
                        LOG.error("DNP: Reconciling partition because we're missing a token {}", partitionRecord2);
                    }
                    arrayList2.add(key);
                }
            }
        }
        Iterator it3 = arrayList2.iterator();
        while (it3.hasNext()) {
            this.missingPartitionDurations.remove((Range.ByteStringRange) it3.next());
        }
        this.metadataTableDao.writeDetectNewPartitionMissingPartitions(this.missingPartitionDurations);
        return arrayList;
    }
}
