/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.table;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.utils.Async;
import tech.ydb.table.Session;
import tech.ydb.table.SessionSupplier;

@ParametersAreNonnullByDefault
public class SessionRetryContext {
    // Class-wide SLF4J logger; this class only emits debug-level retry traces through it.
    private static final Logger logger = LoggerFactory.getLogger(SessionRetryContext.class);
    // Provides sessions (createSession) and the scheduler used to delay retries.
    private final SessionSupplier sessionSupplier;
    // Executor that runs retry attempts and asynchronous session-creation callbacks.
    private final Executor executor;
    // Timeout handed to SessionSupplier.createSession on every attempt.
    private final Duration sessionCreationTimeout;
    // Upper bound on the number of retries performed after the initial attempt.
    private final int maxRetries;
    // Base slot, in milliseconds, of the slow backoff.
    private final long backoffSlotMillis;
    // Cap on the power-of-two exponent applied to the slow backoff slot.
    private final int backoffCeiling;
    // Base slot, in milliseconds, of the fast backoff.
    private final long fastBackoffSlotMillis;
    // Cap on the power-of-two exponent applied to the fast backoff slot.
    private final int fastBackoffCeiling;
    // Forwarded to StatusCode.isRetryable when deciding whether a failure may be retried.
    private final boolean retryNotFound;
    // Forwarded to StatusCode.isRetryable when deciding whether a failure may be retried.
    private final boolean idempotent;

    /**
     * Copies every setting collected by the builder into this immutable context.
     *
     * @param b builder holding the configuration to snapshot
     */
    private SessionRetryContext(Builder b) {
        // Collaborators.
        sessionSupplier = b.sessionSupplier;
        executor = b.executor;
        sessionCreationTimeout = b.sessionCreationTimeout;

        // Retry policy flags and limit.
        maxRetries = b.maxRetries;
        idempotent = b.idempotent;
        retryNotFound = b.retryNotFound;

        // Slow backoff parameters.
        backoffSlotMillis = b.backoffSlotMillis;
        backoffCeiling = b.backoffCeiling;

        // Fast backoff parameters.
        fastBackoffSlotMillis = b.fastBackoffSlotMillis;
        fastBackoffCeiling = b.fastBackoffCeiling;
    }

    /**
     * Starts building a retry context on top of the given session supplier.
     *
     * @param sessionSupplier source of sessions; must not be {@code null}
     * @return a new builder pre-populated with default settings
     * @throws NullPointerException if {@code sessionSupplier} is {@code null}
     */
    public static Builder create(SessionSupplier sessionSupplier) {
        final SessionSupplier checkedSupplier = Objects.requireNonNull(sessionSupplier);
        return new Builder(checkedSupplier);
    }

    /**
     * Runs {@code fn} against a session, retrying according to this context's policy.
     *
     * @param fn  operation to execute with a session; yields a future of its {@link Result}
     * @param <T> value type carried by the result
     * @return future completed with the final result of the operation
     */
    public <T> CompletableFuture<Result<T>> supplyResult(Function<Session, CompletableFuture<Result<T>>> fn) {
        final RetryableResultTask<T> retryTask = new RetryableResultTask<>(fn);
        final CompletableFuture<Result<T>> outcome = retryTask.getFuture();
        // Kick off the first attempt; later attempts are scheduled by the task itself.
        retryTask.requestSession();
        return outcome;
    }

    /**
     * Runs {@code fn} against a session, retrying according to this context's policy.
     *
     * @param fn operation to execute with a session; yields a future of its {@link Status}
     * @return future completed with the final status of the operation
     */
    public CompletableFuture<Status> supplyStatus(Function<Session, CompletableFuture<Status>> fn) {
        final RetryableStatusTask retryTask = new RetryableStatusTask(fn);
        final CompletableFuture<Status> outcome = retryTask.getFuture();
        // Kick off the first attempt; later attempts are scheduled by the task itself.
        retryTask.requestSession();
        return outcome;
    }

