package org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.impl.metadata.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.CompareResult;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.Op;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.op.TxnOp;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.Result;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.Bytes;
import org.apache.pulsar.shade.org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.RangeMetadata;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.RangeProperties;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.RangeState;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.StreamMetadata;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.api.metadata.stream.MetaRange;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.storage.exceptions.DataRangeNotFoundException;
import org.apache.pulsar.shade.org.tukaani.xz.common.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/shade/org/apache/bookkeeper/stream/storage/impl/metadata/stream/MetaRangeImpl.class */
public class MetaRangeImpl implements MetaRange {
    private static final Logger log = LoggerFactory.getLogger(MetaRangeImpl.class);
    // Separator byte values. The key helpers below write/check the literals 1, 2 and -1 at
    // offset 8 of a key; these constants carry the same values (presumably the named form of
    // those literals - the helpers do not reference them directly).
    private static final byte METADATA_SEP = 1;
    private static final byte RANGE_SEP = 2;
    private static final byte END_SEP = -1;
    // Store that the serialized stream metadata and range metadata are written to and read from.
    private final MVCCAsyncStore<byte[], byte[]> store;
    // Executor used for submitted tasks and for the asynchronous store callbacks.
    private final ExecutorService executor;
    // Passed to ProtoUtils.split when the initial ranges of a stream are computed.
    private final StorageContainerPlacementPolicy placementPolicy;

    // Id of the stream; taken from the stream properties on create, or from the key on load.
    @GuardedBy("this")
    private long streamId;

    // Properties of the stream as supplied to create() or parsed from the stored metadata.
    @GuardedBy("this")
    private StreamProperties streamProps;
    // Lifecycle state: UNINIT -> CREATING -> CREATED during create(), or the loaded value.
    private StreamMetadata.LifecycleState lifecycleState;
    // Serving state; changed by updateServingState() once the store write has completed.
    private StreamMetadata.ServingState servingState;
    // Creation time, set from System.currentTimeMillis() in create() or read from stored metadata.
    private long cTime;
    // Modification time, set together with cTime on create and refreshed by updateServingState().
    private long mTime;
    // Next range id handed to ProtoUtils.split; advanced by the number of ranges created.
    private long nextRangeId;
    // Range id -> range metadata for every range known to this instance.
    private final NavigableMap<Long, RangeMetadata> ranges;
    // Ids of the ranges recorded as "current ranges" in the stream metadata.
    private final List<Long> currentRanges;
    // Revision recorded from the last create txn, load, or rPut; it is passed back to store.rPut
    // (presumably as the expected revision - confirm against MVCCAsyncStore).
    private long revision;

    /**
     * Builds the 9-byte store key of a stream's metadata record.
     *
     * <p>The stream id is encoded by {@code Bytes.toBytes} starting at offset 0 and the
     * final byte (offset 8) is set to {@code 1}.
     *
     * @param j the stream id
     * @return a newly allocated 9-byte key
     */
    public static final byte[] getStreamMetadataKey(long j) {
        final byte[] key = new byte[Long.BYTES + 1];
        Bytes.toBytes(j, key, 0);
        key[Long.BYTES] = (byte) 1;
        return key;
    }

    /**
     * Builds the 17-byte store key of a single range of a stream.
     *
     * <p>Layout: stream id encoded at offset 0, the byte {@code 2} at offset 8, and the
     * range id encoded at offset 9 (both ids are written with {@code Bytes.toBytes}).
     *
     * @param j  the stream id
     * @param j2 the range id
     * @return a newly allocated 17-byte key
     */
    public static final byte[] getStreamRangeKey(long j, long j2) {
        final int separatorIndex = Long.BYTES;
        final byte[] key = new byte[separatorIndex + 1 + Long.BYTES];
        Bytes.toBytes(j, key, 0);
        key[separatorIndex] = (byte) 2;
        Bytes.toBytes(j2, key, separatorIndex + 1);
        return key;
    }

    /**
     * Builds the 9-byte key used as the end of a stream's key range in {@code load} and
     * {@code delete}.
     *
     * <p>The stream id is encoded at offset 0 and the final byte (offset 8) is {@code -1}.
     *
     * @param j the stream id
     * @return a newly allocated 9-byte key
     */
    public static final byte[] getStreamMetadataEndKey(long j) {
        final byte[] endKey = new byte[Long.BYTES + 1];
        Bytes.toBytes(j, endKey, 0);
        endKey[Long.BYTES] = (byte) 0xFF; // same bit pattern as (byte) -1
        return endKey;
    }

