/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.jdbc.context;

import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.context.StaticQueryResult;
import tech.ydb.jdbc.context.StreamQueryResult;
import tech.ydb.jdbc.context.YdbContext;
import tech.ydb.jdbc.context.YdbExecutor;
import tech.ydb.jdbc.context.YdbValidator;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.Session;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.TableClient;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
import tech.ydb.table.values.ListValue;

/**
 * Common base for {@link YdbExecutor} implementations.
 *
 * <p>Holds the table client, the retry context and the session timeout taken from the
 * {@link YdbContext}, and tracks the most recently produced {@link YdbQueryResult} so that it
 * can be closed before the next statement is executed.
 *
 * <p>Provides the scheme query, bulk upsert and scan query executions; {@code isClosed()} and
 * {@code isInsideTransaction()} are used here but implemented elsewhere.
 */
public abstract class BaseYdbExecutor implements YdbExecutor {
    private final SessionRetryContext retryCtx;
    private final Duration sessionTimeout;
    private final TableClient tableClient;
    // Last result handed out by this executor; replaced through updateCurrentResult().
    private final AtomicReference<YdbQueryResult> currResult;
    protected final boolean traceEnabled;

    /**
     * Captures the collaborators needed for query execution from the given context.
     *
     * @param ctx driver context supplying the retry context, table client, tracer flag and
     *            operation properties
     */
    public BaseYdbExecutor(YdbContext ctx) {
        this.retryCtx = ctx.getRetryCtx();
        this.traceEnabled = ctx.isTxTracerEnabled();
        this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
        this.tableClient = ctx.getTableClient();
        this.currResult = new AtomicReference<>();
    }

    /**
     * Creates a new table session, blocking until the creation request completes.
     *
     * @param validator receives the issues attached to the session creation status
     * @return the created session
     * @throws SQLException if the creation result does not carry a session
     */
    protected Session createNewTableSession(YdbValidator validator) throws SQLException {
        try {
            Result<Session> session = this.tableClient.createSession(this.sessionTimeout).join();
            validator.addStatusIssues(session.getStatus());
            return session.getValue();
        } catch (UnexpectedResultException ex) {
            throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
        }
    }

    /**
     * Closes the currently tracked result, if there is one.
     *
     * <p>The stored reference is intentionally left in place; it is only replaced by
     * {@link #updateCurrentResult(YdbQueryResult)}.
     *
     * @throws SQLException if closing the result fails
     */
    protected void closeCurrentResult() throws SQLException {
        YdbQueryResult rs = this.currResult.get();
        if (rs != null) {
            rs.close();
        }
    }

    /**
     * Makes {@code result} the tracked result and closes the one it replaces.
     *
     * @param result the new current result
     * @return the same {@code result} instance, for call chaining
     * @throws SQLException if closing the previous result fails
     */
    protected YdbQueryResult updateCurrentResult(YdbQueryResult result) throws SQLException {
        YdbQueryResult old = this.currResult.getAndSet(result);
        if (old != null) {
            old.close();
        }
        return result;
    }

    /**
     * Closes the current result and then verifies that this executor is still usable.
     *
     * @throws SQLException if closing the current result fails or the executor is closed
     */
    @Override
    public void ensureOpened() throws SQLException {
        this.closeCurrentResult();
        if (this.isClosed()) {
            throw new SQLException("Connection is closed");
        }
    }

    /**
     * Records {@code message} in the current tracer when tracing is enabled.
     *
     * @param message text to record
     * @return the tracer that received the message, or {@code null} when tracing is disabled
     */
    @Override
    public YdbTracer trace(String message) {
        if (!this.traceEnabled) {
            return null;
        }
        YdbTracer tracer = YdbTracer.current();
        tracer.trace(message);
        return tracer;
    }