    /**
     * Tells whether a failure raised as an exception may be retried.
     *
     * <p>Only an {@link UnexpectedResultException} (after unwrapping a completion exception) can be
     * retryable; its status code decides, given the configured flags.
     *
     * @param t failure to inspect
     * @return {@code true} if another attempt is allowed for this failure
     */
    private boolean canRetry(Throwable t) {
        final Throwable unwrapped = Async.unwrapCompletionException(t);
        if (!(unwrapped instanceof UnexpectedResultException)) {
            return false;
        }
        final UnexpectedResultException failure = (UnexpectedResultException) unwrapped;
        return failure.getStatus().getCode().isRetryable(idempotent, retryNotFound);
    }

    /**
     * Builds the short failure description used in debug log lines.
     *
     * @param t failure to describe
     * @return {@code "unknown"} when debug logging is off; otherwise the status code name for an
     *         {@link UnexpectedResultException}, or the message of {@code t} for anything else
     */
    private String errorMsg(Throwable t) {
        if (logger.isDebugEnabled()) {
            final Throwable unwrapped = Async.unwrapCompletionException(t);
            if (unwrapped instanceof UnexpectedResultException) {
                final UnexpectedResultException failure = (UnexpectedResultException) unwrapped;
                return failure.getStatus().getCode().name();
            }
            return t.getMessage();
        }
        // The text is only consumed by debug logging, so skip the work when it is disabled.
        return "unknown";
    }

    /**
     * Tells whether a status code may be retried under the configured flags.
     *
     * @param code status code to inspect
     * @return {@code true} if the code is retryable for the configured idempotency, or if it is
     *         {@code NOT_FOUND} and retrying of not-found is enabled
     */
    private boolean canRetry(StatusCode code) {
        if (code.isRetryable(idempotent)) {
            return true;
        }
        return retryNotFound && code == StatusCode.NOT_FOUND;
    }

    /**
     * Computes a randomized exponential backoff delay.
     *
     * <p>The base delay is {@code backoffSlotMillis * 2^min(retryNumber, backoffCeiling)}. A random
     * jitter in {@code [0, base)} is added on top, so the result lies in {@code [base, 2 * base)}.
     *
     * @param retryNumber       ordinal of the retry being scheduled
     * @param backoffSlotMillis base slot in milliseconds
     * @param backoffCeiling    cap on the power-of-two exponent
     * @return delay in milliseconds; {@code 0} when the base delay is not positive
     */
    private long backoffTimeMillisInternal(int retryNumber, long backoffSlotMillis, int backoffCeiling) {
        // Keep the shift distance inside [0, 30]: an int shift by 31 or more (or by a negative
        // amount) wraps around and yields a negative or meaningless slot count.
        final int maxExponent = 30;
        int exponent = Math.max(0, Math.min(maxExponent, Math.min(retryNumber, backoffCeiling)));
        long slots = 1L << exponent;

        // Saturate instead of overflowing; the cap leaves room for the jitter added below.
        long maxBase = Long.MAX_VALUE / 2L;
        long base = backoffSlotMillis > maxBase / slots ? maxBase : backoffSlotMillis * slots;

        // ThreadLocalRandom.nextLong(bound) rejects a non-positive bound, and a zero slot is
        // accepted by the builder, so a non-positive base simply means "retry immediately".
        if (base <= 0L) {
            return 0L;
        }
        return base + ThreadLocalRandom.current().nextLong(base);
    }

    /**
     * Computes the delay before the given retry using the slow backoff settings.
     *
     * @param retryNumber ordinal of the retry being scheduled
     * @return delay in milliseconds
     */
    private long slowBackoffTimeMillis(int retryNumber) {
        final long slotMillis = backoffSlotMillis;
        final int ceiling = backoffCeiling;
        return backoffTimeMillisInternal(retryNumber, slotMillis, ceiling);
    }