    /**
     * Tells whether a key has the shape produced by {@code getStreamMetadataKey}:
     * exactly 9 bytes long with the value {@code 1} in the last byte.
     *
     * @param bArr the key to inspect; must not be {@code null}
     * @return {@code true} if the key is a stream-metadata key
     */
    public static final boolean isMetadataKey(byte[] bArr) {
        if (bArr.length != 9) {
            return false;
        }
        return 1 == bArr[8];
    }

    /**
     * Tells whether a key has the shape produced by {@code getStreamRangeKey}:
     * exactly 17 bytes long with the value {@code 2} at offset 8.
     *
     * @param bArr the key to inspect; must not be {@code null}
     * @return {@code true} if the key is a stream-range key
     */
    static final boolean isStreamRangeKey(byte[] bArr) {
        if (bArr.length != 17) {
            return false;
        }
        return 2 == bArr[8];
    }

    /**
     * Creates an empty, not-yet-created meta range.
     *
     * <p>The instance starts with no ranges, lifecycle state {@code UNINIT}, serving state
     * {@code WRITABLE}, next range id {@code 1024} and creation/modification times of 0.
     *
     * @param mVCCAsyncStore                  store holding the metadata records
     * @param executorService                 executor for tasks and store callbacks
     * @param storageContainerPlacementPolicy policy passed to the range split on create
     */
    public MetaRangeImpl(
            MVCCAsyncStore<byte[], byte[]> mVCCAsyncStore,
            ExecutorService executorService,
            StorageContainerPlacementPolicy storageContainerPlacementPolicy) {
        this(
            mVCCAsyncStore,
            executorService,
            storageContainerPlacementPolicy,
            Maps.newTreeMap(),
            Lists.newArrayList(),
            StreamMetadata.newBuilder()
                .setLifecycleState(StreamMetadata.LifecycleState.UNINIT)
                .setServingState(StreamMetadata.ServingState.WRITABLE)
                .setNextRangeId(1024L)
                .build(),
            0L,
            0L);
    }

    private MetaRangeImpl(MVCCAsyncStore<byte[], byte[]> mVCCAsyncStore, ExecutorService executorService, StorageContainerPlacementPolicy storageContainerPlacementPolicy, NavigableMap<Long, RangeMetadata> navigableMap, List<Long> list, StreamMetadata streamMetadata, long j, long j2) {
        this.lifecycleState = StreamMetadata.LifecycleState.UNINIT;
        this.servingState = StreamMetadata.ServingState.WRITABLE;
        this.cTime = 0L;
        this.mTime = 0L;
        this.nextRangeId = 1024L;
        this.store = mVCCAsyncStore;
        this.executor = executorService;
        this.placementPolicy = storageContainerPlacementPolicy;
        this.ranges = navigableMap;
        this.currentRanges = list;
        this.cTime = j;
        this.mTime = j2;
        this.streamProps = streamMetadata.getProps();
        this.streamId = this.streamProps.getStreamId();
        this.lifecycleState = streamMetadata.getLifecycleState();
        this.servingState = streamMetadata.getServingState();
        this.nextRangeId = streamMetadata.getNextRangeId();
    }

    /**
     * Test accessor for the creation time. Reads the field without any synchronization.
     *
     * @return the current value of the creation time field
     */
    @VisibleForTesting
    public long unsafeGetCreationTime() {
        final long creationTime = cTime;
        return creationTime;
    }

    /**
     * Test accessor for the modification time. Reads the field without any synchronization.
     *
     * @return the current value of the modification time field
     */
    @VisibleForTesting
    public long unsafeGetModificationTime() {
        final long modificationTime = mTime;
        return modificationTime;
    }

    /**
     * Test accessor for the lifecycle state. Reads the field without any synchronization.
     *
     * @return the current lifecycle state
     */
    @VisibleForTesting
    public StreamMetadata.LifecycleState unsafeGetLifecycleState() {
        final StreamMetadata.LifecycleState current = lifecycleState;
        return current;
    }

    /**
     * Returns the stream properties, reading the guarded field while holding this
     * instance's monitor.
     *
     * @return the current stream properties
     */
    @VisibleForTesting
    public synchronized StreamProperties unsafeGetStreamProperties() {
        final StreamProperties props = streamProps;
        return props;
    }

