/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.jdbc.context;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.common.ColumnInfo;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.impl.BaseYdbResultSet;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.QueryStatement;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.result.ValueReader;

public class StreamQueryResult
implements YdbQueryResult {
    private static final Logger LOGGER = Logger.getLogger(StreamQueryResult.class.getName());
    private static final int DDL_EXPRESSION = -1;
    private static final int UPDATE_EXPRESSION = -2;
    private final String msg;
    private final YdbStatement statement;
    private final Runnable streamStopper;
    private final CompletableFuture<Status> streamFuture = new CompletableFuture();
    private final AtomicBoolean streamCancelled = new AtomicBoolean(false);
    private final int[] resultIndexes;
    private final List<CompletableFuture<Result<LazyResultSet>>> resultFutures = new ArrayList<CompletableFuture<Result<LazyResultSet>>>();
    private int resultIndex = 0;
    private volatile boolean resultClosed = false;

    public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Runnable streamStopper) {
        this.msg = msg;
        this.statement = statement;
        this.streamStopper = streamStopper;
        this.resultIndexes = new int[query.getStatements().size()];
        int idx = 0;
        for (QueryStatement exp : query.getStatements()) {
            if (exp.isDDL()) {
                this.resultIndexes[idx++] = -1;
                continue;
            }
            if (exp.hasUpdateCount()) {
                this.resultIndexes[idx++] = -2;
                continue;
            }
            if (!exp.hasResults()) continue;
            this.resultIndexes[idx++] = this.resultFutures.size();
            this.resultFutures.add(new CompletableFuture());
        }
    }

    public void onStreamResultSet(int index, ResultSetReader rsr) {
        Result<LazyResultSet> res;
        CompletableFuture<Result<LazyResultSet>> future = this.resultFutures.get(index);
        if (!future.isDone()) {
            ColumnInfo[] columns = ColumnInfo.fromResultSetReader(rsr);
            LazyResultSet rs = new LazyResultSet(this.statement, columns);
            rs.addResultSet(rsr);
            if (future.complete((Result<LazyResultSet>)Result.success((Object)rs))) {
                return;
            }
        }
        if ((res = future.join()).isSuccess()) {
            ((LazyResultSet)res.getValue()).addResultSet(rsr);
        }
    }

    public void onStreamFinished(Throwable th) {
        this.streamFuture.completeExceptionally(th);
        for (CompletableFuture<Result<LazyResultSet>> future : this.resultFutures) {
            future.completeExceptionally(th);
        }
        this.completeAllSets();
    }

    public void onStreamFinished(Status status) {
        for (CompletableFuture<Result<LazyResultSet>> future : this.resultFutures) {
            if (status.isSuccess()) {
                future.complete((Result<LazyResultSet>)Result.success((Object)new LazyResultSet(this.statement, new ColumnInfo[0]), (Status)status));
                continue;
            }
            future.complete((Result<LazyResultSet>)Result.fail((Status)status));
        }
        this.streamFuture.complete(status);
        this.completeAllSets();
    }

    private void completeAllSets() {
        for (CompletableFuture<Result<LazyResultSet>> future : this.resultFutures) {
            Result<LazyResultSet> rs;
            if (future.isCompletedExceptionally() || !(rs = future.join()).isSuccess()) continue;
            ((LazyResultSet)rs.getValue()).complete();
        }
    }

    private void closeResultSet(int index) throws SQLException {
        try {
            CompletableFuture<Result<LazyResultSet>> future = this.resultFutures.get(index);
            if (future != null) {
                ((LazyResultSet)future.join().getValue()).close();
            }
        }
        catch (UnexpectedResultException ex) {
            throw ExceptionFactory.createException("Cannot call '" + this.msg + "' with " + ex.getStatus(), ex);
        }
    }

    private void checkStream() {
        if (!this.resultClosed) {
            return;
        }
        if (!this.streamFuture.isDone() && this.streamCancelled.compareAndSet(false, true)) {
            LOGGER.log(Level.FINE, "Stream cancel");
            this.streamStopper.run();
        }
    }

    @Override
    public void close() throws SQLException {
        if (this.streamFuture.isDone() && this.resultClosed) {
            return;
        }
        LOGGER.log(Level.FINE, "Stream closing");
        this.resultClosed = true;
        Status status = this.streamFuture.join();
        if (this.streamCancelled.get()) {
            LOGGER.log(Level.FINE, "Stream canceled and finished with status {0}", status);
            return;
        }
        LOGGER.log(Level.FINE, "Stream closed with status {0}", status);
        if (!status.isSuccess()) {
            throw ExceptionFactory.createException("Cannot execute '" + this.msg + "' with " + status, new UnexpectedResultException("Unexpected status", status));
        }
    }

    @Override
    public int getUpdateCount() throws SQLException {
        if (this.resultIndex >= this.resultIndexes.length) {
            return -1;
        }
        int index = this.resultIndexes[this.resultIndex];
        if (index == -1) {
            return 0;
        }
        if (index == -2) {
            return 1;
        }
        return -1;
    }

    @Override
    public YdbResultSet getCurrentResultSet() throws SQLException {
        if (this.resultIndex >= this.resultIndexes.length) {
            return null;
        }
        int index = this.resultIndexes[this.resultIndex];
        if (index < 0 || index >= this.resultFutures.size()) {
            return null;
        }
        try {
            return (YdbResultSet)this.resultFutures.get(index).join().getValue();
        }
        catch (UnexpectedResultException ex) {
            throw ExceptionFactory.createException("Cannot call '" + this.msg + "' with " + ex.getStatus(), ex);
        }
    }

    @Override
    public boolean hasResultSets() throws SQLException {
        if (this.resultIndex >= this.resultIndexes.length) {
            return false;
        }
        return this.resultIndexes[this.resultIndex] >= 0;
    }

    @Override
    public boolean getMoreResults(int current) throws SQLException {
        if (this.resultFutures == null || this.resultIndex >= this.resultFutures.size()) {
            return false;
        }
        switch (current) {
            case 2: {
                break;
            }
            case 1: {
                this.closeResultSet(this.resultIndex);
                break;
            }
            case 3: {
                for (int idx = 0; idx <= this.resultIndex; ++idx) {
                    this.closeResultSet(this.resultIndex);
                }
                break;
            }
            default: {
                throw new SQLException("ResultSet mode is not supported: " + current);
            }
        }
        ++this.resultIndex;
        return this.hasResultSets();
    }

    private class LazyResultSet
    extends BaseYdbResultSet {
        private final BlockingQueue<ResultSetReader> readers;
        private final AtomicLong rowsCount;
        private final CompletableFuture<Void> isCompleted;
        private volatile boolean isClosed;
        private ResultSetReader current;
        private int rowIndex;

        LazyResultSet(YdbStatement statement, ColumnInfo[] columns) {
            super(statement, columns);
            this.readers = new ArrayBlockingQueue<ResultSetReader>(5);
            this.rowsCount = new AtomicLong();
            this.isCompleted = new CompletableFuture();
            this.isClosed = false;
            this.current = null;
            this.rowIndex = 0;
        }

        public void cleanQueue() {
            boolean isEmpty = false;
            while (!isEmpty) {
                isEmpty = this.readers.poll() == null;
            }
        }

        public void addResultSet(ResultSetReader rsr) {
            try {
                do {
                    StreamQueryResult.this.checkStream();
                } while (!this.readers.offer(rsr, 100L, TimeUnit.MILLISECONDS));
            }
            catch (InterruptedException ex) {
                if (StreamQueryResult.this.streamFuture.completeExceptionally(ex)) {
                    LOGGER.log(Level.WARNING, "LazyResultSet offer interrupted");
                    StreamQueryResult.this.streamStopper.run();
                }
                return;
            }
            long total = this.rowsCount.addAndGet(rsr.getRowCount());
            LOGGER.log(Level.FINEST, "LazyResultSet got {0} rows", total);
            if (this.isClosed) {
                this.cleanQueue();
            }
        }

        @Override
        protected ValueReader getValue(int columnIndex) throws SQLException {
            if (this.current == null) {
                throw new SQLException("Current row index is out of bounds: " + this.rowIndex);
            }
            return this.current.getColumn(columnIndex);
        }

        @Override
        public boolean next() throws SQLException {
            while (!this.isClosed) {
                if (this.current != null && this.current.next()) {
                    ++this.rowIndex;
                    return true;
                }
                if (this.isCompleted.isDone() && this.readers.isEmpty()) {
                    this.current = null;
                    if (this.rowsCount.get() > 0L) {
                        this.rowIndex = this.rowsCount.intValue() + 1;
                    }
                    return false;
                }
                try {
                    this.current = this.readers.poll(100L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException ex) {
                    throw new SQLException(ex);
                }
            }
            return false;
        }

        public void complete() {
            this.isCompleted.complete(null);
        }

        @Override
        public void close() {
            this.isClosed = true;
            this.current = null;
            this.cleanQueue();
        }

        @Override
        public int getRow() throws SQLException {
            return this.rowIndex;
        }

        @Override
        public boolean isClosed() throws SQLException {
            return this.isClosed;
        }

        @Override
        public boolean isBeforeFirst() throws SQLException {
            return this.rowsCount.get() > 0L && this.rowIndex < 1;
        }

        @Override
        public boolean isAfterLast() throws SQLException {
            this.isCompleted.join();
            return this.rowsCount.get() > 0L && this.rowIndex > this.rowsCount.intValue();
        }

        @Override
        public boolean isFirst() throws SQLException {
            return this.rowIndex == 1;
        }

        @Override
        public boolean isLast() throws SQLException {
            this.isCompleted.join();
            return this.rowsCount.get() > 0L && this.rowIndex == this.rowsCount.intValue();
        }

        @Override
        public void beforeFirst() throws SQLException {
            throw new SQLFeatureNotSupportedException("ResultSet in TYPE_FORWARD_ONLY mode");
        }

        @Override
        public void afterLast() throws SQLException {
            throw new SQLFeatureNotSupportedException("ResultSet in TYPE_FORWARD_ONLY mode");
        }

        @Override
        public boolean first() throws SQLException {
            throw new SQLFeatureNotSupportedException("ResultSet in TYPE_FORWARD_ONLY mode");
        }

        @Override
        public boolean last() throws SQLException {
            throw new SQLFeatureNotSupportedException("ResultSet in TYPE_FORWARD_ONLY mode");
        }

        @Override
        public boolean absolute(int row) throws SQLException {
            throw new SQLFeatureNotSupportedException("ResultSet in TYPE_FORWARD_ONLY mode");
        }

        @Override
        public boolean relative(int rows) throws SQLException {
            throw new SQLFeatureNotSupportedException("ResultSet in TYPE_FORWARD_ONLY mode");
        }

        @Override
        public boolean previous() throws SQLException {
            throw new SQLFeatureNotSupportedException("ResultSet in TYPE_FORWARD_ONLY mode");
        }

        @Override
        public void setFetchDirection(int direction) throws SQLException {
            if (direction != 1000) {
                throw new SQLFeatureNotSupportedException("ResultSet in TYPE_FORWARD_ONLY mode");
            }
        }

        @Override
        public int getFetchDirection() throws SQLException {
            return 1000;
        }
    }
}

