/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.oxia.client.notify;

import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.grpc.GrpcResponseStream;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.metrics.NotificationMetrics;
import io.streamnative.oxia.proto.NotificationBatch;
import io.streamnative.oxia.proto.NotificationType;
import io.streamnative.oxia.proto.NotificationsRequest;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.Disposable;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.publisher.Flux;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.scheduler.Scheduler;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.scheduler.Schedulers;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.util.retry.Retry;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.util.retry.RetryBackoffSpec;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShardNotificationReceiver
extends GrpcResponseStream {
    private static final Logger log = LoggerFactory.getLogger(ShardNotificationReceiver.class);
    private final long shardId;
    @NonNull
    private final Consumer<Notification> callback;
    @NonNull
    private final NotificationMetrics metrics;
    @NonNull
    private Optional<Long> startingOffset = Optional.empty();
    private Scheduler scheduler;
    private long offset;

    ShardNotificationReceiver(@NonNull OxiaStub stub, long shardId, @NonNull Consumer<Notification> callback, @NonNull NotificationMetrics metrics) {
        super(stub);
        if (stub == null) {
            throw new NullPointerException("stub is marked non-null but is null");
        }
        if (callback == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        if (metrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.shardId = shardId;
        this.callback = callback;
        this.metrics = metrics;
    }

    public void start(@NonNull Optional<Long> offset) {
        if (offset == null) {
            throw new NullPointerException("offset is marked non-null but is null");
        }
        if (offset.isPresent() && offset.get() < 0L) {
            throw new IllegalArgumentException("Invalid offset: " + offset.get());
        }
        this.startingOffset = offset;
        this.start();
    }

    @Override
    public void close() {
        super.close();
        if (this.scheduler != null) {
            this.scheduler.dispose();
        }
    }

    @Override
    @NonNull
    protected CompletableFuture<Void> start(@NonNull OxiaStub stub, @NonNull Consumer<Disposable> consumer) {
        if (stub == null) {
            throw new NullPointerException("stub is marked non-null but is null");
        }
        if (consumer == null) {
            throw new NullPointerException("consumer is marked non-null but is null");
        }
        NotificationsRequest.Builder request = NotificationsRequest.newBuilder().setShardId(this.shardId);
        this.startingOffset.ifPresent(o -> request.setStartOffsetExclusive((long)o));
        RetryBackoffSpec retrySpec = Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100L)).doBeforeRetry(signal -> log.warn("Retrying receiving notifications for shard {}: {}", (Object)this.shardId, signal));
        String threadName = String.format("shard-%s-notifications", this.shardId);
        this.scheduler = Schedulers.newSingle(threadName);
        Disposable disposable = Flux.defer(() -> stub.reactor().getNotifications(request.build())).doOnError(t2 -> log.warn("Error receiving notifications for shard {}", (Object)this.shardId, t2)).doOnEach(this.metrics::recordBatch).retryWhen(retrySpec).repeat().publishOn(this.scheduler).subscribe(this::notify);
        consumer.accept(disposable);
        return CompletableFuture.completedFuture(null);
    }

    private void notify(@NonNull NotificationBatch batch) {
        if (batch == null) {
            throw new NullPointerException("batch is marked non-null but is null");
        }
        this.offset = Math.max(batch.getOffset(), this.offset);
        batch.getNotificationsMap().entrySet().stream().map(e -> {
            String key = (String)e.getKey();
            io.streamnative.oxia.proto.Notification notice = (io.streamnative.oxia.proto.Notification)e.getValue();
            return switch (notice.getType()) {
                case NotificationType.KEY_CREATED -> new Notification.KeyCreated(key, notice.getVersionId());
                case NotificationType.KEY_MODIFIED -> new Notification.KeyModified(key, notice.getVersionId());
                case NotificationType.KEY_DELETED -> new Notification.KeyDeleted(key);
                default -> null;
            };
        }).filter(x$0 -> Objects.nonNull(x$0)).forEach(x$0 -> this.callback.accept((Notification)x$0));
    }

    public long getOffset() {
        return this.offset;
    }

    long getShardId() {
        return this.shardId;
    }

    static class Factory {
        @NonNull
        private final OxiaStubManager stubManager;
        @NonNull
        private final Consumer<Notification> callback;

        @NonNull
        ShardNotificationReceiver newReceiver(long shardId, @NonNull String leader, @NonNull NotificationMetrics metrics) {
            if (leader == null) {
                throw new NullPointerException("leader is marked non-null but is null");
            }
            if (metrics == null) {
                throw new NullPointerException("metrics is marked non-null but is null");
            }
            return new ShardNotificationReceiver(this.stubManager.getStub(leader), shardId, this.callback, metrics);
        }

        Factory(@NonNull OxiaStubManager stubManager, @NonNull Consumer<Notification> callback) {
            if (stubManager == null) {
                throw new NullPointerException("stubManager is marked non-null but is null");
            }
            if (callback == null) {
                throw new NullPointerException("callback is marked non-null but is null");
            }
            this.stubManager = stubManager;
            this.callback = callback;
        }
    }
}