    /**
     * Returns the stream id, reading the guarded field while holding this instance's monitor.
     *
     * @return the current stream id
     */
    @VisibleForTesting
    public synchronized long unsafeGetStreamId() {
        final long id = streamId;
        return id;
    }

    /**
     * Test accessor for the range map.
     *
     * @return the internal range id -> metadata map itself (not a copy)
     */
    @VisibleForTesting
    public NavigableMap<Long, RangeMetadata> unsafeGetRanges() {
        final NavigableMap<Long, RangeMetadata> liveRanges = ranges;
        return liveRanges;
    }

    /**
     * Snapshots the current state into a {@code StreamMetadata} message, using the given
     * lifecycle state instead of the one stored in this instance.
     *
     * <p>NOTE(review): {@code @VisibleForTesting} on a private method looks unintended.
     *
     * @param state lifecycle state to record in the message
     * @return the built message
     */
    @VisibleForTesting
    private synchronized StreamMetadata toStreamMetadata(StreamMetadata.LifecycleState state) {
        return StreamMetadata.newBuilder()
            .setProps(this.streamProps)
            .setLifecycleState(state)
            .setServingState(this.servingState)
            .setNextRangeId(this.nextRangeId)
            .setCTime(this.cTime)
            .setMTime(this.mTime)
            .addAllCurrentRanges(this.currentRanges)
            .build();
    }

    /**
     * Snapshots the current state into a {@code StreamMetadata} message, using the given
     * serving state and modification time instead of the ones stored in this instance.
     *
     * @param state            serving state to record in the message
     * @param modificationTime modification time to record in the message
     * @return the built message
     */
    private synchronized StreamMetadata toStreamMetadata(StreamMetadata.ServingState state, long modificationTime) {
        return StreamMetadata.newBuilder()
            .setProps(this.streamProps)
            .setLifecycleState(this.lifecycleState)
            .setServingState(state)
            .setNextRangeId(this.nextRangeId)
            .setCTime(this.cTime)
            .setMTime(modificationTime)
            .addAllCurrentRanges(this.currentRanges)
            .build();
    }

    /**
     * Test accessor for the current range ids.
     *
     * @return the internal list itself (not a copy)
     */
    @VisibleForTesting
    public List<Long> unsafeGetCurrentRanges() {
        final List<Long> liveIds = currentRanges;
        return liveIds;
    }

