/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.oxia.client.session;

import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.SessionMetrics;
import io.streamnative.oxia.proto.CloseSessionRequest;
import io.streamnative.oxia.proto.CreateSessionRequest;
import io.streamnative.oxia.proto.CreateSessionResponse;
import io.streamnative.oxia.proto.SessionHeartbeat;
import io.streamnative.pulsarmetadatastoreoxia.shaded.com.google.common.annotations.VisibleForTesting;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.Disposable;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.publisher.Mono;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.scheduler.Scheduler;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.core.scheduler.Schedulers;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.util.retry.Retry;
import io.streamnative.pulsarmetadatastoreoxia.shaded.reactor.util.retry.RetryBackoffSpec;
import java.time.Duration;
import java.util.function.Function;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side handle for a session opened against a single shard.
 *
 * <p>A session is identified by a {@code (shardId, sessionId)} pair. Once {@link #start()} has
 * been called it periodically sends a {@link SessionHeartbeat} through the stub resolved for its
 * shard; {@link #close()} stops the heartbeats, asks the server to close the session and releases
 * the keep-alive scheduler.
 */
public class Session implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(Session.class);

    /** Lower bound applied to the heartbeat interval derived from the session timeout. */
    private static final Duration MIN_HEARTBEAT_INTERVAL = Duration.ofSeconds(2L);
    /** The derived heartbeat interval is the session timeout divided by this value. */
    private static final long HEARTBEAT_INTERVAL_DIVISOR = 10L;

    @NonNull
    private final Function<Long, OxiaStub> stubByShardId;
    @NonNull
    private final Duration sessionTimeout;
    @NonNull
    private final Duration heartbeatInterval;
    @VisibleForTesting
    private final long shardId;
    private final long sessionId;
    @NonNull
    private final SessionHeartbeat heartbeat;
    @NonNull
    private final SessionMetrics metrics;
    /** Scheduler the keep-alive pipeline publishes on; created per session, disposed in close(). */
    private final Scheduler scheduler;
    /** Handle of the running keep-alive pipeline; {@code null} until {@link #start()} is called. */
    private Disposable keepAliveSubscription;

    /**
     * Creates a session whose timeout and heartbeat interval are derived from {@code config}.
     *
     * <p>The heartbeat interval is one tenth of the session timeout, but never less than two
     * seconds. The heartbeat message is built from {@code shardId} and {@code sessionId}.
     *
     * @param stubByShardId resolves the stub to use for a given shard id
     * @param config client configuration supplying the session timeout
     * @param shardId shard this session belongs to
     * @param sessionId identifier of the session
     * @param metrics sink for keep-alive signals
     * @throws NullPointerException if {@code stubByShardId}, {@code config}, the configured
     *     session timeout or {@code metrics} is {@code null}
     */
    Session(@NonNull Function<Long, OxiaStub> stubByShardId, @NonNull ClientConfig config, long shardId, long sessionId, SessionMetrics metrics) {
        // Arguments are evaluated left to right, so requireConfig(...) rejects a null config
        // with the intended message before config is dereferenced again. Validation has to
        // live inside the argument list because nothing may precede the this(...) call.
        this(
                stubByShardId,
                requireConfig(config).sessionTimeout(),
                heartbeatIntervalFor(config.sessionTimeout()),
                shardId,
                sessionId,
                SessionHeartbeat.newBuilder().setShardId(shardId).setSessionId(sessionId).build(),
                metrics);
    }

    /**
     * Canonical constructor: validates the reference arguments, stores them and creates the
     * scheduler used by the keep-alive pipeline.
     *
     * @param stubByShardId resolves the stub to use for a given shard id
     * @param sessionTimeout timeout applied to the keep-alive pipeline
     * @param heartbeatInterval delay between two heartbeats
     * @param shardId shard this session belongs to
     * @param sessionId identifier of the session
     * @param heartbeat message sent on every keep-alive
     * @param metrics sink for keep-alive signals
     * @throws NullPointerException if any reference argument is {@code null}
     */
    Session(@NonNull Function<Long, OxiaStub> stubByShardId, @NonNull Duration sessionTimeout, @NonNull Duration heartbeatInterval, long shardId, long sessionId, @NonNull SessionHeartbeat heartbeat, @NonNull SessionMetrics metrics) {
        if (stubByShardId == null) {
            throw new NullPointerException("stubByShardId is marked non-null but is null");
        }
        if (sessionTimeout == null) {
            throw new NullPointerException("sessionTimeout is marked non-null but is null");
        }
        if (heartbeatInterval == null) {
            throw new NullPointerException("heartbeatInterval is marked non-null but is null");
        }
        if (heartbeat == null) {
            throw new NullPointerException("heartbeat is marked non-null but is null");
        }
        if (metrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.stubByShardId = stubByShardId;
        this.sessionTimeout = sessionTimeout;
        this.heartbeatInterval = heartbeatInterval;
        this.shardId = shardId;
        this.sessionId = sessionId;
        this.heartbeat = heartbeat;
        this.metrics = metrics;
        // Created here rather than in the config-based constructor so that every construction
        // path ends up with a usable scheduler for start() and close().
        String threadName = String.format("session-[id=%s,shard=%s]-keep-alive", sessionId, shardId);
        this.scheduler = Schedulers.newSingle(threadName);
    }

    /** Returns {@code config}, or throws the standard non-null violation if it is {@code null}. */
    private static ClientConfig requireConfig(ClientConfig config) {
        if (config == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        return config;
    }

    /** Derives the heartbeat interval: a tenth of the session timeout, floored at two seconds. */
    private static Duration heartbeatIntervalFor(Duration sessionTimeout) {
        if (sessionTimeout == null) {
            throw new NullPointerException("sessionTimeout is marked non-null but is null");
        }
        long derivedMillis = sessionTimeout.toMillis() / HEARTBEAT_INTERVAL_DIVISOR;
        return Duration.ofMillis(Math.max(derivedMillis, MIN_HEARTBEAT_INTERVAL.toMillis()));
    }

    /**
     * Subscribes the keep-alive pipeline and stores its handle for {@link #close()}.
     *
     * <p>The heartbeat is re-emitted every {@code heartbeatInterval} and sent through the stub
     * resolved for this shard. Failures are retried with a backoff starting at 100 ms (the retry
     * budget is {@code Long.MAX_VALUE}) and each retry is logged at WARN. The pipeline carries a
     * {@code timeout(sessionTimeout)} and reports every signal to the metrics.
     *
     * <p>NOTE(review): calling this more than once overwrites the stored handle without
     * disposing the previous subscription.
     */
    void start() {
        RetryBackoffSpec retrySpec = Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100L))
                .doBeforeRetry(signal -> log.warn("Retrying sending keep-alives for session [id={},shard={}] - {}", this.sessionId, this.shardId, signal));
        this.keepAliveSubscription = Mono.just(this.heartbeat)
                .repeat()
                .delayElements(this.heartbeatInterval)
                .flatMap(hb -> this.stubByShardId.apply(this.shardId).reactor().keepAlive(hb))
                .retryWhen(retrySpec)
                .timeout(this.sessionTimeout)
                .publishOn(this.scheduler)
                .doOnEach(this.metrics::recordKeepAlive)
                .doOnError(t2 -> log.warn("Session keep-alive error: [id={},shard={}]", this.sessionId, this.shardId, t2))
                .subscribe();
    }

    /**
     * Stops the keep-alive pipeline (if it was started), sends a close-session request for this
     * session and blocks until it completes, then disposes the keep-alive scheduler.
     *
     * <p>The scheduler is disposed even when the close request fails, so a failed close does not
     * leave the scheduler behind.
     *
     * @throws Exception if resolving the stub or the close-session call fails
     */
    @Override
    public void close() throws Exception {
        try {
            // start() may never have been called, in which case there is nothing to cancel.
            Disposable subscription = this.keepAliveSubscription;
            if (subscription != null) {
                subscription.dispose();
            }
            OxiaStub stub = this.stubByShardId.apply(this.shardId);
            CloseSessionRequest request = CloseSessionRequest.newBuilder().setShardId(this.shardId).setSessionId(this.sessionId).build();
            stub.reactor().closeSession(request).block();
        } finally {
            this.scheduler.dispose();
        }
    }

    /** Returns the id of the shard this session belongs to. */
    long getShardId() {
        return this.shardId;
    }

    /** Returns the identifier of this session. */
    public long getSessionId() {
        return this.sessionId;
    }

    /** Creates sessions by issuing a create-session request for a shard. */
    static class Factory {
        @NonNull
        ClientConfig config;
        @NonNull
        Function<Long, OxiaStub> stubByShardId;
        @NonNull
        SessionMetrics metrics;

        /**
         * Sends a create-session request for {@code shardId}, blocks for the response and wraps
         * the returned session id in a new {@link Session}. The returned session is not started.
         *
         * @param shardId shard to create the session on
         * @return the newly created session
         * @throws IllegalStateException if the create-session call completes without a response
         */
        @NonNull
        Session create(long shardId) {
            OxiaStub stub = this.stubByShardId.apply(shardId);
            // The timeout is narrowed from long to int milliseconds for the request field.
            CreateSessionRequest request = CreateSessionRequest.newBuilder()
                    .setSessionTimeoutMs((int)this.config.sessionTimeout().toMillis())
                    .setShardId(shardId)
                    .setClientIdentity(this.config.clientIdentifier())
                    .build();
            CreateSessionResponse response = stub.reactor().createSession(request).block();
            if (response == null) {
                throw new IllegalStateException("Empty session returned for shardId: " + shardId);
            }
            return new Session(this.stubByShardId, this.config, shardId, response.getSessionId(), this.metrics);
        }

        /**
         * @param config client configuration used for the session timeout and client identity
         * @param stubByShardId resolves the stub to use for a given shard id
         * @param metrics metrics sink handed to every created session
         * @throws NullPointerException if any argument is {@code null}
         */
        Factory(@NonNull ClientConfig config, @NonNull Function<Long, OxiaStub> stubByShardId, @NonNull SessionMetrics metrics) {
            if (config == null) {
                throw new NullPointerException("config is marked non-null but is null");
            }
            if (stubByShardId == null) {
                throw new NullPointerException("stubByShardId is marked non-null but is null");
            }
            if (metrics == null) {
                throw new NullPointerException("metrics is marked non-null but is null");
            }
            this.config = config;
            this.stubByShardId = stubByShardId;
            this.metrics = metrics;
        }
    }
}

