package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Mutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationException;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.DetectNewPartitionsState;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.class */
public class MetadataTableDao {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDao.class);
    private static final Duration DELETED_NEW_PARTITION_REEVALUATE_DELAY = Duration.standardMinutes(1);
    private final BigtableDataClient dataClient;
    private final String tableId;
    private final ByteString changeStreamNamePrefix;

    public MetadataTableDao(BigtableDataClient bigtableDataClient, String str, ByteString byteString) {
        this.dataClient = bigtableDataClient;
        this.tableId = str;
        this.changeStreamNamePrefix = byteString;
    }

    public ByteString getChangeStreamNamePrefix() {
        return this.changeStreamNamePrefix;
    }

    private ByteString getFullNewPartitionPrefix() {
        return this.changeStreamNamePrefix.concat(MetadataTableAdminDao.NEW_PARTITION_PREFIX);
    }

    private ByteString getFullStreamPartitionPrefix() {
        return this.changeStreamNamePrefix.concat(MetadataTableAdminDao.STREAM_PARTITION_PREFIX);
    }

    private ByteString getFullDetectNewPartition() {
        return this.changeStreamNamePrefix.concat(MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX);
    }

    public Range.ByteStringRange convertStreamPartitionRowKeyToPartition(ByteString byteString) throws InvalidProtocolBufferException {
        return Range.ByteStringRange.toByteStringRange(byteString.substring(this.changeStreamNamePrefix.size() + MetadataTableAdminDao.STREAM_PARTITION_PREFIX.size()));
    }

    public ByteString convertPartitionToStreamPartitionRowKey(Range.ByteStringRange byteStringRange) {
        return getFullStreamPartitionPrefix().concat(Range.ByteStringRange.serializeToByteString(byteStringRange));
    }

    public Range.ByteStringRange convertNewPartitionRowKeyToPartition(ByteString byteString) throws InvalidProtocolBufferException {
        return Range.ByteStringRange.toByteStringRange(byteString.substring(this.changeStreamNamePrefix.size() + MetadataTableAdminDao.NEW_PARTITION_PREFIX.size()));
    }

    public ByteString convertPartitionToNewPartitionRowKey(Range.ByteStringRange byteStringRange) {
        return getFullNewPartitionPrefix().concat(Range.ByteStringRange.serializeToByteString(byteStringRange));
    }

    @Nullable
    public DetectNewPartitionsState readDetectNewPartitionsState() {
        Row readRow = this.dataClient.readRow(this.tableId, getFullDetectNewPartition(), Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_WATERMARK)).filter(Filters.FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT)).filter(Filters.FILTERS.limit().cellsPerColumn(1)));
        if (readRow == null) {
            return null;
        }
        Instant parseWatermarkFromRow = MetadataTableEncoder.parseWatermarkFromRow(readRow);
        Instant parseWatermarkLastUpdatedFromRow = MetadataTableEncoder.parseWatermarkLastUpdatedFromRow(readRow);
        if (parseWatermarkFromRow == null || parseWatermarkLastUpdatedFromRow == null) {
            return null;
        }
        return new DetectNewPartitionsState(parseWatermarkFromRow, parseWatermarkLastUpdatedFromRow);
    }

    public List<NewPartition> readNewPartitionsIncludingDeleted() throws InvalidProtocolBufferException {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.dataClient.readRows(Query.create(this.tableId).prefix(getFullNewPartitionPrefix()).filter(Filters.FILTERS.limit().cellsPerColumn(1))).iterator();
        while (it.hasNext()) {
            Row row = (Row) it.next();
            Range.ByteStringRange convertNewPartitionRowKeyToPartition = convertNewPartitionRowKeyToPartition(row.getKey());
            ArrayList arrayList2 = new ArrayList();
            Iterator it2 = row.getCells(MetadataTableAdminDao.CF_INITIAL_TOKEN).iterator();
            while (it2.hasNext()) {
                arrayList2.add(ChangeStreamContinuationToken.fromByteString(((RowCell) it2.next()).getValue()));
            }
            if (!arrayList2.isEmpty()) {
                ArrayList arrayList3 = new ArrayList();
                Iterator it3 = row.getCells(MetadataTableAdminDao.CF_PARENT_LOW_WATERMARKS).iterator();
                while (it3.hasNext()) {
                    arrayList3.add(Long.valueOf(Longs.fromByteArray(((RowCell) it3.next()).getValue().toByteArray())));
                }
                arrayList.add(new NewPartition(convertNewPartitionRowKeyToPartition, arrayList2, Instant.ofEpochMilli(((Long) Collections.min(arrayList3)).longValue())));
            }
        }
        return arrayList;
    }

    public List<NewPartition> readNewPartitions() throws InvalidProtocolBufferException {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.dataClient.readRows(Query.create(this.tableId).prefix(getFullNewPartitionPrefix()).filter(Filters.FILTERS.limit().cellsPerColumn(1))).iterator();
        while (it.hasNext()) {
            Row row = (Row) it.next();
            long j = 0;
            HashSet hashSet = new HashSet();
            for (RowCell rowCell : row.getCells(MetadataTableAdminDao.CF_SHOULD_DELETE)) {
                j = Math.max(j, rowCell.getTimestamp());
                if (TimestampConverter.microsecondToInstant(rowCell.getTimestamp()).plus(DELETED_NEW_PARTITION_REEVALUATE_DELAY).isAfterNow()) {
                    hashSet.add(Range.ByteStringRange.toByteStringRange(rowCell.getQualifier()));
                }
            }
            Range.ByteStringRange convertNewPartitionRowKeyToPartition = convertNewPartitionRowKeyToPartition(row.getKey());
            ArrayList arrayList2 = new ArrayList();
            for (RowCell rowCell2 : row.getCells(MetadataTableAdminDao.CF_INITIAL_TOKEN)) {
                if (!hashSet.contains(Range.ByteStringRange.toByteStringRange(rowCell2.getQualifier()))) {
                    arrayList2.add(ChangeStreamContinuationToken.fromByteString(rowCell2.getValue()));
                    j = Math.max(j, rowCell2.getTimestamp());
                }
            }
            if (!arrayList2.isEmpty()) {
                ArrayList arrayList3 = new ArrayList();
                for (RowCell rowCell3 : row.getCells(MetadataTableAdminDao.CF_PARENT_LOW_WATERMARKS)) {
                    if (!hashSet.contains(Range.ByteStringRange.toByteStringRange(rowCell3.getQualifier()))) {
                        arrayList3.add(Long.valueOf(Longs.fromByteArray(rowCell3.getValue().toByteArray())));
                        j = Math.max(j, rowCell3.getTimestamp());
                    }
                }
                arrayList.add(new NewPartition(convertNewPartitionRowKeyToPartition, arrayList2, Instant.ofEpochMilli(((Long) Collections.min(arrayList3)).longValue()), TimestampConverter.microsecondToInstant(j)));
            }
        }
        return arrayList;
    }

    public void writeNewPartition(NewPartition newPartition) {
        ByteString convertPartitionToNewPartitionRowKey = convertPartitionToNewPartitionRowKey(newPartition.getPartition());
        Range.ByteStringRange partition = newPartition.getChangeStreamContinuationTokens().get(0).getPartition();
        this.dataClient.mutateRow(RowMutation.create(this.tableId, convertPartitionToNewPartitionRowKey).setCell(MetadataTableAdminDao.CF_INITIAL_TOKEN, Range.ByteStringRange.serializeToByteString(partition), newPartition.getChangeStreamContinuationTokens().get(0).toByteString()).setCell(MetadataTableAdminDao.CF_PARENT_LOW_WATERMARKS, Range.ByteStringRange.serializeToByteString(partition), newPartition.getLowWatermark().getMillis()).deleteCells(MetadataTableAdminDao.CF_SHOULD_DELETE, Range.ByteStringRange.serializeToByteString(partition)));
    }

    public void markNewPartitionForDeletion(NewPartition newPartition) {
        RowMutation create = RowMutation.create(this.tableId, convertPartitionToNewPartitionRowKey(newPartition.getPartition()));
        Iterator<ChangeStreamContinuationToken> it = newPartition.getChangeStreamContinuationTokens().iterator();
        while (it.hasNext()) {
            create.setCell(MetadataTableAdminDao.CF_SHOULD_DELETE, Range.ByteStringRange.serializeToByteString(it.next().getPartition()), 1L);
        }
        this.dataClient.mutateRow(create);
    }

    public boolean deleteNewPartition(NewPartition newPartition) {
        ByteString convertPartitionToNewPartitionRowKey = convertPartitionToNewPartitionRowKey(newPartition.getPartition());
        boolean z = true;
        for (ChangeStreamContinuationToken changeStreamContinuationToken : newPartition.getChangeStreamContinuationTokens()) {
            z = this.dataClient.checkAndMutateRow(ConditionalRowMutation.create(this.tableId, convertPartitionToNewPartitionRowKey).condition(Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_SHOULD_DELETE)).filter(Filters.FILTERS.qualifier().exactMatch(Range.ByteStringRange.serializeToByteString(changeStreamContinuationToken.getPartition()))).filter(Filters.FILTERS.value().exactMatch(ByteString.copyFrom(Longs.toByteArray(1L))))).then(Mutation.create().deleteCells(MetadataTableAdminDao.CF_INITIAL_TOKEN, Range.ByteStringRange.serializeToByteString(changeStreamContinuationToken.getPartition())).deleteCells(MetadataTableAdminDao.CF_PARENT_LOW_WATERMARKS, Range.ByteStringRange.serializeToByteString(changeStreamContinuationToken.getPartition())).deleteCells(MetadataTableAdminDao.CF_SHOULD_DELETE, Range.ByteStringRange.serializeToByteString(changeStreamContinuationToken.getPartition())))).booleanValue() && z;
        }
        return z;
    }

    public List<StreamPartitionWithWatermark> readStreamPartitionsWithWatermark() throws InvalidProtocolBufferException {
        Instant parseWatermarkFromRow;
        LOG.debug("Reading stream partitions from metadata table: " + getFullStreamPartitionPrefix().toStringUtf8());
        ServerStream readRows = this.dataClient.readRows(Query.create(this.tableId).prefix(getFullStreamPartitionPrefix()).filter(Filters.FILTERS.interleave().filter(Filters.FILTERS.chain().filter(Filters.FILTERS.limit().cellsPerColumn(1)).filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_WATERMARK)).filter(Filters.FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT))).filter(Filters.FILTERS.chain().filter(Filters.FILTERS.limit().cellsPerColumn(1)).filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_LOCK)).filter(Filters.FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT)))));
        ArrayList arrayList = new ArrayList();
        Iterator it = readRows.iterator();
        while (it.hasNext()) {
            Row row = (Row) it.next();
            if (MetadataTableEncoder.isRowLocked(row) && (parseWatermarkFromRow = MetadataTableEncoder.parseWatermarkFromRow(row)) != null) {
                arrayList.add(new StreamPartitionWithWatermark(convertStreamPartitionRowKeyToPartition(row.getKey()), parseWatermarkFromRow));
            }
        }
        return arrayList;
    }

    public List<PartitionRecord> readAllStreamPartitions() throws InvalidProtocolBufferException {
        ServerStream readRows = this.dataClient.readRows(Query.create(this.tableId).prefix(getFullStreamPartitionPrefix()));
        ArrayList arrayList = new ArrayList();
        Iterator it = readRows.iterator();
        while (it.hasNext()) {
            Row row = (Row) it.next();
            Instant parseWatermarkFromRow = MetadataTableEncoder.parseWatermarkFromRow(row);
            String parseLockUuid = MetadataTableEncoder.parseLockUuid(row);
            Range.ByteStringRange convertStreamPartitionRowKeyToPartition = convertStreamPartitionRowKeyToPartition(row.getKey());
            String parseTokenFromRow = MetadataTableEncoder.parseTokenFromRow(row);
            List<ChangeStreamContinuationToken> parseInitialContinuationTokens = MetadataTableEncoder.parseInitialContinuationTokens(row);
            if (parseWatermarkFromRow == null) {
                if (parseLockUuid != null) {
                    releaseStreamPartitionLockForDeletion(convertStreamPartitionRowKeyToPartition, parseLockUuid);
                }
                deleteStreamPartitionRow(convertStreamPartitionRowKeyToPartition);
                LOG.error("Cleaning up corrupted StreamPartition {}", ByteStringRangeHelper.formatByteStringRange(convertStreamPartitionRowKeyToPartition));
            } else {
                PartitionRecord partitionRecord = parseTokenFromRow != null ? new PartitionRecord(convertStreamPartitionRowKeyToPartition, (List<ChangeStreamContinuationToken>) Collections.singletonList(ChangeStreamContinuationToken.create(convertStreamPartitionRowKeyToPartition, parseTokenFromRow)), parseWatermarkFromRow, (List<NewPartition>) Collections.emptyList()) : !parseInitialContinuationTokens.isEmpty() ? new PartitionRecord(convertStreamPartitionRowKeyToPartition, parseInitialContinuationTokens, parseWatermarkFromRow, (List<NewPartition>) Collections.emptyList()) : new PartitionRecord(convertStreamPartitionRowKeyToPartition, parseWatermarkFromRow, parseWatermarkFromRow, (List<NewPartition>) Collections.emptyList());
                if (parseLockUuid != null) {
                    partitionRecord.setUuid(parseLockUuid);
                }
                arrayList.add(partitionRecord);
            }
        }
        return arrayList;
    }

    private void writeToMdTableWatermarkHelper(ByteString byteString, Instant instant, @Nullable ChangeStreamContinuationToken changeStreamContinuationToken) {
        RowMutation cell = RowMutation.create(this.tableId, byteString).setCell(MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT, instant.getMillis());
        if (changeStreamContinuationToken != null) {
            cell.setCell(MetadataTableAdminDao.CF_CONTINUATION_TOKEN, MetadataTableAdminDao.QUALIFIER_DEFAULT, changeStreamContinuationToken.getToken());
        }
        this.dataClient.mutateRow(cell);
    }

    public void updateDetectNewPartitionWatermark(Instant instant) {
        writeToMdTableWatermarkHelper(getFullDetectNewPartition(), instant, null);
    }

    public void updateWatermark(Range.ByteStringRange byteStringRange, Instant instant, @Nullable ChangeStreamContinuationToken changeStreamContinuationToken) {
        writeToMdTableWatermarkHelper(convertPartitionToStreamPartitionRowKey(byteStringRange), instant, changeStreamContinuationToken);
    }

    public boolean releaseStreamPartitionLockForDeletion(Range.ByteStringRange byteStringRange, String str) {
        ByteString convertPartitionToStreamPartitionRowKey = convertPartitionToStreamPartitionRowKey(byteStringRange);
        Filters.ChainFilter filter = Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_LOCK)).filter(Filters.FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT)).filter(Filters.FILTERS.value().exactMatch(str));
        return this.dataClient.checkAndMutateRow(ConditionalRowMutation.create(this.tableId, convertPartitionToStreamPartitionRowKey).condition(filter).then(Mutation.create().deleteCells(MetadataTableAdminDao.CF_LOCK, MetadataTableAdminDao.QUALIFIER_DEFAULT).setCell(MetadataTableAdminDao.CF_SHOULD_DELETE, MetadataTableAdminDao.QUALIFIER_DEFAULT, 1L))).booleanValue();
    }

    public boolean deleteStreamPartitionRow(Range.ByteStringRange byteStringRange) {
        LOG.debug("Delete metadata row");
        ByteString convertPartitionToStreamPartitionRowKey = convertPartitionToStreamPartitionRowKey(byteStringRange);
        Filters.ChainFilter filter = Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_SHOULD_DELETE)).filter(Filters.FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT)).filter(Filters.FILTERS.value().exactMatch(ByteString.copyFrom(Longs.toByteArray(1L))));
        return this.dataClient.checkAndMutateRow(ConditionalRowMutation.create(this.tableId, convertPartitionToStreamPartitionRowKey).condition(filter).then(Mutation.create().deleteRow())).booleanValue();
    }

    public boolean doHoldLock(Range.ByteStringRange byteStringRange, String str) {
        Row readRow = this.dataClient.readRow(this.tableId, convertPartitionToStreamPartitionRowKey(byteStringRange), Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_LOCK)).filter(Filters.FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT)).filter(Filters.FILTERS.limit().cellsPerRow(1)));
        if (readRow != null) {
            return ((RowCell) readRow.getCells().get(0)).getValue().toStringUtf8().equals(str);
        }
        return false;
    }

    public boolean lockAndRecordPartition(PartitionRecord partitionRecord) {
        if (doHoldLock(partitionRecord.getPartition(), partitionRecord.getUuid())) {
            return true;
        }
        Mutation deleteCells = Mutation.create().setCell(MetadataTableAdminDao.CF_LOCK, MetadataTableAdminDao.QUALIFIER_DEFAULT, partitionRecord.getUuid()).setCell(MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT, partitionRecord.getParentLowWatermark().getMillis()).deleteCells(MetadataTableAdminDao.CF_SHOULD_DELETE, MetadataTableAdminDao.QUALIFIER_DEFAULT);
        List<ChangeStreamContinuationToken> changeStreamContinuationTokens = partitionRecord.getChangeStreamContinuationTokens();
        if (changeStreamContinuationTokens != null) {
            for (ChangeStreamContinuationToken changeStreamContinuationToken : changeStreamContinuationTokens) {
                deleteCells.setCell(MetadataTableAdminDao.CF_INITIAL_TOKEN, Range.ByteStringRange.serializeToByteString(changeStreamContinuationToken.getPartition()), changeStreamContinuationToken.toByteString());
            }
        }
        return !this.dataClient.checkAndMutateRow(ConditionalRowMutation.create(this.tableId, convertPartitionToStreamPartitionRowKey(partitionRecord.getPartition())).condition(Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_LOCK)).filter(Filters.FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT)).filter(Filters.FILTERS.value().regex("\\C*"))).otherwise(deleteCells)).booleanValue();
    }

    public void writeDetectNewPartitionVersion() {
        this.dataClient.mutateRow(RowMutation.create(this.tableId, getFullDetectNewPartition()).setCell(MetadataTableAdminDao.CF_VERSION, MetadataTableAdminDao.QUALIFIER_DEFAULT, 1L));
    }

    public HashMap<Range.ByteStringRange, Instant> readDetectNewPartitionMissingPartitions() {
        HashMap<Range.ByteStringRange, Instant> hashMap = new HashMap<>();
        Row readRow = this.dataClient.readRow(this.tableId, getFullDetectNewPartition(), Filters.FILTERS.chain().filter(Filters.FILTERS.family().exactMatch(MetadataTableAdminDao.CF_MISSING_PARTITIONS)).filter(Filters.FILTERS.qualifier().exactMatch(MetadataTableAdminDao.QUALIFIER_DEFAULT)).filter(Filters.FILTERS.limit().cellsPerColumn(1)));
        if (readRow == null || readRow.getCells(MetadataTableAdminDao.CF_MISSING_PARTITIONS, MetadataTableAdminDao.QUALIFIER_DEFAULT).isEmpty()) {
            return hashMap;
        }
        try {
            hashMap = (HashMap) SerializationUtils.deserialize(((RowCell) readRow.getCells(MetadataTableAdminDao.CF_MISSING_PARTITIONS, MetadataTableAdminDao.QUALIFIER_DEFAULT).get(0)).getValue().toByteArray());
        } catch (SerializationException | NullPointerException e) {
            LOG.warn("Failed to deserialize missingPartitions: {}", e.toString());
        }
        return hashMap;
    }

    public void writeDetectNewPartitionMissingPartitions(HashMap<Range.ByteStringRange, Instant> hashMap) {
        this.dataClient.mutateRow(RowMutation.create(this.tableId, getFullDetectNewPartition()).setCell(MetadataTableAdminDao.CF_MISSING_PARTITIONS, ByteString.copyFromUtf8(MetadataTableAdminDao.QUALIFIER_DEFAULT), ByteString.copyFrom(SerializationUtils.serialize(hashMap))));
    }
}
