/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.query.impl;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.utils.FutureTools;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.query.QuerySession;
import tech.ydb.query.impl.QueryServiceRpc;
import tech.ydb.query.impl.SessionImpl;
import tech.ydb.query.settings.AttachSessionSettings;
import tech.ydb.query.settings.CreateSessionSettings;
import tech.ydb.query.settings.DeleteSessionSettings;
import tech.ydb.table.SessionPoolStats;
import tech.ydb.table.impl.pool.WaitingQueue;

class SessionPool
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
    private static final CreateSessionSettings CREATE_SETTINGS = ((CreateSessionSettings.Builder)((CreateSessionSettings.Builder)CreateSessionSettings.newBuilder().withRequestTimeout(Duration.ofSeconds(300L))).withOperationTimeout(Duration.ofSeconds(299L))).build();
    private static final DeleteSessionSettings DELETE_SETTINGS = ((DeleteSessionSettings.Builder)((DeleteSessionSettings.Builder)DeleteSessionSettings.newBuilder().withRequestTimeout(Duration.ofSeconds(5L))).withOperationTimeout(Duration.ofSeconds(4L))).build();
    private static final AttachSessionSettings ATTACH_SETTINGS = AttachSessionSettings.newBuilder().build();
    private final int minSize;
    private final Clock clock;
    private final ScheduledExecutorService scheduler;
    private final WaitingQueue<PooledQuerySession> queue;
    private final ScheduledFuture<?> cleanerFuture;
    private final StatsImpl stats = new StatsImpl();

    SessionPool(Clock clock, QueryServiceRpc rpc, ScheduledExecutorService scheduler, int minSize, int maxSize, Duration idleDuration) {
        this.minSize = minSize;
        this.clock = clock;
        this.scheduler = scheduler;
        this.queue = new WaitingQueue((WaitingQueue.Handler)new Handler(rpc), maxSize);
        CleanerTask cleaner = new CleanerTask(idleDuration);
        this.cleanerFuture = scheduler.scheduleAtFixedRate(cleaner, cleaner.periodMillis / 2L, cleaner.periodMillis, TimeUnit.MILLISECONDS);
        logger.info("init QuerySession pool, min size = {}, max size = {}, keep alive period = {}", new Object[]{minSize, maxSize, cleaner.periodMillis});
    }

    public void updateMaxSize(int maxSize) {
        this.queue.updateLimits(maxSize);
    }

    SessionPoolStats getStats() {
        return this.stats;
    }

    @Override
    public void close() {
        logger.info("closing QuerySession pool");
        this.cleanerFuture.cancel(false);
        this.queue.close();
    }

    public CompletableFuture<Result<QuerySession>> acquire(Duration timeout) {
        logger.trace("acquire QuerySession with timeout {}", (Object)timeout);
        CompletableFuture<Result<QuerySession>> future = new CompletableFuture<Result<QuerySession>>();
        if (!this.pollNext(future)) {
            future.whenComplete((BiConsumer)new Canceller(this.scheduler.schedule(new Timeout(future), timeout.toMillis(), TimeUnit.MILLISECONDS)));
        }
        return future;
    }

    private boolean pollNext(CompletableFuture<Result<QuerySession>> future) {
        CompletableFuture nextSession = new CompletableFuture();
        this.queue.acquire(nextSession);
        if (nextSession.isDone() && !nextSession.isCompletedExceptionally()) {
            return this.tryComplete(future, (PooledQuerySession)nextSession.join());
        }
        nextSession.whenComplete((session, th) -> {
            if (th != null) {
                if (future.isDone()) {
                    logger.warn("can't get QuerySession, future is already canceled", th);
                    return;
                }
                Throwable ex = FutureTools.unwrapCompletionException((Throwable)th);
                if (ex instanceof UnexpectedResultException) {
                    future.complete(Result.fail((UnexpectedResultException)((UnexpectedResultException)ex)));
                } else {
                    future.complete(Result.error((String)"can't create QuerySession", (Throwable)ex));
                }
            }
            if (session != null) {
                this.tryComplete(future, (PooledQuerySession)session);
            }
        });
        return false;
    }

    private boolean tryComplete(CompletableFuture<Result<QuerySession>> future, PooledQuerySession session) {
        logger.trace("QuerySession[{}] tries to complete acquire", (Object)session.getId());
        if (!future.complete((Result<QuerySession>)Result.success((Object)session))) {
            logger.debug("QuerySession[{}] future already done, return session to the pool", (Object)session.getId());
            this.queue.release((Object)session);
            return false;
        }
        this.stats.acquired.increment();
        return true;
    }

    static final class Timeout
    implements Runnable {
        private static final Status EXPIRE = Status.of((StatusCode)StatusCode.CLIENT_DEADLINE_EXPIRED, (Issue[])new Issue[]{Issue.of((String)"query session acquire deadline was expired", (Issue.Severity)Issue.Severity.WARNING)});
        private final CompletableFuture<Result<QuerySession>> f;

        Timeout(CompletableFuture<Result<QuerySession>> f) {
            this.f = f;
        }

        @Override
        public void run() {
            if (this.f != null && !this.f.isDone()) {
                this.f.complete((Result<QuerySession>)Result.fail((Status)EXPIRE));
            }
        }
    }

    static final class Canceller
    implements BiConsumer<Object, Throwable> {
        private final Future<?> f;

        Canceller(Future<?> f) {
            this.f = f;
        }

        @Override
        public void accept(Object ignore, Throwable ex) {
            if (this.f != null && !this.f.isDone()) {
                this.f.cancel(false);
            }
        }
    }

    private class StatsImpl
    implements SessionPoolStats {
        private final LongAdder acquired = new LongAdder();
        private final LongAdder released = new LongAdder();
        private final LongAdder requested = new LongAdder();
        private final LongAdder failed = new LongAdder();
        private final LongAdder created = new LongAdder();
        private final LongAdder deleted = new LongAdder();

        private StatsImpl() {
        }

        public int getMinSize() {
            return SessionPool.this.minSize;
        }

        public int getMaxSize() {
            return SessionPool.this.queue.getTotalLimit();
        }

        public int getIdleCount() {
            return SessionPool.this.queue.getIdleCount();
        }

        public int getAcquiredCount() {
            return SessionPool.this.queue.getUsedCount();
        }

        public int getPendingAcquireCount() {
            return SessionPool.this.queue.getWaitingCount() + SessionPool.this.queue.getPendingCount();
        }

        public long getAcquiredTotal() {
            return this.acquired.sum();
        }

        public long getReleasedTotal() {
            return this.released.sum();
        }

        public long getRequestedTotal() {
            return this.requested.sum();
        }

        public long getCreatedTotal() {
            return this.created.sum();
        }

        public long getFailedTotal() {
            return this.failed.sum();
        }

        public long getDeletedTotal() {
            return this.deleted.sum();
        }

        public String toString() {
            return "SessionPoolStats{minSize=" + this.getMinSize() + ", maxSize=" + this.getMaxSize() + ", idleCount=" + this.getIdleCount() + ", acquiredCount=" + this.getAcquiredCount() + ", pendingAcquireCount=" + this.getPendingAcquireCount() + ", acquiredTotal=" + this.getAcquiredTotal() + ", releasedTotal=" + this.getReleasedTotal() + ", requestsTotal=" + this.getRequestedTotal() + ", createdTotal=" + this.getCreatedTotal() + ", failedTotal=" + this.getFailedTotal() + ", deletedTotal=" + this.getDeletedTotal() + "}";
        }
    }

    private class CleanerTask
    implements Runnable {
        private final long maxIdleTimeMillis;
        private final long periodMillis;

        CleanerTask(Duration idleDuration) {
            this.maxIdleTimeMillis = idleDuration.toMillis();
            this.periodMillis = Math.max(500L, this.maxIdleTimeMillis / 3L);
        }

        @Override
        public void run() {
            Iterator coldIterator = SessionPool.this.queue.coldIterator();
            Instant now = SessionPool.this.clock.instant();
            Instant idleToRemove = now.minusMillis(this.maxIdleTimeMillis);
            while (coldIterator.hasNext()) {
                PooledQuerySession session = (PooledQuerySession)coldIterator.next();
                if (session.getLastActive().isAfter(idleToRemove) || SessionPool.this.queue.getTotalCount() <= SessionPool.this.minSize) continue;
                coldIterator.remove();
            }
        }
    }

    private class Handler
    implements WaitingQueue.Handler<PooledQuerySession> {
        private final QueryServiceRpc rpc;

        Handler(QueryServiceRpc rpc) {
            this.rpc = rpc;
        }

        public CompletableFuture<PooledQuerySession> create() {
            SessionPool.this.stats.requested.increment();
            return ((CompletableFuture)SessionImpl.createSession(this.rpc, CREATE_SETTINGS, true).thenCompose(r -> {
                if (!r.isSuccess()) {
                    SessionPool.this.stats.failed.increment();
                    throw new UnexpectedResultException("create session problem", r.getStatus());
                }
                return new PooledQuerySession(this.rpc, (YdbQuery.CreateSessionResponse)r.getValue()).start();
            })).thenApply(Result::getValue);
        }

        public void destroy(PooledQuerySession session) {
            SessionPool.this.stats.deleted.increment();
            session.destroy();
        }
    }

    private class PooledQuerySession
    extends SessionImpl {
        private final GrpcReadStream<Status> attachStream;
        private volatile Instant lastActive;
        private volatile boolean isStarted;
        private volatile boolean isBroken;
        private volatile boolean isStopped;

        PooledQuerySession(QueryServiceRpc rpc, YdbQuery.CreateSessionResponse response) {
            super(rpc, response);
            this.isStarted = false;
            this.isBroken = false;
            this.isStopped = false;
            this.lastActive = SessionPool.this.clock.instant();
            this.attachStream = this.attach(ATTACH_SETTINGS);
            SessionPool.this.stats.created.increment();
        }

        @Override
        public void updateSessionState(Status status) {
            boolean isStatusBroken;
            this.lastActive = SessionPool.this.clock.instant();
            boolean bl = isStatusBroken = status.getCode() == StatusCode.BAD_SESSION || status.getCode() == StatusCode.SESSION_BUSY || status.getCode() == StatusCode.INTERNAL_ERROR || status.getCode() == StatusCode.CLIENT_DEADLINE_EXCEEDED || status.getCode() == StatusCode.CLIENT_DEADLINE_EXPIRED || status.getCode() == StatusCode.CLIENT_CANCELLED || status.getCode() == StatusCode.TRANSPORT_UNAVAILABLE;
            if (isStatusBroken) {
                logger.warn("QuerySession[{}] broken with status {}", (Object)this.getId(), (Object)status);
            }
            this.isBroken = this.isBroken || isStatusBroken;
        }

        public Instant getLastActive() {
            return this.lastActive;
        }

        public CompletableFuture<Result<PooledQuerySession>> start() {
            CompletableFuture<Result<PooledQuerySession>> future = new CompletableFuture<Result<PooledQuerySession>>();
            Result ok = Result.success((Object)this);
            ((CompletableFuture)this.attachStream.start(status -> {
                if (!status.isSuccess()) {
                    logger.warn("QuerySession[{}] attach message {}", (Object)this.getId(), status);
                    future.complete(Result.fail((Status)status));
                    this.clean();
                    return;
                }
                if (future.complete(ok)) {
                    logger.debug("QuerySession[{}] attach message {}", (Object)this.getId(), status);
                    this.isStarted = true;
                    return;
                }
                logger.trace("QuerySession[{}] attach message {}", (Object)this.getId(), status);
            }).whenComplete((status, th) -> {
                if (th != null) {
                    logger.debug("QuerySession[{}] finished with exception", (Object)this.getId(), th);
                }
                if (status != null) {
                    if (status.isSuccess()) {
                        logger.debug("QuerySession[{}] finished with status {}", (Object)this.getId(), status);
                    } else {
                        logger.warn("QuerySession[{}] finished with status {}", (Object)this.getId(), status);
                    }
                }
            })).thenRun(this::clean);
            return future;
        }

        private void clean() {
            logger.debug("QuerySession[{}] attach stream is stopped", (Object)this.getId());
            this.isStopped = true;
            if (!this.isStarted) {
                this.destroy();
            }
        }

        public void destroy() {
            logger.debug("QuerySession[{}] destroy", (Object)this.getId());
            this.delete(DELETE_SETTINGS).whenComplete((status, th) -> {
                if (th != null) {
                    logger.warn("QuerySession[{}] removed with exception {}", (Object)this.getId(), (Object)th.getMessage());
                }
                if (status != null) {
                    if (status.isSuccess()) {
                        logger.debug("QuerySession[{}] successful removed", (Object)this.getId());
                    } else {
                        logger.warn("QuerySession[{}] removed with status {}", (Object)this.getId(), status);
                    }
                }
            });
        }

        @Override
        public void close() {
            logger.trace("QuerySession[{}] closed with broke status {}", (Object)this.getId(), (Object)this.isBroken);
            SessionPool.this.stats.released.increment();
            if (this.isBroken || this.isStopped) {
                SessionPool.this.queue.delete((Object)this);
            } else {
                SessionPool.this.queue.release((Object)this);
            }
        }
    }
}