    /**
     * Runs an asynchronous operation on the executor, but only if the stream has been created.
     *
     * <p>The check and the call to {@code supplier} both happen inside a task submitted to
     * the executor. If the lifecycle state does not count as "created" (per
     * {@code ProtoUtils.isStreamCreated}), or the supplier itself throws, the returned future
     * is completed exceptionally. Otherwise the outcome of the supplied future is forwarded
     * to the returned future from a callback that also runs on the executor.
     *
     * @param supplier produces the future of the actual operation
     * @param <T>      result type of the operation
     * @return a future mirroring the outcome of the supplied operation
     */
    private <T> CompletableFuture<T> checkStreamCreated(Supplier<CompletableFuture<T>> supplier) {
        final CompletableFuture<T> result = FutureUtils.createFuture();
        this.executor.submit(() -> {
            try {
                if (!ProtoUtils.isStreamCreated(this.lifecycleState)) {
                    throw new IllegalStateException("Stream isn't created yet.");
                }
                // Keep the supplier's future fully typed: a raw CompletableFuture would turn
                // the lambda parameters into Object and break complete/completeExceptionally.
                supplier.get()
                    .thenApplyAsync(value -> result.complete(value), this.executor)
                    .exceptionally(cause -> result.completeExceptionally(cause));
            } catch (Throwable cause) {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }

    /**
     * Asserts that the current lifecycle state is exactly the expected one.
     *
     * @param expected the lifecycle state this instance must be in
     * @throws IllegalStateException (via {@code Preconditions.checkState}) if the states differ
     */
    private void checkLifecycleState(StreamMetadata.LifecycleState expected) {
        final StreamMetadata.LifecycleState actual = this.lifecycleState;
        final boolean matches = actual == expected;
        Preconditions.checkState(matches, "Unexpected state " + actual + ", expected to be " + expected);
    }

    /**
     * Returns the stream name stored in the stream properties.
     *
     * @return the stream name
     * @throws IllegalStateException (via {@code Preconditions.checkState}) if no stream
     *                               properties are set
     */
    @Override
    public synchronized String getName() {
        final StreamProperties props = this.streamProps;
        Preconditions.checkState(props != null);
        return props.getStreamName();
    }

    /**
     * Submits a task to the executor and hands it a fresh future to complete.
     *
     * <p>If the task throws, the future is completed exceptionally with that throwable.
     *
     * @param consumer the task; receives the future it is responsible for completing
     * @param <T>      result type of the future
     * @return the future passed to the task
     */
    private <T> CompletableFuture<T> executeTask(Consumer<CompletableFuture<T>> consumer) {
        final CompletableFuture<T> result = FutureUtils.createFuture();
        final Runnable task = () -> {
            try {
                consumer.accept(result);
            } catch (Throwable failure) {
                result.completeExceptionally(failure);
            }
        };
        this.executor.submit(task);
        return result;
    }

    /**
     * Creates the stream's metadata from the given properties. The work is done by
     * {@code unsafeCreate} in a task on the executor.
     *
     * @param streamProperties properties of the stream to create
     * @return a future that {@code unsafeCreate} completes with the outcome
     */
    @Override
    public CompletableFuture<Boolean> create(StreamProperties streamProperties) {
        return executeTask(future -> unsafeCreate(future, streamProperties));
    }

    /**
     * Creates the stream metadata and its initial ranges in a single store transaction.
     *
     * <p>Steps:
     * <ol>
     *   <li>require lifecycle state {@code UNINIT} and move to {@code CREATING};</li>
     *   <li>record the stream properties, stream id and creation/modification time;</li>
     *   <li>split the stream into its initial ranges and register each one locally;</li>
     *   <li>submit a transaction that writes every range record plus the stream metadata
     *       record (marked {@code CREATED}), guarded by a compare on the metadata key
     *       against a {@code null} value.</li>
     * </ol>
     *
     * <p>On a successful transaction the revision of the last result is remembered, the
     * lifecycle state becomes {@code CREATED} and the future completes with {@code true}.
     * On an unsuccessful transaction the future completes with {@code false}; on an error
     * it completes exceptionally.
     *
     * <p>NOTE(review): in-memory state (lifecycle state {@code CREATING}, ranges, next range
     * id) is not rolled back when the transaction does not succeed - confirm that callers
     * discard the instance in that case.
     *
     * @param completableFuture future to complete with the outcome
     * @param streamProperties  properties of the stream being created
     * @throws IllegalStateException if the lifecycle state is not {@code UNINIT}
     */
    private void unsafeCreate(CompletableFuture<Boolean> completableFuture, StreamProperties streamProperties) {
        // 1. Verify and advance the lifecycle state.
        checkLifecycleState(StreamMetadata.LifecycleState.UNINIT);
        this.lifecycleState = StreamMetadata.LifecycleState.CREATING;

        // 2. Record props and id. Both fields are @GuardedBy("this"), so write them together
        //    under the monitor.
        final long newStreamId = streamProperties.getStreamId();
        synchronized (this) {
            this.streamProps = streamProperties;
            this.streamId = newStreamId;
        }
        final long now = System.currentTimeMillis();
        this.cTime = now;
        this.mTime = now;

        // 3. Compute the initial ranges and queue one put per range.
        final List<RangeProperties> initialRanges = ProtoUtils.split(
            newStreamId,
            streamProperties.getStreamConf().getInitialNumRanges(),
            this.nextRangeId,
            this.placementPolicy);
        final List<Op<byte[], byte[]>> putOps = Lists.newArrayListWithExpectedSize(initialRanges.size() + 1);
        for (RangeProperties rangeProps : initialRanges) {
            final long rangeId = rangeProps.getRangeId();
            final RangeMetadata rangeMetadata = RangeMetadata.newBuilder()
                .setProps(rangeProps)
                .setCreateTime(now)
                // Long.MAX_VALUE is the value previously borrowed from an unrelated
                // compression-library constant.
                .setFenceTime(Long.MAX_VALUE)
                .setState(RangeState.RANGE_ACTIVE)
                .addAllParents(Lists.newArrayList())
                .build();
            this.ranges.put(rangeId, rangeMetadata);
            this.currentRanges.add(rangeId);
            putOps.add(this.store.newPut(getStreamRangeKey(newStreamId, rangeId), rangeMetadata.toByteArray()));
        }
        this.nextRangeId += initialRanges.size();

        // 4. The stream metadata record goes last, so its result is the last txn result.
        final byte[] streamMetadataKey = getStreamMetadataKey(newStreamId);
        putOps.add(this.store.newPut(
            streamMetadataKey,
            toStreamMetadata(StreamMetadata.LifecycleState.CREATED).toByteArray()));

        final TxnOp<byte[], byte[]> txn = this.store.newTxn()
            .If(this.store.newCompareValue(CompareResult.EQUAL, streamMetadataKey, null))
            .Then(putOps.toArray(new Op[putOps.size()]))
            .build();

        if (log.isTraceEnabled()) {
            log.trace("Execute create stream metadata range txn {}", streamProperties);
        }
        this.store.txn(txn).thenApplyAsync(txnResult -> {
            try {
                if (log.isTraceEnabled()) {
                    log.trace("Create stream metadata range txn result = {}", txnResult.isSuccess());
                }
                if (txnResult.isSuccess()) {
                    final List<Result<byte[], byte[]>> results = txnResult.results();
                    this.revision = results.get(results.size() - 1).revision();
                    this.lifecycleState = StreamMetadata.LifecycleState.CREATED;
                    completableFuture.complete(true);
                } else {
                    completableFuture.complete(false);
                }
                return null;
            } finally {
                // Always release the txn result, whatever the outcome.
                txnResult.close();
            }
        }, this.executor).exceptionally(cause -> {
            completableFuture.completeExceptionally(cause);
            return null;
        });
    }

    /**
     * Loads the metadata of a stream from the store into this instance.
     *
     * <p>Reads every record between the stream's metadata key and its end key. The records
     * are applied to this instance in a callback running on the executor.
     *
     * @param j the stream id
     * @return a future holding this instance, or {@code null} if the store returned no records
     */
    @Override
    public CompletableFuture<MetaRange> load(long j) {
        final byte[] startKey = getStreamMetadataKey(j);
        final byte[] endKey = getStreamMetadataEndKey(j);
        return this.store.range(startKey, endKey).thenApplyAsync(records -> {
            if (!records.isEmpty()) {
                loadMetadata(records);
                return this;
            }
            return null;
        }, this.executor);
    }

    /**
     * Applies a batch of stored records to this instance.
     *
     * <p>A stream-metadata record updates the revision and the stream-level state; a range
     * record is added to the range map. Records with any other key shape are ignored.
     *
     * @param list the records read from the store
     */
    private void loadMetadata(List<KeyValue<byte[], byte[]>> list) {
        for (KeyValue<byte[], byte[]> record : list) {
            final byte[] key = record.key();
            if (isMetadataKey(key)) {
                this.revision = record.modifiedRevision();
                loadStreamMetadata(Bytes.toLong(key, 0), record.value());
                continue;
            }
            if (isStreamRangeKey(key)) {
                final long keyStreamId = Bytes.toLong(key, 0);
                final long keyRangeId = Bytes.toLong(key, 9);
                loadRangeMetadata(keyStreamId, keyRangeId, record.value());
            }
        }
    }

    /**
     * Replaces the stream-level state of this instance with a stored metadata record.
     *
     * <p>The record is parsed before any field is touched, so a corrupt record leaves this
     * instance unchanged. The stream id and stream properties are {@code @GuardedBy("this")}
     * and are therefore written while holding the monitor.
     *
     * @param j    the stream id taken from the record's key
     * @param bArr the serialized {@code StreamMetadata}
     * @throws RuntimeException if the bytes are not a valid {@code StreamMetadata} message
     */
    private void loadStreamMetadata(long j, byte[] bArr) {
        final StreamMetadata parsed;
        try {
            parsed = StreamMetadata.parseFrom(bArr);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("Invalid stream metadata of stream " + j, e);
        }
        synchronized (this) {
            this.streamId = j;
            this.streamProps = parsed.getProps();
        }
        this.lifecycleState = parsed.getLifecycleState();
        this.servingState = parsed.getServingState();
        this.currentRanges.clear();
        this.currentRanges.addAll(parsed.getCurrentRangesList());
        this.nextRangeId = parsed.getNextRangeId();
        this.cTime = parsed.getCTime();
        this.mTime = parsed.getMTime();
    }

    /**
     * Parses a stored range record and registers it in the range map.
     *
     * @param j    the stream id taken from the record's key; must equal this instance's stream id
     * @param j2   the range id taken from the record's key; must not be negative
     * @param bArr the serialized {@code RangeMetadata}
     * @throws IllegalArgumentException if either id check fails
     * @throws RuntimeException         if the bytes are not a valid {@code RangeMetadata} message
     */
    private void loadRangeMetadata(long j, long j2, byte[] bArr) {
        Preconditions.checkArgument(j == this.streamId);
        Preconditions.checkArgument(j2 >= 0);
        final RangeMetadata parsed;
        try {
            parsed = RangeMetadata.parseFrom(bArr);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("Invalid range metadata of range (" + j + ", " + j2 + ")", e);
        }
        this.ranges.put(j2, parsed);
    }

    /**
     * Deletes every stored record between the stream's metadata key and its end key.
     *
     * @param j the stream id
     * @return a future that is {@code true} if the store reported at least one deleted record
     */
    @Override
    public CompletableFuture<Boolean> delete(long j) {
        final byte[] startKey = getStreamMetadataKey(j);
        final byte[] endKey = getStreamMetadataEndKey(j);
        return this.store.deleteRange(startKey, endKey)
            .thenApplyAsync(deleted -> !deleted.isEmpty(), this.executor);
    }

    /**
     * Returns the current serving state of the stream.
     *
     * @return a future with the serving state; completed exceptionally if the stream has not
     *         been created
     */
    @Override
    public CompletableFuture<StreamMetadata.ServingState> getServingState() {
        return checkStreamCreated(() -> FutureUtils.value(this.servingState));
    }

    /**
     * Persists a new serving state and, once the store write completes, applies it locally.
     *
     * <p>The stream metadata record is rewritten via {@code store.rPut} with the new serving
     * state and a fresh modification time, passing the currently remembered revision. The
     * in-memory serving state, modification time and revision are only updated in the
     * completion callback.
     *
     * @param servingState the serving state to store
     * @return a future with the stored serving state; completed exceptionally if the stream
     *         has not been created
     */
    @Override
    public CompletableFuture<StreamMetadata.ServingState> updateServingState(StreamMetadata.ServingState servingState) {
        return checkStreamCreated(() -> {
            final long now = System.currentTimeMillis();
            final byte[] key = getStreamMetadataKey(this.streamId);
            final byte[] value = toStreamMetadata(servingState, now).toByteArray();
            return this.store.rPut(key, value, this.revision).thenApplyAsync(newRevision -> {
                this.servingState = servingState;
                this.mTime = now;
                this.revision = newRevision;
                return servingState;
            }, this.executor);
        });
    }

    /**
     * Returns the stream configuration held in the stream properties.
     *
     * @return a future with the configuration; completed exceptionally if the stream has not
     *         been created
     */
    @Override
    public CompletableFuture<StreamConfiguration> getConfiguration() {
        return checkStreamCreated(() -> FutureUtils.value(unsafeGetStreamProperties().getStreamConf()));
    }

    /**
     * Looks up the metadata of a range in the in-memory range map.
     *
     * @param j the range id
     * @return the metadata registered for that range
     * @throws DataRangeNotFoundException if no range with that id is registered
     */
    private RangeMetadata unsafeGetDataRange(long j) throws DataRangeNotFoundException {
        final RangeMetadata found = this.ranges.get(Long.valueOf(j));
        if (found != null) {
            return found;
        }
        throw new DataRangeNotFoundException(this.streamId, j);
    }

    /**
     * Returns the metadata of every range currently listed as a current range.
     *
     * <p>The list of current range ids is snapshotted first, then each id is resolved
     * through the range map. If an id has no registered metadata, the resulting
     * {@code DataRangeNotFoundException} completes the returned future exceptionally.
     *
     * @return a future with the range metadata, in the order of the current range ids;
     *         completed exceptionally if the stream has not been created
     */
    @Override
    public CompletableFuture<List<RangeMetadata>> getActiveRanges() {
        return checkStreamCreated(() -> {
            // Snapshot the ids so the loop does not iterate the live list.
            final List<Long> rangeIds = ImmutableList.copyOf(this.currentRanges);
            final List<RangeMetadata> activeRanges = Lists.newArrayListWithExpectedSize(rangeIds.size());
            for (long rangeId : rangeIds) {
                activeRanges.add(unsafeGetDataRange(rangeId));
            }
            return FutureUtils.value(activeRanges);
        });
    }
}
