/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.table.impl.pool;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.utils.Async;
import tech.ydb.table.Session;
import tech.ydb.table.SessionPoolStats;
import tech.ydb.table.impl.BaseSession;
import tech.ydb.table.impl.pool.SessionPoolOptions;
import tech.ydb.table.impl.pool.StatefulSession;
import tech.ydb.table.impl.pool.WaitingQueue;
import tech.ydb.table.rpc.TableRpc;
import tech.ydb.table.settings.CreateSessionSettings;
import tech.ydb.table.settings.DeleteSessionSettings;

public class SessionPool
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
    private static final CreateSessionSettings CREATE_SETTINGS = (CreateSessionSettings)((CreateSessionSettings)new CreateSessionSettings().setTimeout(Duration.ofSeconds(300L))).setOperationTimeout(Duration.ofSeconds(299L));
    private final int minSize;
    private final Clock clock;
    private final ScheduledExecutorService scheduler;
    private final WaitingQueue<ClosableSession> queue;
    private final ScheduledFuture<?> keepAliveFuture;
    private final StatsImpl stats = new StatsImpl();

    public SessionPool(Clock clock, TableRpc rpc, boolean keepQueryText, SessionPoolOptions options) {
        this.minSize = options.getMinSize();
        this.clock = clock;
        this.scheduler = rpc.getScheduler();
        this.queue = new WaitingQueue<ClosableSession>(new Handler(rpc, keepQueryText), options.getMaxSize());
        KeepAliveTask keepAlive = new KeepAliveTask(options);
        this.keepAliveFuture = this.scheduler.scheduleAtFixedRate(keepAlive, keepAlive.periodMillis / 2L, keepAlive.periodMillis, TimeUnit.MILLISECONDS);
        logger.info("init session pool, min size = {}, max size = {}, keep alive period = {}", new Object[]{options.getMinSize(), options.getMaxSize(), keepAlive.periodMillis});
    }

    public void updateMaxSize(int maxSize) {
        this.queue.updateLimits(maxSize);
    }

    @Override
    public void close() {
        logger.info("closing session pool");
        this.keepAliveFuture.cancel(false);
        this.queue.close();
    }

    public SessionPoolStats stats() {
        return this.stats;
    }

    public CompletableFuture<Result<Session>> acquire(Duration timeout) {
        logger.debug("acquire session with timeout {}", (Object)timeout);
        CompletableFuture<Result<Session>> future = new CompletableFuture<Result<Session>>();
        if (!this.pollNext(future)) {
            future.whenComplete((BiConsumer)new Canceller(this.scheduler.schedule(new Timeout(future), timeout.toMillis(), TimeUnit.MILLISECONDS)));
        }
        return future;
    }

    private boolean pollNext(CompletableFuture<Result<Session>> future) {
        CompletableFuture nextSession = new CompletableFuture();
        this.queue.acquire(nextSession);
        if (nextSession.isDone() && !nextSession.isCompletedExceptionally()) {
            return this.validateSession((ClosableSession)nextSession.join(), future);
        }
        nextSession.whenComplete((session, th) -> {
            if (th != null) {
                Result fail;
                Throwable ex = Async.unwrapCompletionException((Throwable)th);
                Result result = fail = ex instanceof UnexpectedResultException ? Result.fail((UnexpectedResultException)((UnexpectedResultException)ex)) : Result.error((String)"can't create session", (Throwable)ex);
                if (!future.complete(fail)) {
                    logger.warn("session acquisition failed with status {}", (Object)fail);
                    return;
                }
            }
            if (session != null) {
                this.validateSession((ClosableSession)session, future);
            }
        });
        return false;
    }

    private boolean validateSession(ClosableSession session, CompletableFuture<Result<Session>> future) {
        if (session.state().switchToActive(this.clock.instant())) {
            logger.debug("session {} accepted", (Object)session.getId());
            if (future.complete((Result<Session>)Result.success((Object)session))) {
                this.stats.acquired.increment();
            } else {
                logger.debug("session future already canceled, return session to the pool");
                session.state().switchToIdle(this.clock.instant());
                this.queue.release(session);
            }
            return true;
        }
        this.queue.delete(session);
        return this.pollNext(future);
    }

    static final class Timeout
    implements Runnable {
        private static final Status EXPIRE = Status.of((StatusCode)StatusCode.CLIENT_DEADLINE_EXPIRED, (Issue[])new Issue[]{Issue.of((String)"session acquire deadline was expired", (Issue.Severity)Issue.Severity.WARNING)});
        private final CompletableFuture<Result<Session>> f;

        Timeout(CompletableFuture<Result<Session>> f) {
            this.f = f;
        }

        @Override
        public void run() {
            if (this.f != null && !this.f.isDone()) {
                this.f.complete((Result<Session>)Result.fail((Status)EXPIRE));
            }
        }
    }

    static final class Canceller
    implements BiConsumer<Object, Throwable> {
        private final Future<?> f;

        Canceller(Future<?> f) {
            this.f = f;
        }

        @Override
        public void accept(Object ignore, Throwable ex) {
            if (this.f != null && !this.f.isDone()) {
                this.f.cancel(false);
            }
        }
    }

    private class StatsImpl
    implements SessionPoolStats {
        private final LongAdder acquired = new LongAdder();
        private final LongAdder released = new LongAdder();
        private final LongAdder requested = new LongAdder();
        private final LongAdder failed = new LongAdder();
        private final LongAdder created = new LongAdder();
        private final LongAdder deleted = new LongAdder();

        private StatsImpl() {
        }

        @Override
        public int getMinSize() {
            return SessionPool.this.minSize;
        }

        @Override
        public int getMaxSize() {
            return SessionPool.this.queue.getTotalLimit();
        }

        @Override
        public int getIdleCount() {
            return SessionPool.this.queue.getIdleCount();
        }

        @Override
        public int getAcquiredCount() {
            return SessionPool.this.queue.getUsedCount();
        }

        @Override
        public int getPendingAcquireCount() {
            return SessionPool.this.queue.getWaitingCount() + SessionPool.this.queue.getPendingCount();
        }

        @Override
        public long getAcquiredTotal() {
            return this.acquired.sum();
        }

        @Override
        public long getReleasedTotal() {
            return this.released.sum();
        }

        @Override
        public long getRequestedTotal() {
            return this.requested.sum();
        }

        @Override
        public long getCreatedTotal() {
            return this.created.sum();
        }

        @Override
        public long getFailedTotal() {
            return this.failed.sum();
        }

        @Override
        public long getDeletedTotal() {
            return this.deleted.sum();
        }

        public String toString() {
            return "SessionPoolStats{minSize=" + this.getMinSize() + ", maxSize=" + this.getMaxSize() + ", idleCount=" + this.getIdleCount() + ", acquiredCount=" + this.getAcquiredCount() + ", pendingAcquireCount=" + this.getPendingAcquireCount() + ", acquiredTotal=" + this.getAcquiredTotal() + ", releasedTotal=" + this.getReleasedTotal() + ", requestsTotal=" + this.getRequestedTotal() + ", createdTotal=" + this.getCreatedTotal() + ", failedTotal=" + this.getFailedTotal() + ", deletedTotal=" + this.getDeletedTotal() + "}";
        }
    }

    private class KeepAliveTask
    implements Runnable {
        private final long maxIdleTimeMillis;
        private final long keepAliveTimeMillis;
        private final int maxKeepAliveCount;
        private final long periodMillis;
        private final AtomicInteger keepAliveCount = new AtomicInteger();

        KeepAliveTask(SessionPoolOptions options) {
            this.maxIdleTimeMillis = options.getMaxIdleTimeMillis();
            this.keepAliveTimeMillis = options.getKeepAliveTimeMillis();
            this.maxKeepAliveCount = Math.max(2, options.getMaxSize() / 5);
            this.periodMillis = Math.max(100L, Math.min(this.keepAliveTimeMillis / 5L, this.maxIdleTimeMillis / 2L));
        }

        @Override
        public void run() {
            Iterator coldIterator = SessionPool.this.queue.coldIterator();
            Instant now = SessionPool.this.clock.instant();
            Instant idleToRemove = now.minusMillis(this.maxIdleTimeMillis);
            Instant keepAlive = now.minusMillis(this.keepAliveTimeMillis);
            while (coldIterator.hasNext()) {
                StatefulSession session = (StatefulSession)coldIterator.next();
                StatefulSession.State state = session.state();
                if (state.needShutdown()) {
                    coldIterator.remove();
                    continue;
                }
                if (!state.lastActive().isAfter(idleToRemove) && SessionPool.this.queue.getTotalCount() > SessionPool.this.minSize) {
                    coldIterator.remove();
                    continue;
                }
                if (state.lastUpdate().isAfter(keepAlive) || this.keepAliveCount.get() >= this.maxKeepAliveCount || !state.switchToKeepAlive(now)) continue;
                this.keepAliveCount.incrementAndGet();
                logger.debug("keep alive session {}", (Object)session.getId());
                session.keepAlive().whenComplete((res, th) -> {
                    boolean ok = th == null && res.isSuccess() && res.getValue() == Session.State.READY;
                    this.keepAliveCount.decrementAndGet();
                    if (ok) {
                        logger.debug("keep alive session {} ok", (Object)session.getId());
                        session.state().switchToIdle(SessionPool.this.clock.instant());
                    } else {
                        logger.debug("keep alive session {} error, change status to broken", (Object)session.getId());
                        session.state().switchToBroken(SessionPool.this.clock.instant());
                    }
                });
            }
        }
    }

    private class Handler
    implements WaitingQueue.Handler<ClosableSession> {
        private final TableRpc tableRpc;
        private final boolean keepQueryText;

        Handler(TableRpc tableRpc, boolean keepQueryText) {
            this.tableRpc = tableRpc;
            this.keepQueryText = keepQueryText;
        }

        @Override
        public CompletableFuture<ClosableSession> create() {
            SessionPool.this.stats.requested.increment();
            return BaseSession.createSessionId(this.tableRpc, CREATE_SETTINGS, true).thenApply(response -> {
                if (!response.isSuccess()) {
                    SessionPool.this.stats.failed.increment();
                    throw new UnexpectedResultException("create session problem", response.getStatus());
                }
                return new ClosableSession((String)response.getValue(), this.tableRpc, this.keepQueryText);
            });
        }

        @Override
        public void destroy(ClosableSession session) {
            SessionPool.this.stats.deleted.increment();
            session.delete(new DeleteSessionSettings()).whenComplete((status, th) -> {
                if (th != null) {
                    logger.warn("session {} destoryed with exception {}", (Object)session.getId(), (Object)th.getMessage());
                }
                if (status != null) {
                    if (status.isSuccess()) {
                        logger.debug("session {} successful destoryed", (Object)session.getId());
                    } else {
                        logger.warn("session {} destoryed with status {}", (Object)session.getId(), (Object)status.toString());
                    }
                }
            });
        }
    }

    private class ClosableSession
    extends StatefulSession {
        ClosableSession(String id, TableRpc rpc, boolean keepQueryText) {
            super(id, SessionPool.this.clock, rpc, keepQueryText);
            logger.debug("session {} successful created", (Object)id);
            SessionPool.this.stats.created.increment();
        }

        @Override
        public void close() {
            SessionPool.this.stats.released.increment();
            if (this.state().switchToIdle(SessionPool.this.clock.instant())) {
                logger.debug("session {} release", (Object)this.getId());
                SessionPool.this.queue.release(this);
            } else {
                logger.debug("session {} shutdown", (Object)this.getId());
                SessionPool.this.queue.delete(this);
            }
        }
    }
}

