/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.oxia.client.notify;

import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.grpc.GrpcResponseStream;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.metrics.NotificationMetrics;
import io.streamnative.oxia.client.metrics.api.Metrics;
import io.streamnative.oxia.client.notify.ShardNotificationReceiver;
import io.streamnative.oxia.client.shard.ShardManager;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps one {@link ShardNotificationReceiver} per shard id and reacts to shard assignment
 * changes by closing, creating and restarting receivers.
 *
 * <p>Receivers are created lazily: nothing is connected until the first callback is registered
 * through {@link #registerCallback(Consumer)}. Assignment changes delivered through
 * {@link #accept(ShardManager.ShardAssignmentChanges)} before that point, or after
 * {@link #close()}, are ignored.
 *
 * <p>Receivers are tracked in a {@link ConcurrentHashMap}, and the {@code started} /
 * {@code closed} flags are {@code volatile}.
 */
public class NotificationManager
        implements AutoCloseable, Consumer<ShardManager.ShardAssignmentChanges> {
    private static final Logger log = LoggerFactory.getLogger(NotificationManager.class);

    /** Receivers currently tracked, keyed by shard id. */
    private final ConcurrentMap<Long, ShardNotificationReceiver> shardReceivers = new ConcurrentHashMap<>();
    @NonNull
    private final ShardNotificationReceiver.Factory recieverFactory;
    @NonNull
    private final ShardManager shardManager;
    /** Holds every callback added through {@link #registerCallback(Consumer)}. */
    @NonNull
    private final CompositeConsumer<Notification> compositeCallback;
    @NonNull
    private final NotificationMetrics metrics;
    /** Set once the initial receivers have been connected by the first callback registration. */
    private volatile boolean started = false;
    /** Set by {@link #close()}; never reset. */
    private volatile boolean closed = false;

    /**
     * Creates a manager with a fresh composite callback, a receiver factory built from
     * {@code stubManager} and that composite callback, and notification metrics derived from
     * {@code metrics}.
     *
     * @param stubManager passed to the receiver factory
     * @param shardManager source of the shard ids and leaders used when bootstrapping
     * @param metrics used to create the {@link NotificationMetrics} handed to each receiver
     * @throws NullPointerException if any argument is {@code null}
     */
    public NotificationManager(@NonNull OxiaStubManager stubManager, @NonNull ShardManager shardManager, @NonNull Metrics metrics) {
        if (stubManager == null) {
            throw new NullPointerException("stubManager is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (metrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.compositeCallback = new CompositeConsumer<>();
        this.recieverFactory = new ShardNotificationReceiver.Factory(stubManager, this.compositeCallback);
        this.shardManager = shardManager;
        this.metrics = NotificationMetrics.create(metrics);
    }

    /**
     * Applies a set of shard assignment changes to the tracked receivers.
     *
     * <p>Does nothing unless a callback has already been registered (so the manager is started)
     * and the manager has not been closed.
     *
     * @param changes the added, removed and reassigned shards
     * @throws NullPointerException if {@code changes} is {@code null}
     */
    @Override
    public void accept(@NonNull ShardManager.ShardAssignmentChanges changes) {
        if (changes == null) {
            throw new NullPointerException("changes is marked non-null but is null");
        }
        if (!this.started || this.closed) {
            return;
        }
        this.connectNotificationReceivers(changes);
    }

    /**
     * Adds a notification callback and, on the first registration, connects a receiver for every
     * shard currently known to the shard manager.
     *
     * @param callback the callback to add to the composite callback
     * @throws NullPointerException if {@code callback} is {@code null}
     * @throws IllegalStateException if the manager has been closed
     */
    public void registerCallback(@NonNull Consumer<Notification> callback) {
        if (callback == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        if (this.closed) {
            throw new IllegalStateException("Notification manager has been closed");
        }
        this.compositeCallback.add(callback);
        if (!this.started) {
            synchronized (this) {
                // Re-check under the lock so that only one caller performs the bootstrap.
                if (!this.started) {
                    this.bootstrap();
                    this.started = true;
                }
            }
        }
    }

    /** Connects receivers for all current shards by treating each of them as newly added. */
    private void bootstrap() {
        this.connectNotificationReceivers(
                new ShardManager.ShardAssignmentChanges(
                        this.shardManager.getAll().stream()
                                .map(shardId -> new ShardManager.ShardAssignmentChange.Added(shardId, this.shardManager.leader(shardId)))
                                .collect(Collectors.toSet()),
                        Set.of(),
                        Set.of()));
    }

    /**
     * Closes receivers of removed shards, starts receivers for added shards, and replaces the
     * receivers of reassigned shards.
     *
     * @param changes the assignment changes to apply
     * @throws NullPointerException if {@code changes} is {@code null}
     */
    private void connectNotificationReceivers(@NonNull ShardManager.ShardAssignmentChanges changes) {
        if (changes == null) {
            throw new NullPointerException("changes is marked non-null but is null");
        }
        changes.removed().forEach(removed -> {
            // remove() returns null when no receiver is tracked for this shard; skip it instead
            // of throwing, which would also abort the added/reassigned processing below.
            ShardNotificationReceiver receiver = this.shardReceivers.remove(removed.shardId());
            if (receiver != null) {
                receiver.close();
            }
        });
        // NOTE(review): start() is also invoked when a receiver for the shard already exists;
        // confirm ShardNotificationReceiver.start() tolerates being called more than once.
        changes.added().forEach(added ->
                this.shardReceivers
                        .computeIfAbsent(added.shardId(), id -> this.recieverFactory.newReceiver(id, added.leader(), this.metrics))
                        .start());
        changes.reassigned().forEach(reassigned -> {
            Optional<ShardNotificationReceiver> previous = Optional.ofNullable(this.shardReceivers.remove(reassigned.shardId()));
            previous.ifPresent(GrpcResponseStream::close);
            // The replacement is started with the previous receiver's offset when there was one.
            this.shardReceivers
                    .computeIfAbsent(reassigned.shardId(), id -> this.recieverFactory.newReceiver(id, reassigned.toLeader(), this.metrics))
                    .start(previous.map(ShardNotificationReceiver::getOffset));
        });
    }

    /**
     * Marks the manager as closed and closes every tracked receiver. Calls after the first one
     * return immediately.
     *
     * @throws Exception declared for {@link AutoCloseable}; this method body throws no checked
     *     exception itself
     */
    @Override
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.shardReceivers.values().parallelStream().forEach(GrpcResponseStream::close);
    }

    /**
     * Package-private constructor that takes every collaborator directly.
     *
     * @param recieverFactory factory used to create a receiver per shard
     * @param shardManager source of the shard ids and leaders used when bootstrapping
     * @param compositeCallback container that registered callbacks are added to
     * @param metrics metrics object handed to each created receiver
     * @throws NullPointerException if any argument is {@code null}
     */
    NotificationManager(@NonNull ShardNotificationReceiver.Factory recieverFactory, @NonNull ShardManager shardManager, @NonNull CompositeConsumer<Notification> compositeCallback, @NonNull NotificationMetrics metrics) {
        if (recieverFactory == null) {
            throw new NullPointerException("recieverFactory is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (compositeCallback == null) {
            throw new NullPointerException("compositeCallback is marked non-null but is null");
        }
        if (metrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.recieverFactory = recieverFactory;
        this.shardManager = shardManager;
        this.compositeCallback = compositeCallback;
        this.metrics = metrics;
    }
}

