/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.oxia.client.notify;

import io.opentelemetry.api.common.Attributes;
import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.metrics.Counter;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.notify.ShardNotificationReceiver;
import io.streamnative.oxia.client.shard.ShardManager;
import java.util.Collections;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationManager
implements AutoCloseable,
Consumer<ShardManager.ShardAssignmentChanges> {
    private static final Logger log = LoggerFactory.getLogger(NotificationManager.class);
    private final ConcurrentMap<Long, ShardNotificationReceiver> shardReceivers = new ConcurrentHashMap<Long, ShardNotificationReceiver>();
    @NonNull
    private final ShardNotificationReceiver.Factory receiverFactory;
    @NonNull
    private final ShardManager shardManager;
    private final CompositeConsumer<Notification> compositeCallback;
    private final ScheduledExecutorService executor;
    private volatile boolean started = false;
    private volatile boolean closed = false;
    private final Counter counterNotificationsReceived;
    private final Counter counterNotificationsBatchesReceived;

    public NotificationManager(@NonNull ScheduledExecutorService executor, @NonNull OxiaStubManager stubManager, @NonNull ShardManager shardManager, @NonNull InstrumentProvider instrumentProvider) {
        this(executor, new ShardNotificationReceiver.Factory(stubManager), shardManager, instrumentProvider);
        if (executor == null) {
            throw new NullPointerException("executor is marked non-null but is null");
        }
        if (stubManager == null) {
            throw new NullPointerException("stubManager is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (instrumentProvider == null) {
            throw new NullPointerException("instrumentProvider is marked non-null but is null");
        }
    }

    public NotificationManager(@NonNull ScheduledExecutorService executor, @NonNull ShardNotificationReceiver.Factory receiverFactory, @NonNull ShardManager shardManager, @NonNull InstrumentProvider instrumentProvider) {
        if (executor == null) {
            throw new NullPointerException("executor is marked non-null but is null");
        }
        if (receiverFactory == null) {
            throw new NullPointerException("receiverFactory is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (instrumentProvider == null) {
            throw new NullPointerException("instrumentProvider is marked non-null but is null");
        }
        this.receiverFactory = receiverFactory;
        this.compositeCallback = receiverFactory.getCallback();
        this.shardManager = shardManager;
        this.executor = executor;
        this.counterNotificationsReceived = instrumentProvider.newCounter("oxia.client.notifications.received", Unit.Events, "The total number of notification events", Attributes.empty());
        this.counterNotificationsBatchesReceived = instrumentProvider.newCounter("oxia.client.notifications.batches.received", Unit.Events, "The total number of notification batches received", Attributes.empty());
    }

    @Override
    public void accept(@NonNull ShardManager.ShardAssignmentChanges changes) {
        if (changes == null) {
            throw new NullPointerException("changes is marked non-null but is null");
        }
        if (!this.started || this.closed) {
            return;
        }
        this.connectNotificationReceivers(changes);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerCallback(@NonNull Consumer<Notification> callback) {
        if (callback == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        if (this.closed) {
            throw new IllegalStateException("Notification manager has been closed");
        }
        this.compositeCallback.add(callback);
        if (!this.started) {
            NotificationManager notificationManager = this;
            synchronized (notificationManager) {
                if (!this.started) {
                    this.bootstrap();
                    this.started = true;
                }
            }
        }
    }

    private void bootstrap() {
        this.connectNotificationReceivers(new ShardManager.ShardAssignmentChanges(Set.copyOf(this.shardManager.allShards()), Collections.emptySet(), Collections.emptySet()));
    }

    private void connectNotificationReceivers(@NonNull ShardManager.ShardAssignmentChanges changes) {
        if (changes == null) {
            throw new NullPointerException("changes is marked non-null but is null");
        }
        changes.removed().forEach(shard -> ((ShardNotificationReceiver)this.shardReceivers.remove(shard.id())).close());
        changes.added().forEach(s2 -> this.shardReceivers.computeIfAbsent(s2.id(), id -> this.receiverFactory.newReceiver(s2.id(), s2.leader(), this, OptionalLong.empty())));
        changes.reassigned().forEach(s2 -> {
            Optional<ShardNotificationReceiver> receiver = Optional.ofNullable((ShardNotificationReceiver)this.shardReceivers.remove(s2.id()));
            receiver.ifPresent(ShardNotificationReceiver::close);
            OptionalLong offset = receiver.map(ShardNotificationReceiver::getOffset).orElse(OptionalLong.empty());
            this.shardReceivers.computeIfAbsent(s2.id(), id -> this.receiverFactory.newReceiver(s2.id(), s2.leader(), this, offset));
        });
    }

    @Override
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.shardReceivers.values().parallelStream().forEach(ShardNotificationReceiver::close);
    }

    public ScheduledExecutorService getExecutor() {
        return this.executor;
    }

    public Counter getCounterNotificationsReceived() {
        return this.counterNotificationsReceived;
    }

    public Counter getCounterNotificationsBatchesReceived() {
        return this.counterNotificationsBatchesReceived;
    }
}