    /**
     * Computes the delay before the given retry using the fast backoff settings.
     *
     * @param retryNumber ordinal of the retry being scheduled
     * @return delay in milliseconds
     */
    private long fastBackoffTimeMillis(int retryNumber) {
        final long slotMillis = fastBackoffSlotMillis;
        final int ceiling = fastBackoffCeiling;
        return backoffTimeMillisInternal(retryNumber, slotMillis, ceiling);
    }

    /**
     * Picks the backoff delay appropriate for a status code.
     *
     * <p>{@code BAD_SESSION} is retried without delay; the codes listed in the second group use
     * the fast backoff; every other code uses the slow backoff.
     *
     * @param code        status code of the failed attempt
     * @param retryNumber ordinal of the retry being scheduled
     * @return delay in milliseconds
     */
    private long backoffTimeMillis(StatusCode code, int retryNumber) {
        final long delayMillis;
        switch (code) {
            case BAD_SESSION:
                delayMillis = 0L;
                break;
            case ABORTED:
            case CLIENT_CANCELLED:
            case CLIENT_INTERNAL_ERROR:
            case SESSION_BUSY:
            case TRANSPORT_UNAVAILABLE:
            case UNAVAILABLE:
            case UNDETERMINED:
                delayMillis = fastBackoffTimeMillis(retryNumber);
                break;
            default:
                delayMillis = slowBackoffTimeMillis(retryNumber);
                break;
        }
        return delayMillis;
    }

    /**
     * Picks the backoff delay appropriate for a failure raised as an exception.
     *
     * <p>An {@link UnexpectedResultException} (after unwrapping a completion exception) is handled
     * according to its status code; any other throwable uses the slow backoff.
     *
     * @param t           failure of the previous attempt
     * @param retryNumber ordinal of the retry being scheduled
     * @return delay in milliseconds
     */
    private long backoffTimeMillis(Throwable t, int retryNumber) {
        final Throwable unwrapped = Async.unwrapCompletionException(t);
        if (!(unwrapped instanceof UnexpectedResultException)) {
            return slowBackoffTimeMillis(retryNumber);
        }
        final StatusCode code = ((UnexpectedResultException) unwrapped).getStatus().getCode();
        return backoffTimeMillis(code, retryNumber);
    }

    /**
     * Fluent builder for {@link SessionRetryContext}.
     *
     * <p>Defaults: direct executor, 5 second session creation timeout, 10 retries, slow backoff
     * slot of 500 ms with ceiling 6, fast backoff slot of 5 ms with ceiling 10, retrying of
     * not-found enabled, non-idempotent.
     */
    @ParametersAreNonnullByDefault
    public static final class Builder {
        private final SessionSupplier sessionSupplier;
        private Executor executor = MoreExecutors.directExecutor();
        private Duration sessionCreationTimeout = Duration.ofSeconds(5L);
        private int maxRetries = 10;
        private long backoffSlotMillis = 500L;
        private int backoffCeiling = 6;
        private long fastBackoffSlotMillis = 5L;
        private int fastBackoffCeiling = 10;
        private boolean retryNotFound = true;
        private boolean idempotent = false;

        /**
         * Creates a builder bound to the given session supplier.
         *
         * @param sessionSupplier source of sessions for the context being built
         */
        public Builder(SessionSupplier sessionSupplier) {
            this.sessionSupplier = sessionSupplier;
        }

        /**
         * Sets the executor used to run retry attempts and asynchronous callbacks.
         *
         * @param executor executor to use; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code executor} is {@code null}
         */
        public Builder executor(Executor executor) {
            this.executor = Objects.requireNonNull(executor);
            return this;
        }

        /**
         * Sets the timeout passed to the session supplier when a session is requested.
         *
         * @param duration session creation timeout
         * @return this builder
         */
        public Builder sessionCreationTimeout(Duration duration) {
            this.sessionCreationTimeout = duration;
            return this;
        }

