/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.oxia.client;

import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.GetResult;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.PutResult;
import io.streamnative.oxia.client.batch.BatchManager;
import io.streamnative.oxia.client.batch.Operation;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.metrics.BatchMetrics;
import io.streamnative.oxia.client.metrics.OperationMetrics;
import io.streamnative.oxia.client.notify.NotificationManager;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.client.shard.ShardManager;
import io.streamnative.oxia.proto.ListRequest;
import io.streamnative.oxia.proto.ListResponse;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.publisher.Flux;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;

class AsyncOxiaClientImpl
implements AsyncOxiaClient {
    @NonNull
    private final OxiaStubManager stubManager;
    @NonNull
    private final ShardManager shardManager;
    @NonNull
    private final NotificationManager notificationManager;
    @NonNull
    private final BatchManager readBatchManager;
    @NonNull
    private final BatchManager writeBatchManager;
    @NonNull
    private final SessionManager sessionManager;
    @NonNull
    private final OperationMetrics metrics;
    private final AtomicLong sequence = new AtomicLong();
    private volatile boolean closed;

    @NonNull
    static CompletableFuture<AsyncOxiaClient> newInstance(@NonNull ClientConfig config) {
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        OxiaStubManager stubManager = new OxiaStubManager();
        OxiaStub serviceAddrStub = stubManager.getStub(config.serviceAddress());
        ShardManager shardManager = new ShardManager(serviceAddrStub, config.metrics(), config.namespace());
        NotificationManager notificationManager = new NotificationManager(stubManager, shardManager, config.metrics());
        Function<Long, String> leaderFn = shardManager::leader;
        Function<Long, OxiaStub> stubByShardId = leaderFn.andThen(stubManager::getStub);
        shardManager.addCallback(notificationManager);
        BatchMetrics batchMetrics = BatchMetrics.create(Clock.systemUTC(), config.metrics());
        BatchManager readBatchManager = BatchManager.newReadBatchManager(config, stubByShardId, batchMetrics);
        SessionManager sessionManager = new SessionManager(config, stubByShardId);
        shardManager.addCallback(sessionManager);
        BatchManager writeBatchManager = BatchManager.newWriteBatchManager(config, stubByShardId, sessionManager, batchMetrics);
        OperationMetrics operationMetrics = OperationMetrics.create(Clock.systemUTC(), config.metrics());
        AsyncOxiaClientImpl client = new AsyncOxiaClientImpl(stubManager, shardManager, notificationManager, readBatchManager, writeBatchManager, sessionManager, operationMetrics);
        return shardManager.start().thenApply(v -> client);
    }

    @Override
    @NonNull
    public CompletableFuture<PutResult> put(String key, byte[] value, PutOption ... options) {
        OperationMetrics.Sample<PutResult> sample = this.metrics.recordPut(value == null ? 0L : (long)value.length);
        CompletableFuture<PutResult> callback = new CompletableFuture<PutResult>();
        try {
            this.checkIfClosed();
            Objects.requireNonNull(key);
            Objects.requireNonNull(value);
            Set<PutOption> validatedOptions = PutOption.validate(options);
            long shardId = this.shardManager.get(key);
            Optional<Long> versionId = PutOption.toVersionId(validatedOptions);
            Operation.WriteOperation.PutOperation op = new Operation.WriteOperation.PutOperation(this.sequence.getAndIncrement(), callback, key, value, versionId, PutOption.toEphemeral(validatedOptions));
            this.writeBatchManager.getBatcher(shardId).add(op);
        }
        catch (RuntimeException e) {
            callback.completeExceptionally(e);
        }
        return callback.whenComplete(sample::stop);
    }

    @Override
    @NonNull
    public CompletableFuture<Boolean> delete(String key, DeleteOption ... options) {
        OperationMetrics.Sample<Boolean> sample = this.metrics.recordDelete();
        CompletableFuture<Boolean> callback = new CompletableFuture<Boolean>();
        try {
            this.checkIfClosed();
            Objects.requireNonNull(key);
            Set<DeleteOption> validatedOptions = DeleteOption.validate(options);
            long shardId = this.shardManager.get(key);
            Optional<Long> versionId = DeleteOption.toVersionId(validatedOptions);
            this.writeBatchManager.getBatcher(shardId).add(new Operation.WriteOperation.DeleteOperation(this.sequence.getAndIncrement(), callback, key, versionId));
        }
        catch (RuntimeException e) {
            callback.completeExceptionally(e);
        }
        return callback.whenComplete(sample::stop);
    }

    @Override
    @NonNull
    public CompletableFuture<Void> deleteRange(String startKeyInclusive, String endKeyExclusive) {
        CompletableFuture<Object> callback;
        OperationMetrics.Sample<Void> sample = this.metrics.recordDeleteRange();
        try {
            this.checkIfClosed();
            Objects.requireNonNull(startKeyInclusive);
            Objects.requireNonNull(endKeyExclusive);
            CompletableFuture[] shardDeletes = this.shardManager.getAll().stream().map(this.writeBatchManager::getBatcher).map(b -> {
                CompletableFuture<Void> shardCallback = new CompletableFuture<Void>();
                b.add(new Operation.WriteOperation.DeleteRangeOperation(this.sequence.getAndIncrement(), shardCallback, startKeyInclusive, endKeyExclusive));
                return shardCallback;
            }).collect(Collectors.toList()).toArray(new CompletableFuture[0]);
            callback = CompletableFuture.allOf(shardDeletes);
        }
        catch (RuntimeException e) {
            callback = CompletableFuture.failedFuture(e);
        }
        return callback.whenComplete(sample::stop);
    }

    @Override
    @NonNull
    public CompletableFuture<GetResult> get(String key) {
        OperationMetrics.Sample<GetResult> sample = this.metrics.recordGet();
        CompletableFuture<GetResult> callback = new CompletableFuture<GetResult>();
        try {
            this.checkIfClosed();
            Objects.requireNonNull(key);
            long shardId = this.shardManager.get(key);
            this.readBatchManager.getBatcher(shardId).add(new Operation.ReadOperation.GetOperation(this.sequence.getAndIncrement(), callback, key));
        }
        catch (RuntimeException e) {
            callback.completeExceptionally(e);
        }
        return callback.whenComplete(sample::stop);
    }

    @Override
    @NonNull
    public CompletableFuture<List<String>> list(String startKeyInclusive, String endKeyExclusive) {
        CompletableFuture<Object> callback;
        OperationMetrics.Sample<List<String>> sample = this.metrics.recordList();
        try {
            this.checkIfClosed();
            Objects.requireNonNull(startKeyInclusive);
            Objects.requireNonNull(endKeyExclusive);
            callback = Flux.fromIterable(this.shardManager.getAll()).flatMap(shardId -> this.list((long)shardId, startKeyInclusive, endKeyExclusive)).collectList().toFuture();
        }
        catch (Exception e) {
            callback = CompletableFuture.failedFuture(e);
        }
        return callback.whenComplete(sample::stop);
    }

    @Override
    public void notifications(@NonNull Consumer<Notification> notificationCallback) {
        if (notificationCallback == null) {
            throw new NullPointerException("notificationCallback is marked non-null but is null");
        }
        this.checkIfClosed();
        this.notificationManager.registerCallback(notificationCallback);
    }

    @NonNull
    private Flux<String> list(long shardId, @NonNull String startKeyInclusive, @NonNull String endKeyExclusive) {
        if (startKeyInclusive == null) {
            throw new NullPointerException("startKeyInclusive is marked non-null but is null");
        }
        if (endKeyExclusive == null) {
            throw new NullPointerException("endKeyExclusive is marked non-null but is null");
        }
        String leader = this.shardManager.leader(shardId);
        OxiaStub stub = this.stubManager.getStub(leader);
        ListRequest request = ListRequest.newBuilder().setShardId(shardId).setStartInclusive(startKeyInclusive).setEndExclusive(endKeyExclusive).build();
        return stub.reactor().list(request).flatMapIterable(ListResponse::getKeysList);
    }

    @Override
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.readBatchManager.close();
        this.writeBatchManager.close();
        this.sessionManager.close();
        this.notificationManager.close();
        this.shardManager.close();
        this.stubManager.close();
    }

    private void checkIfClosed() {
        if (this.closed) {
            throw new IllegalStateException("Client has been closed");
        }
    }

    AsyncOxiaClientImpl(@NonNull OxiaStubManager stubManager, @NonNull ShardManager shardManager, @NonNull NotificationManager notificationManager, @NonNull BatchManager readBatchManager, @NonNull BatchManager writeBatchManager, @NonNull SessionManager sessionManager, @NonNull OperationMetrics metrics) {
        if (stubManager == null) {
            throw new NullPointerException("stubManager is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (notificationManager == null) {
            throw new NullPointerException("notificationManager is marked non-null but is null");
        }
        if (readBatchManager == null) {
            throw new NullPointerException("readBatchManager is marked non-null but is null");
        }
        if (writeBatchManager == null) {
            throw new NullPointerException("writeBatchManager is marked non-null but is null");
        }
        if (sessionManager == null) {
            throw new NullPointerException("sessionManager is marked non-null but is null");
        }
        if (metrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.stubManager = stubManager;
        this.shardManager = shardManager;
        this.notificationManager = notificationManager;
        this.readBatchManager = readBatchManager;
        this.writeBatchManager = writeBatchManager;
        this.sessionManager = sessionManager;
        this.metrics = metrics;
    }
}