    /**
     * Executes a scheme (DDL) query through the retry context.
     *
     * @param statement statement supplying the connection context and the validator
     * @param query     query whose prepared YQL text is executed
     * @return an empty static result, which also becomes the current result
     * @throws SQLException if the executor is closed or the validator reports a failure
     */
    @Override
    public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
        this.ensureOpened();
        String yql = query.getPreparedYql();
        YdbContext ctx = statement.getConnection().getCtx();
        YdbValidator validator = statement.getValidator();
        YdbTracer tracer = this.trace("--> scheme >>\n" + yql);
        ExecuteSchemeQuerySettings settings = ctx.withDefaultTimeout(new ExecuteSchemeQuerySettings());
        validator.execute(
                QueryType.SCHEME_QUERY + " >>\n" + yql,
                tracer,
                () -> this.retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings)));
        // Outside a transaction nothing else will add to this trace, so finish it here.
        if (tracer != null && !this.isInsideTransaction()) {
            tracer.close();
        }
        return this.updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
    }

    /**
     * Executes a bulk upsert of {@code rows} into {@code tablePath} through the retry context.
     *
     * @param statement statement supplying the validator
     * @param query     query whose prepared YQL text is used for messages and tracing
     * @param tablePath path of the target table
     * @param rows      rows to upsert
     * @return an empty static result, which also becomes the current result
     * @throws SQLException if the executor is closed or the validator reports a failure
     */
    @Override
    public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query, String tablePath, ListValue rows)
            throws SQLException {
        this.ensureOpened();
        String yql = query.getPreparedYql();
        YdbValidator validator = statement.getValidator();
        YdbTracer tracer = this.trace("--> bulk upsert >>\n" + yql);
        validator.execute(
                QueryType.BULK_QUERY + " >>\n" + yql,
                tracer,
                () -> this.retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows)));
        // Outside a transaction nothing else will add to this trace, so finish it here.
        if (tracer != null && !this.isInsideTransaction()) {
            tracer.close();
        }
        return this.updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
    }

    /**
     * Starts a scan query on a dedicated session and returns a streaming result.
     *
     * <p>The returned result becomes available as soon as the first result set arrives or the
     * stream finishes, whichever happens first. The dedicated session is closed when the stream
     * completes, or immediately if the stream cannot be started.
     *
     * @param statement statement supplying the connection context and the validator
     * @param query     query descriptor passed to the streaming result
     * @param yql       YQL text to execute
     * @param params    query parameters
     * @return the streaming result, which also becomes the current result
     * @throws SQLException if the executor is closed, the session cannot be created or the
     *                      validator reports a failure
     */
    @Override
    public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, String yql, Params params)
            throws SQLException {
        this.ensureOpened();
        YdbContext ctx = statement.getConnection().getCtx();
        YdbValidator validator = statement.getValidator();
        Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
        ExecuteScanQuerySettings settings =
                ((ExecuteScanQuerySettings.Builder) ExecuteScanQuerySettings.newBuilder()
                        .withRequestTimeout(scanQueryTimeout)).build();
        String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
        YdbTracer tracer = this.trace("--> scan query >>\n" + yql);
        Session session = this.createNewTableSession(validator);
        StreamQueryResult lazy = validator.call(msg, null, () -> {
            CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
            try {
                GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
                StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
                stream.start(rsr -> {
                    // The first result set is enough to hand the result back to the caller.
                    future.complete(Result.success(result));
                    result.onStreamResultSet(0, rsr);
                }).whenComplete((st, th) -> {
                    session.close();
                    if (th != null) {
                        result.onStreamFinished(th);
                        future.completeExceptionally(th);
                        if (tracer != null) {
                            tracer.trace("<-- " + th.getMessage());
                            tracer.close();
                        }
                    }
                    if (st != null) {
                        validator.addStatusIssues(st);
                        result.onStreamFinished(st);
                        // No effect if the future was already completed by the first result set.
                        future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st));
                        if (tracer != null) {
                            tracer.trace("<-- " + st.toString());
                            tracer.close();
                        }
                    }
                });
            } catch (RuntimeException ex) {
                // The completion callback was never registered, so nothing else would release
                // the dedicated session.
                session.close();
                throw ex;
            }
            return future;
        });
        return this.updateCurrentResult(lazy);
    }
}

