/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.yoj.repository.ydb;

import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import io.grpc.Context;
import io.grpc.Deadline;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.proto.ValueProtos;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.CommitTxSettings;
import tech.ydb.table.settings.ExecuteDataQuerySettings;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.settings.ReadTableSettings;
import tech.ydb.table.settings.RollbackTxSettings;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.ListValue;
import tech.ydb.table.values.StructValue;
import tech.ydb.table.values.TupleValue;
import tech.ydb.table.values.Value;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.IsolationLevel;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.cache.RepositoryCache;
import tech.ydb.yoj.repository.db.cache.RepositoryCacheImpl;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
import tech.ydb.yoj.repository.db.exception.RepositoryException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.ydb.YdbLegacySpliterator;
import tech.ydb.yoj.repository.ydb.YdbOperations;
import tech.ydb.yoj.repository.ydb.YdbRepository;
import tech.ydb.yoj.repository.ydb.YdbSpliterator;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.client.ResultSetConverter;
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
import tech.ydb.yoj.repository.ydb.client.YdbValidator;
import tech.ydb.yoj.repository.ydb.exception.BadSessionException;
import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException;
import tech.ydb.yoj.repository.ydb.exception.UnexpectedException;
import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException;
import tech.ydb.yoj.repository.ydb.exception.YdbOverloadedException;
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
import tech.ydb.yoj.repository.ydb.statement.Statement;
import tech.ydb.yoj.repository.ydb.table.YdbTable;
import tech.ydb.yoj.util.lang.Interrupts;

