/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.oxia.client.notify;

import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.notify.NotificationManager;
import io.streamnative.oxia.client.util.Backoff;
import io.streamnative.oxia.proto.NotificationBatch;
import io.streamnative.oxia.proto.NotificationsRequest;
import io.streamnative.pulsarmetadatastoreoxia.shaded.io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShardNotificationReceiver
implements Closeable,
StreamObserver<NotificationBatch> {
    private static final Logger log = LoggerFactory.getLogger(ShardNotificationReceiver.class);
    private final OxiaStub stub;
    private final NotificationManager notificationManager;
    private final long shardId;
    @NonNull
    private final Consumer<Notification> callback;
    @NonNull
    private volatile OptionalLong offset;
    private volatile boolean closed = false;
    private final Backoff backoff = new Backoff();

    ShardNotificationReceiver(@NonNull OxiaStub stub, long shardId, @NonNull Consumer<Notification> callback, NotificationManager notificationManager, @NonNull OptionalLong offset) {
        if (stub == null) {
            throw new NullPointerException("stub is marked non-null but is null");
        }
        if (callback == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        if (offset == null) {
            throw new NullPointerException("offset is marked non-null but is null");
        }
        this.stub = stub;
        this.notificationManager = notificationManager;
        this.shardId = shardId;
        this.callback = callback;
        this.offset = offset;
        this.start();
    }

    void start() {
        NotificationsRequest.Builder request = NotificationsRequest.newBuilder().setShardId(this.shardId);
        this.offset.ifPresent(request::setStartOffsetExclusive);
        this.stub.async().getNotifications(request.build(), this);
    }

    @Override
    public void onNext(NotificationBatch batch) {
        if (this.offset.isPresent() && this.offset.getAsLong() >= batch.getOffset()) {
            return;
        }
        this.offset = OptionalLong.of(batch.getOffset());
        this.notificationManager.getCounterNotificationsBatchesReceived().increment();
        this.notificationManager.getCounterNotificationsReceived().add(batch.getNotificationsCount());
        batch.getNotificationsMap().forEach((key, notification) -> {
            Record n;
            if (log.isDebugEnabled()) {
                log.debug("--- Got notification: {} - {}", key, (Object)notification.getType());
            }
            switch (notification.getType()) {
                default: {
                    throw new IncompatibleClassChangeError();
                }
                case KEY_CREATED: {
                    Record record = new Notification.KeyCreated((String)key, notification.getVersionId());
                    break;
                }
                case KEY_MODIFIED: {
                    Record record = new Notification.KeyModified((String)key, notification.getVersionId());
                    break;
                }
                case KEY_DELETED: {
                    Record record = new Notification.KeyDeleted((String)key);
                    break;
                }
                case UNRECOGNIZED: {
                    Record record = n = null;
                }
            }
            if (n != null) {
                this.callback.accept((Notification)((Object)n));
            }
        });
    }

    @Override
    public void onError(Throwable t2) {
        if (this.closed) {
            return;
        }
        long retryDelayMillis = this.backoff.nextDelayMillis();
        log.warn("Error while receiving notifications for shard={}: {} - Retrying in {} seconds", this.shardId, t2.getMessage(), (double)retryDelayMillis / 1000.0);
        this.notificationManager.getExecutor().schedule(() -> {
            if (!this.closed) {
                log.info("Retrying getting notifications for shard={}", (Object)this.shardId);
                this.start();
            }
        }, retryDelayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void onCompleted() {
        if (!this.closed) {
            this.start();
        }
    }

    @Override
    public void close() {
        this.closed = true;
    }

    long getShardId() {
        return this.shardId;
    }

    @NonNull
    public OptionalLong getOffset() {
        return this.offset;
    }

    static class Factory {
        @NonNull
        private final OxiaStubManager stubManager;
        @NonNull
        private final CompositeConsumer<Notification> callback = new CompositeConsumer();

        @NonNull
        ShardNotificationReceiver newReceiver(long shardId, @NonNull String leader, @NonNull NotificationManager notificationManager, @NonNull OptionalLong offset) {
            if (leader == null) {
                throw new NullPointerException("leader is marked non-null but is null");
            }
            if (notificationManager == null) {
                throw new NullPointerException("notificationManager is marked non-null but is null");
            }
            if (offset == null) {
                throw new NullPointerException("offset is marked non-null but is null");
            }
            return new ShardNotificationReceiver(this.stubManager.getStub(leader), shardId, this.callback, notificationManager, offset);
        }

        Factory(@NonNull OxiaStubManager stubManager) {
            if (stubManager == null) {
                throw new NullPointerException("stubManager is marked non-null but is null");
            }
            this.stubManager = stubManager;
        }

        @NonNull
        public CompositeConsumer<Notification> getCallback() {
            return this.callback;
        }
    }
}