        /**
         * Sets the maximum number of retries performed after the initial attempt.
         *
         * @param maxRetries retry limit
         * @return this builder
         */
        public Builder maxRetries(int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        /**
         * Sets the base slot of the slow backoff.
         *
         * @param duration slot length; stored with millisecond precision
         * @return this builder
         * @throws IllegalArgumentException if {@code duration} is negative
         */
        public Builder backoffSlot(Duration duration) {
            Preconditions.checkArgument(!duration.isNegative(), "backoffSlot(%s) is negative", duration);
            this.backoffSlotMillis = duration.toMillis();
            return this;
        }

        /**
         * Sets the cap on the power-of-two exponent applied to the slow backoff slot.
         *
         * @param backoffCeiling exponent cap
         * @return this builder
         */
        public Builder backoffCeiling(int backoffCeiling) {
            this.backoffCeiling = backoffCeiling;
            return this;
        }

        /**
         * Sets the base slot of the fast backoff.
         *
         * @param duration slot length; stored with millisecond precision
         * @return this builder
         * @throws IllegalArgumentException if {@code duration} is negative
         */
        public Builder fastBackoffSlot(Duration duration) {
            // The message names this setter; it previously reported "backoffSlot", pointing the
            // caller at the wrong setting.
            Preconditions.checkArgument(!duration.isNegative(), "fastBackoffSlot(%s) is negative", duration);
            this.fastBackoffSlotMillis = duration.toMillis();
            return this;
        }

        /**
         * Sets the cap on the power-of-two exponent applied to the fast backoff slot.
         *
         * @param backoffCeiling exponent cap
         * @return this builder
         */
        public Builder fastBackoffCeiling(int backoffCeiling) {
            this.fastBackoffCeiling = backoffCeiling;
            return this;
        }

        /**
         * Sets the not-found flag that the context forwards to the status code's retryability
         * check.
         *
         * @param retryNotFound flag value
         * @return this builder
         */
        public Builder retryNotFound(boolean retryNotFound) {
            this.retryNotFound = retryNotFound;
            return this;
        }

        /**
         * Sets the idempotency flag that the context forwards to the status code's retryability
         * check.
         *
         * @param idempotent flag value
         * @return this builder
         */
        public Builder idempotent(boolean idempotent) {
            this.idempotent = idempotent;
            return this;
        }

        /**
         * Creates a retry context from the current settings of this builder.
         *
         * @return a new {@link SessionRetryContext}
         */
        public SessionRetryContext build() {
            return new SessionRetryContext(this);
        }
    }

    /**
     * Retry task for operations whose outcome is a bare {@link Status}.
     */
    private final class RetryableStatusTask
    extends BaseRetryableTask<Status> {
        RetryableStatusTask(Function<Session, CompletableFuture<Status>> fn) {
            super(fn);
        }

        /** A failed session request is reported to the caller as that request's status. */
        @Override
        Status toFailedResult(Result<Session> sessionResult) {
            final Status sessionStatus = sessionResult.getStatus();
            return sessionStatus;
        }

        /** The status carries its own code. */
        @Override
        StatusCode toStatusCode(Status status) {
            final StatusCode code = status.getCode();
            return code;
        }
    }

    /**
     * Retry task for operations whose outcome is a {@link Result} carrying a value.
     *
     * @param <T> value type carried by the result
     */
    private final class RetryableResultTask<T>
    extends BaseRetryableTask<Result<T>> {
        RetryableResultTask(Function<Session, CompletableFuture<Result<T>>> fn) {
            super(fn);
        }

        /**
         * Re-types a failed session result as a result of the operation's value type.
         * NOTE(review): passes a null mapper, so this presumably relies on {@code Result.map}
         * not invoking the mapper for a failed result — confirm against {@code Result}.
         */
        @Override
        Result<T> toFailedResult(Result<Session> sessionResult) {
            final Result<T> retyped = sessionResult.map(null);
            return retyped;
        }

        /** The code is taken from the status attached to the result. */
        @Override
        StatusCode toStatusCode(Result<T> result) {
            final Status status = result.getStatus();
            return status.getCode();
        }
    }