public class YdbRepositoryTransaction<REPO extends YdbRepository>
implements BaseDb,
RepositoryTransaction,
YdbTable.QueryExecutor,
TransactionLocal.Holder {
    private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);
    private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList();
    private final List<YdbSpliterator<?>> spliterators = new ArrayList();
    private final TxOptions options;
    private final TransactionLocal transactionLocal;
    private final RepositoryCache cache;
    protected final REPO repo;
    private Session session = null;
    private Stopwatch sessionSw;
    protected String txId = null;
    private String firstNonNullTxId = null;
    private String closeAction = null;
    private boolean isBadSession = false;

    public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
        if (options == null) {
            throw new NullPointerException("options is marked non-null but is null");
        }
        this.repo = repo;
        this.options = options;
        this.transactionLocal = new TransactionLocal(options);
        this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
    }

    private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
        YdbSpliterator spliterator = new YdbSpliterator(request, isOrdered);
        this.spliterators.add(spliterator);
        return spliterator;
    }

    public <T extends Entity<T>> Table<T> table(Class<T> c) {
        return new YdbTable<T>(c, this);
    }

    public void commit() {
        if (this.isBadSession) {
            throw new IllegalStateException("Transaction was invalidated. Commit isn't possible");
        }
        try {
            this.flushPendingWrites();
        }
        catch (Throwable t) {
            this.rollback();
            throw t;
        }
        this.endTransaction("commit", this::doCommit);
    }

    public void rollback() {
        Interrupts.runInCleanupMode(() -> {
            try {
                this.endTransaction("rollback", () -> {
                    Status status = (Status)YdbOperations.safeJoin(this.session.rollbackTransaction(this.txId, new RollbackTxSettings()));
                    this.validate("rollback", status.getCode(), status.toString());
                });
            }
            catch (Throwable t) {
                log.info("Failed to rollback the transaction", t);
            }
        });
    }

    private void doCommit() {
        try {
            Status status = (Status)YdbOperations.safeJoin(this.session.commitTransaction(this.txId, new CommitTxSettings()));
            YdbValidator.validatePkConstraint(status.getIssues());
            this.validate("commit", status.getCode(), status.toString());
        }
        catch (YdbComponentUnavailableException | YdbOverloadedException e) {
            throw new UnavailableException("Unknown transaction state: commit was sent, but result is unknown", e);
        }
    }

    private void closeStreams() {
        Exception summaryException = null;
        for (YdbSpliterator<?> spliterator : this.spliterators) {
            try {
                spliterator.close();
            }
            catch (Exception e) {
                if (summaryException == null) {
                    summaryException = e;
                    continue;
                }
                summaryException.addSuppressed(e);
            }
        }
        if (summaryException != null) {
            throw new UnexpectedException("Exceptions on stream close. Thread leak are possible", summaryException);
        }
    }

    private void validate(String request, StatusCode statusCode, String response) {
        if (!this.isBadSession) {
            this.isBadSession = YdbValidator.isTransactionClosedByServer(statusCode);
        }
        try {
            YdbValidator.validate(request, statusCode, response);
        }
        catch (OptimisticLockException | BadSessionException e) {
            this.transactionLocal.log().info("Request got %s: DB tx was invalidated", new Object[]{e.getClass().getSimpleName()});
            throw e;
        }
    }

    private boolean isFinalActionNeeded(String actionName) {
        if (this.session == null || this.isBadSession) {
            this.transactionLocal.log().info("No-op %s: no active DB session", new Object[]{actionName});
            return false;
        }
        if (this.options.isScan()) {
            this.transactionLocal.log().info("No-op %s: scan tx", new Object[]{actionName});
            return false;
        }
        if (this.options.isReadOnly()) {
            this.transactionLocal.log().info("No-op %s: read-only tx @%s", new Object[]{actionName, this.options.getIsolationLevel()});
            return false;
        }
        if (this.txId == null) {
            this.transactionLocal.log().info("No-op %s: no active transaction in session", new Object[]{actionName});
            return false;
        }
        return true;
    }

    /*
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void endTransaction(String actionName, Runnable finalAction) {
        try {
            this.closeStreams();
            if (this.isFinalActionNeeded(actionName)) {
                this.doCall(actionName, finalAction);
            }
            this.closeAction = actionName;
            if (this.session == null) return;
        }
        catch (RepositoryException e) {
            try {
                throw e;
                catch (Exception e2) {
                    throw new UnexpectedException("Could not " + actionName + " " + this.txId, (Throwable)e2);
                }
            }
            catch (Throwable throwable) {
                this.closeAction = actionName;
                if (this.session == null) throw throwable;
                this.transactionLocal.log().info("[[%s]] TOTAL (txId=%s,sessionId=%s)", new Object[]{this.sessionSw, this.firstNonNullTxId, this.session.getId()});
                ((YdbRepository)this.repo).getSessionManager().release(this.session);
                this.session = null;
                throw throwable;
            }
        }
        this.transactionLocal.log().info("[[%s]] TOTAL (txId=%s,sessionId=%s)", new Object[]{this.sessionSw, this.firstNonNullTxId, this.session.getId()});
        ((YdbRepository)this.repo).getSessionManager().release(this.session);
        this.session = null;
    }

    private TxControl<?> getTxControl() {
        return switch (this.options.getIsolationLevel()) {
            default -> throw new IncompatibleClassChangeError();
            case IsolationLevel.SERIALIZABLE_READ_WRITE -> {
                TxControl.TxSerializableRw txControl = this.txId != null ? TxControl.id((String)this.txId) : TxControl.serializableRw();
                yield txControl.setCommitTx(false);
            }
            case IsolationLevel.ONLINE_CONSISTENT_READ_ONLY -> TxControl.onlineRo().setAllowInconsistentReads(false);
            case IsolationLevel.ONLINE_INCONSISTENT_READ_ONLY -> TxControl.onlineRo().setAllowInconsistentReads(true);
            case IsolationLevel.STALE_CONSISTENT_READ_ONLY -> TxControl.staleRo();
        };
    }

    private String getYql(Statement<?, ?> statement) {
        return "--!syntax_v1\n" + statement.getQuery(((YdbRepository)this.repo).getTablespace());
    }

    private <PARAMS> Params getSdkParams(Statement<PARAMS, ?> statement, PARAMS params) {
        Map<String, ValueProtos.TypedValue> values = params == null ? Map.of() : statement.toQueryParameters(params);
        return YdbConverter.convertToParams(values);
    }

    private void flushPendingWrites() {
        this.transactionLocal.projectionCache().applyProjectionChanges((RepositoryTransaction)this);
        QueriesMerger.create(this.cache).merge(this.pendingWrites).forEach(this::execute);
    }

    @Override
    public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
        List result = statement.readFromCache(params, this.cache);
        if (result != null) {
            String actionStr = statement.toDebugString(params);
            String resultStr = this.debugResult(result);
            this.transactionLocal.log().debug("[statement cache] %s -> %s", new Object[]{actionStr, resultStr});
            return result;
        }
        result = this.doCall(statement.toDebugString(params), () -> {
            if (this.options.isScan()) {
                if (this.options.getScanOptions().isUseNewSpliterator()) {
                    return this.doExecuteScanQueryList(statement, params);
                }
                return this.doExecuteScanQueryLegacy(statement, params);
            }
            return this.doExecuteDataQuery(statement, params);
        });
        this.trace(statement, result);
        statement.storeToCache(params, result, this.cache);
        return result;
    }

    private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
        String yql = this.getYql(statement);
        TxControl<?> txControl = this.getTxControl();
        Params sdkParams = this.getSdkParams(statement, params);
        ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings();
        if (!statement.isPreparable()) {
            settings.disableQueryCache();
        }
        Deadline grpcDeadline = Context.current().getDeadline();
        Duration grpcTimeout = null;
        if (grpcDeadline != null) {
            grpcTimeout = Duration.ofNanos(grpcDeadline.timeRemaining(TimeUnit.NANOSECONDS));
        }
        TxOptions.TimeoutOptions timeoutOptions = this.options.minTimeoutOptions(grpcTimeout);
        settings.setTimeout(timeoutOptions.getTimeout());
        settings.setCancelAfter(timeoutOptions.getCancelAfter());
        Result result = (Result)YdbOperations.safeJoin(this.session.executeDataQuery(yql, txControl, sdkParams, settings));
        if (result.isSuccess()) {
            this.txId = Strings.emptyToNull((String)((DataQueryResult)result.getValue()).getTxId());
            if (this.firstNonNullTxId == null) {
                this.firstNonNullTxId = this.txId;
            }
        }
        YdbValidator.validatePkConstraint(result.getStatus().getIssues());
        this.validate(yql, result.getStatus().getCode(), result.toString());
        DataQueryResult queryResult = (DataQueryResult)result.getValue();
        if (queryResult.getResultSetCount() > 1) {
            throw new YdbRepositoryException("Multi-table queries are not supported", (Object)yql, (Object)queryResult);
        }
        if (queryResult.getResultSetCount() == 0) {
            return null;
        }
        YdbValidator.validateTruncatedResults(yql, queryResult);
        ResultSetReader resultSet = queryResult.getResultSet(0);
        return new ResultSetConverter(resultSet).stream(statement::readResult).collect(Collectors.toList());
    }

    private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
        ExecuteScanQuerySettings settings = ((ExecuteScanQuerySettings.Builder)ExecuteScanQuerySettings.newBuilder().withRequestTimeout(this.options.getScanOptions().getTimeout())).setMode(ExecuteScanQuerySettings.Mode.EXEC).build();
        String yql = this.getYql(statement);
        Params sdkParams = this.getSdkParams(statement, params);
        ArrayList result = new ArrayList();
        Status status = (Status)YdbOperations.safeJoin(this.session.executeScanQuery(yql, sdkParams, settings, rs -> {
            if ((long)(result.size() + rs.getRowCount()) > this.options.getScanOptions().getMaxSize()) {
                throw new ResultTruncatedException(String.format("Query result size became greater than %d", this.options.getScanOptions().getMaxSize()), (Object)yql, (Object)result.size());
            }
            new ResultSetConverter((ResultSetReader)rs).stream(statement::readResult).forEach(result::add);
        }));
        this.validate("SCAN_QUERY: " + yql, status.getCode(), status.toString());
        return result;
    }

    private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, RESULT> statement, PARAMS params) {
        ArrayList result = new ArrayList();
        try (Stream<RESULT> stream = this.executeScanQuery(statement, params);){
            stream.forEach(r -> {
                if ((long)result.size() >= this.options.getScanOptions().getMaxSize()) {
                    throw new ResultTruncatedException(String.format("Query result size became greater than %d", this.options.getScanOptions().getMaxSize()), (Object)this.getYql(statement), (Object)result.size());
                }
                result.add(r);
            });
        }
        return result;
    }

    @Override
    public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
        if (!this.options.isScan()) {
            throw new IllegalStateException("Scan query can be used only from scan tx");
        }
        ExecuteScanQuerySettings settings = ((ExecuteScanQuerySettings.Builder)ExecuteScanQuerySettings.newBuilder().withRequestTimeout(this.options.getScanOptions().getTimeout())).setMode(ExecuteScanQuerySettings.Mode.EXEC).build();
        String yql = this.getYql(statement);
        Params sdkParams = this.getSdkParams(statement, params);
        YdbSpliterator spliterator = this.createSpliterator("scanQuery: " + yql, false);
        this.initSession();
        this.session.executeScanQuery(yql, sdkParams, settings, rs -> new ResultSetConverter((ResultSetReader)rs).stream(statement::readResult).forEach(spliterator::onNext)).whenComplete(spliterator::onSupplierThreadComplete);
        return spliterator.createStream();
    }

    @Override
    public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
        if (this.options.isScan()) {
            throw new IllegalTransactionScanException("Mutable operations");
        }
        if (this.options.isReadOnly()) {
            throw new IllegalTransactionIsolationLevelException("Mutable operations", this.options.getIsolationLevel());
        }
        YdbRepository.Query<PARAMS> query = new YdbRepository.Query<PARAMS>(statement, value);
        if (this.options.isImmediateWrites()) {
            this.execute(query);
            this.transactionLocal.projectionCache().applyProjectionChanges((RepositoryTransaction)this);
        } else {
            this.pendingWrites.add(query);
        }
    }

    private <PARAMS> void execute(YdbRepository.Query<PARAMS> query) {
        if (query.getValues().size() == 1) {
            this.execute(query.getStatement(), query.getValues().get(0));
        } else {
            this.execute(query.getStatement(), query.getValues());
        }
    }

    @Override
    public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
        String tableName = mapper.getTableName(((YdbRepository)this.repo).getTablespace());
        this.doCall("bulk upsert to table " + mapper.getTableName(""), () -> {
            Value[] values = (Value[])input.stream().map(x -> StructValue.of(mapper.map(x).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> YdbConverter.toSDK((ValueProtos.TypedValue)entry.getValue()))))).toArray(Value[]::new);
            BulkUpsertSettings settings = new BulkUpsertSettings();
            settings.setTimeout(params.getTimeout());
            settings.setCancelAfter(params.getCancelAfter());
            settings.setTraceId(params.getTraceId());
            try {
                Status status = (Status)YdbOperations.safeJoin(this.session.executeBulkUpsert(tableName, ListValue.of((Value[])values), settings));
                this.validate("bulkInsert", status.getCode(), status.toString());
            }
            catch (RepositoryException e) {
                throw e;
            }
            catch (Exception e) {
                throw new UnexpectedException("Could not bulk insert into table " + tableName, (Throwable)e);
            }
        });
    }

    public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT> mapper, ReadTableParams<PARAMS> params) throws RepositoryException {
        Spliterator<Object> spliterator;
        List values;
        if (this.options.isReadWrite()) {
            throw new IllegalTransactionIsolationLevelException("readTable", this.options.getIsolationLevel());
        }
        String tableName = mapper.getTableName(((YdbRepository)this.repo).getTablespace());
        ReadTableSettings.Builder settings = ((ReadTableSettings.Builder)ReadTableSettings.newBuilder().orderedRead(params.isOrdered()).withRequestTimeout(params.getTimeout())).rowLimit(params.getRowLimit()).columns(mapper.getColumns()).batchLimitBytes(params.getBatchLimitBytes()).batchLimitRows(params.getBatchLimitRows());
        if (params.getFromKey() != null) {
            values = mapper.mapKey(params.getFromKey()).stream().map(typedValue -> YdbConverter.toSDK(typedValue.getType(), typedValue.getValue())).collect(Collectors.toList());
            settings.fromKey(TupleValue.of(values), params.isFromInclusive());
        }
        if (params.getToKey() != null) {
            values = mapper.mapKey(params.getToKey()).stream().map(typedValue -> YdbConverter.toSDK(typedValue.getType(), typedValue.getValue())).collect(Collectors.toList());
            settings.toKey(TupleValue.of(values), params.isToInclusive());
        }
        if (params.isUseNewSpliterator()) {
            spliterator = this.createSpliterator("readTable: " + tableName, params.isOrdered());
            this.initSession();
            this.session.readTable(tableName, settings.build(), resultSet -> new ResultSetConverter((ResultSetReader)resultSet).stream(mapper::mapResult).forEach(spliterator::onNext)).whenComplete((arg_0, arg_1) -> spliterator.onSupplierThreadComplete(arg_0, arg_1));
            return ((YdbSpliterator)spliterator).createStream();
        }
        try {
            spliterator = new YdbLegacySpliterator(params.isOrdered(), action -> this.doCall("read table " + mapper.getTableName(""), () -> {
                Status status = (Status)YdbOperations.safeJoin(this.session.readTable(tableName, settings.build(), rs -> new ResultSetConverter((ResultSetReader)rs).stream(mapper::mapResult).forEach((Consumer<Object>)action)), params.getTimeout().plusMinutes(5L));
                this.validate("readTable", status.getCode(), status.toString());
            }));
            return ((YdbLegacySpliterator)spliterator).makeStream();
        }
        catch (RepositoryException e) {
            throw e;
        }
        catch (Exception e) {
            throw new UnexpectedException("Could not read table " + tableName, (Throwable)e);
        }
    }

    private void doCall(String actionStr, Runnable call) {
        this.doCall(actionStr, () -> {
            call.run();
            return null;
        });
    }

    private void initSession() {
        if (this.closeAction != null) {
            throw new IllegalStateException("Transaction already closed by " + this.closeAction);
        }
        if (this.session == null) {
            this.session = ((YdbRepository)this.repo).getSessionManager().getSession();
            this.sessionSw = Stopwatch.createStarted();
        }
    }

    private <R> R doCall(String actionStr, Supplier<R> call) {
        R r;
        this.initSession();
        Stopwatch sw = Stopwatch.createStarted();
        Object resultStr = "";
        try {
            R result = call.get();
            resultStr = result == null ? "" : " -> " + this.debugResult(result);
            r = result;
        }
        catch (Exception e) {
            try {
                resultStr = " => " + e.getClass().getName();
                throw e;
            }
            catch (Throwable throwable) {
                this.transactionLocal.log().debug("[ %s ] %s", new Object[]{sw, actionStr + (String)resultStr});
                throw throwable;
            }
        }
        this.transactionLocal.log().debug("[ %s ] %s", new Object[]{sw, actionStr + (String)resultStr});
        return r;
    }

    private String debugResult(Object result) {
        if (result instanceof Iterable) {
            int size = Iterables.size((Iterable)((Iterable)result));
            return size == 1 ? String.valueOf(((Iterable)result).iterator().next()) : "[" + size + "]";
        }
        return String.valueOf(result);
    }

    private void trace(final @NonNull Statement<?, ?> statement, final Object results) {
        if (statement == null) {
            throw new NullPointerException("statement is marked non-null but is null");
        }
        log.trace("{}", new Object(){

            public String toString() {
                return String.format("[txId=%s,sessionId=%s] %s%s", YdbRepositoryTransaction.this.firstNonNullTxId, YdbRepositoryTransaction.this.session.getId(), statement, YdbRepositoryTransaction.this.debugResult(results));
            }
        });
    }

    @Generated
    public TxOptions getOptions() {
        return this.options;
    }

    @Override
    @Generated
    public TransactionLocal getTransactionLocal() {
        return this.transactionLocal;
    }
}

