/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.oxia.client.session;

import io.opentelemetry.api.common.Attributes;
import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubProvider;
import io.streamnative.oxia.client.metrics.Counter;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.session.SessionNotificationListener;
import io.streamnative.oxia.proto.CloseSessionRequest;
import io.streamnative.oxia.proto.CloseSessionResponse;
import io.streamnative.oxia.proto.KeepAliveResponse;
import io.streamnative.oxia.proto.SessionHeartbeat;
import io.streamnative.pulsarmetadatastoreoxia.shaded.com.google.common.annotations.VisibleForTesting;
import io.streamnative.pulsarmetadatastoreoxia.shaded.io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Session
implements StreamObserver<KeepAliveResponse> {
    private static final Logger log = LoggerFactory.getLogger(Session.class);
    @NonNull
    private final OxiaStubProvider stubProvider;
    @NonNull
    private final Duration sessionTimeout;
    @NonNull
    private final Duration heartbeatInterval;
    @VisibleForTesting
    private final long shardId;
    private final long sessionId;
    private final String clientIdentifier;
    @NonNull
    private final SessionHeartbeat heartbeat;
    @NonNull
    private final SessionNotificationListener listener;
    private volatile boolean closed;
    private Counter sessionsOpened;
    private Counter sessionsExpired;
    private Counter sessionsClosed;
    private final ScheduledFuture<?> heartbeatFuture;
    private volatile Instant lastSuccessfullResponse;

    Session(@NonNull ScheduledExecutorService executor, @NonNull OxiaStubProvider stubProvider, @NonNull ClientConfig config, long shardId, long sessionId, InstrumentProvider instrumentProvider, SessionNotificationListener listener) {
        if (executor == null) {
            throw new NullPointerException("executor is marked non-null but is null");
        }
        if (stubProvider == null) {
            throw new NullPointerException("stubProvider is marked non-null but is null");
        }
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        this.stubProvider = stubProvider;
        this.sessionTimeout = config.sessionTimeout();
        this.heartbeatInterval = Duration.ofMillis(Math.max(config.sessionTimeout().toMillis() / 10L, Duration.ofSeconds(2L).toMillis()));
        this.shardId = shardId;
        this.sessionId = sessionId;
        this.clientIdentifier = config.clientIdentifier();
        this.heartbeat = SessionHeartbeat.newBuilder().setShardId(shardId).setSessionId(sessionId).build();
        this.listener = listener;
        log.info("Session created shard={} sessionId={} clientIdentity={}", shardId, sessionId, config.clientIdentifier());
        this.sessionsOpened = instrumentProvider.newCounter("oxia.client.sessions.opened", Unit.Sessions, "The total number of sessions opened by this client", Attributes.builder().put("oxia.shard", shardId).build());
        this.sessionsExpired = instrumentProvider.newCounter("oxia.client.sessions.expired", Unit.Sessions, "The total number of sessions expired int this client", Attributes.builder().put("oxia.shard", shardId).build());
        this.sessionsClosed = instrumentProvider.newCounter("oxia.client.sessions.closed", Unit.Sessions, "The total number of sessions closed by this client", Attributes.builder().put("oxia.shard", shardId).build());
        this.sessionsOpened.increment();
        this.lastSuccessfullResponse = Instant.now();
        this.heartbeatFuture = executor.scheduleAtFixedRate(this::sendKeepAlive, this.heartbeatInterval.toMillis(), this.heartbeatInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void sendKeepAlive() {
        Duration diff = Duration.between(this.lastSuccessfullResponse, Instant.now());
        if (diff.toMillis() > this.sessionTimeout.toMillis()) {
            this.handleSessionExpired();
            return;
        }
        this.stubProvider.getStubForShard(this.shardId).async().keepAlive(this.heartbeat, this);
    }

    @Override
    public void onNext(KeepAliveResponse value) {
        this.lastSuccessfullResponse = Instant.now();
        if (log.isDebugEnabled()) {
            log.debug("Received keep-alive response shard={} sessionId={} clientIdentity={}", this.shardId, this.sessionId, this.clientIdentifier);
        }
    }

    @Override
    public void onError(Throwable t2) {
        log.warn("Error during session keep-alive shard={} sessionId={} clientIdentity={}: {}", this.shardId, this.sessionId, this.clientIdentifier, t2.getMessage());
    }

    @Override
    public void onCompleted() {
    }

    private void handleSessionExpired() {
        this.sessionsExpired.increment();
        log.warn("Session expired shard={} sessionId={} clientIdentity={}", this.shardId, this.sessionId, this.clientIdentifier);
        this.close();
    }

    public CompletableFuture<Void> close() {
        this.sessionsClosed.increment();
        this.heartbeatFuture.cancel(true);
        OxiaStub stub = this.stubProvider.getStubForShard(this.shardId);
        CloseSessionRequest request = CloseSessionRequest.newBuilder().setShardId(this.shardId).setSessionId(this.sessionId).build();
        final CompletableFuture<Void> result = new CompletableFuture<Void>();
        stub.async().closeSession(request, new StreamObserver<CloseSessionResponse>(){

            @Override
            public void onNext(CloseSessionResponse value) {
                log.info("Session closed shard={} sessionId={} clientIdentity={}", Session.this.shardId, Session.this.sessionId, Session.this.clientIdentifier);
                Session.this.listener.onSessionClosed(Session.this);
                result.complete(null);
            }

            @Override
            public void onError(Throwable t2) {
                Session.this.listener.onSessionClosed(Session.this);
                result.complete(null);
            }

            @Override
            public void onCompleted() {
            }
        });
        return result;
    }

    long getShardId() {
        return this.shardId;
    }

    public long getSessionId() {
        return this.sessionId;
    }
}