    /**
     * Retry loop for a single user operation.
     *
     * <p>Each attempt requests a session from the supplier, applies the user function to it, closes
     * the session and inspects the outcome. A successful outcome completes the promise. A retryable
     * failure schedules this task on the supplier's scheduler after a backoff delay, as long as
     * the retry limit has not been reached. Anything else completes the promise with the failure.
     *
     * <p>Unexpected exceptions raised inside asynchronous callbacks complete the promise
     * exceptionally, so the caller's future cannot be left pending forever.
     *
     * @param <R> type produced by the user function and delivered through the promise
     */
    private abstract class BaseRetryableTask<R>
    implements Runnable {
        private final CompletableFuture<R> promise = new CompletableFuture<>();
        private final AtomicInteger retryNumber = new AtomicInteger();
        private final Function<Session, CompletableFuture<R>> fn;
        private final long createTimestamp = Instant.now().toEpochMilli();

        BaseRetryableTask(Function<Session, CompletableFuture<R>> fn) {
            this.fn = fn;
        }

        /** Returns the future that is completed with the final outcome of the operation. */
        CompletableFuture<R> getFuture() {
            return this.promise;
        }

        /** Extracts the status code from an outcome of the user function. */
        abstract StatusCode toStatusCode(R var1);

        /** Converts a failed session request into an outcome of the operation's type. */
        abstract R toFailedResult(Result<Session> var1);

        /** Milliseconds elapsed since this task was created; used in log lines only. */
        private long ms() {
            return Instant.now().toEpochMilli() - this.createTimestamp;
        }

        /**
         * Entry point used by the scheduler for a retry: hands the next attempt over to the
         * configured executor unless the promise has been cancelled meanwhile.
         */
        @Override
        public void run() {
            if (this.promise.isCancelled()) {
                logger.debug("RetryCtx[{}] cancelled, {} retries, {} ms", this.hashCode(), this.retryNumber.get(), this.ms());
                return;
            }
            try {
                SessionRetryContext.this.executor.execute(this::retryRequestSession);
            }
            catch (Throwable rejected) {
                // Nobody observes exceptions thrown by a scheduled task, so report it here.
                this.failUnexpectedly(rejected);
            }
        }

        /** Runs a retry attempt, turning any exception it throws into a failed promise. */
        private void retryRequestSession() {
            try {
                this.requestSession();
            }
            catch (Throwable unexpected) {
                this.failUnexpectedly(unexpected);
            }
        }

        /**
         * Starts one attempt: asks the supplier for a session and continues with
         * {@link #acceptSession} or {@link #handleException} once the request completes.
         */
        public void requestSession() {
            CompletableFuture<Result<Session>> sessionFuture = SessionRetryContext.this.sessionSupplier.createSession(SessionRetryContext.this.sessionCreationTimeout);
            if (sessionFuture.isDone() && !sessionFuture.isCompletedExceptionally()) {
                // Already available: continue on the current thread.
                this.acceptSession(sessionFuture.join());
                return;
            }
            sessionFuture.whenCompleteAsync((result, th) -> {
                try {
                    if (result != null) {
                        this.acceptSession(result);
                    } else if (th != null) {
                        this.handleException(th);
                    } else {
                        this.failUnexpectedly(new IllegalStateException("session supplier completed without a result"));
                    }
                }
                catch (Throwable unexpected) {
                    // The future returned by whenCompleteAsync is discarded, so an exception
                    // escaping this callback would otherwise be lost.
                    this.failUnexpectedly(unexpected);
                }
            }, SessionRetryContext.this.executor);
        }

        /**
         * Continues an attempt with the outcome of the session request: a failed request goes
         * through the retry decision, a successful one runs the user function on the session.
         */
        private void acceptSession(@Nonnull Result<Session> sessionResult) {
            if (!sessionResult.isSuccess()) {
                this.handleError(sessionResult.getStatus().getCode(), this.toFailedResult(sessionResult));
                return;
            }
            Session session = sessionResult.getValue();
            Async.safeCall(session, this.fn).whenComplete((fnResult, fnException) -> {
                try {
                    // The session is closed before the outcome is inspected.
                    session.close();
                    if (fnException != null) {
                        this.handleException(fnException);
                        return;
                    }
                    StatusCode statusCode = this.toStatusCode(fnResult);
                    if (statusCode == StatusCode.SUCCESS) {
                        logger.debug("RetryCtx[{}] OK, finished after {} retries, {} ms total", this.hashCode(), this.retryNumber.get(), this.ms());
                        this.promise.complete(fnResult);
                    } else {
                        this.handleError(statusCode, fnResult);
                    }
                }
                catch (Throwable unexpected) {
                    this.failUnexpectedly(unexpected);
                }
            });
        }

        /** Logs an unexpected failure and completes the promise exceptionally with it. */
        private void failUnexpectedly(Throwable unexpected) {
            logger.debug("RetryCtx[{}] UNEXPECTED[{}], finished after {} retries, {} ms total", this.hashCode(), unexpected.getMessage(), this.retryNumber.get(), this.ms());
            this.promise.completeExceptionally(unexpected);
        }

        /** Schedules the next attempt after the given delay unless the promise was cancelled. */
        private void scheduleNext(long delayMillis) {
            if (this.promise.isCancelled()) {
                return;
            }
            SessionRetryContext.this.sessionSupplier.getScheduler().schedule(this, delayMillis, TimeUnit.MILLISECONDS);
        }

        /**
         * Decides what to do with a non-successful outcome: complete the promise with it when
         * the code is not retryable or the retry limit is exhausted, otherwise schedule a retry.
         */
        private void handleError(@Nonnull StatusCode code, R result) {
            if (!code.isRetryable(SessionRetryContext.this.idempotent, SessionRetryContext.this.retryNotFound)) {
                logger.debug("RetryCtx[{}] NON-RETRYABLE CODE[{}], finished after {} retries, {} ms total", this.hashCode(), code, this.retryNumber.get(), this.ms());
                this.promise.complete(result);
                return;
            }
            int retry = this.retryNumber.incrementAndGet();
            if (retry <= SessionRetryContext.this.maxRetries) {
                long next = SessionRetryContext.this.backoffTimeMillis(code, retry);
                logger.debug("RetryCtx[{}] RETRYABLE CODE[{}], scheduling next retry #{} in {} ms, {} ms total", this.hashCode(), code, retry, next, this.ms());
                this.scheduleNext(next);
            } else {
                logger.debug("RetryCtx[{}] RETRYABLE CODE[{}], finished by retries limit ({}), {} ms total", this.hashCode(), code, SessionRetryContext.this.maxRetries, this.ms());
                this.promise.complete(result);
            }
        }

        /**
         * Decides what to do with an exception: complete the promise exceptionally when it is
         * not retryable or the retry limit is exhausted, otherwise schedule a retry.
         */
        private void handleException(@Nonnull Throwable ex) {
            if (!SessionRetryContext.this.canRetry(ex)) {
                logger.debug("RetryCtx[{}] NON-RETRYABLE ERROR[{}], finished after {} retries, {} ms total", this.hashCode(), SessionRetryContext.this.errorMsg(ex), this.retryNumber.get(), this.ms());
                this.promise.completeExceptionally(ex);
                return;
            }
            int retry = this.retryNumber.incrementAndGet();
            if (retry <= SessionRetryContext.this.maxRetries) {
                long next = SessionRetryContext.this.backoffTimeMillis(ex, retry);
                logger.debug("RetryCtx[{}] RETRYABLE ERROR[{}], scheduling next retry #{} in {} ms, {} ms total", this.hashCode(), SessionRetryContext.this.errorMsg(ex), retry, next, this.ms());
                this.scheduleNext(next);
            } else {
                logger.debug("RetryCtx[{}] RETRYABLE ERROR[{}], finished by retries limit ({}), {} ms total", this.hashCode(), SessionRetryContext.this.errorMsg(ex), SessionRetryContext.this.maxRetries, this.ms());
                this.promise.completeExceptionally(ex);
            }
        }
    }
}

