/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx.jdbc;

import com.github.davidmoten.rx.jdbc.Parameter;
import com.github.davidmoten.rx.jdbc.QuerySelect;
import com.github.davidmoten.rx.jdbc.QuerySelectProducer;
import com.github.davidmoten.rx.jdbc.Util;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

final class QuerySelectOperation {
    private static final Logger log = LoggerFactory.getLogger(QuerySelectOperation.class);

    QuerySelectOperation() {
    }

    static <T> Observable<T> execute(QuerySelect query, List<Parameter> parameters, Func1<? super ResultSet, ? extends T> function) {
        return Observable.create(new QuerySelectOnSubscribe(query, parameters, function));
    }

    private static class QuerySelectOnSubscribe<T>
    implements Observable.OnSubscribe<T> {
        private final Func1<? super ResultSet, ? extends T> function;
        private final QuerySelect query;
        private final List<Parameter> parameters;

        private QuerySelectOnSubscribe(QuerySelect query, List<Parameter> parameters, Func1<? super ResultSet, ? extends T> function) {
            this.query = query;
            this.parameters = parameters;
            this.function = function;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void call(Subscriber<? super T> subscriber) {
            final State state = new State();
            try {
                this.connectAndPrepareStatement(subscriber, state);
                this.executeQuery(subscriber, state);
                subscriber.setProducer(new QuerySelectProducer<T>(this.function, subscriber, state.con, state.ps, state.rs));
                subscriber.add(Subscriptions.create((Action0)new Action0(){

                    public void call() {
                        QuerySelectOnSubscribe.this.closeQuietly(state);
                    }
                }));
            }
            catch (Exception e) {
                try {
                    this.closeQuietly(state);
                }
                finally {
                    this.handleException(e, subscriber);
                }
            }
        }

        private void connectAndPrepareStatement(Subscriber<? super T> subscriber, State state) throws SQLException {
            log.debug("connectionProvider={}", (Object)this.query.context().connectionProvider());
            this.checkSubscription(subscriber, state);
            if (state.keepGoing) {
                log.debug("getting connection");
                state.con = this.query.context().connectionProvider().get();
                log.debug("preparing statement,sql={}", (Object)this.query.sql());
                state.ps = state.con.prepareStatement(this.query.sql());
                log.debug("setting parameters");
                Util.setParameters(state.ps, this.parameters);
            }
        }

        private void executeQuery(Subscriber<? super T> subscriber, State state) throws SQLException {
            this.checkSubscription(subscriber, state);
            if (state.keepGoing) {
                try {
                    log.debug("executing ps");
                    state.rs = state.ps.executeQuery();
                    log.debug("executed ps={}", (Object)state.ps);
                }
                catch (SQLException e) {
                    throw new SQLException("failed to run sql=" + this.query.sql(), e);
                }
            }
        }

        private void handleException(Exception e, Subscriber<? super T> subscriber) {
            log.debug("onError: " + e.getMessage());
            if (subscriber.isUnsubscribed()) {
                log.debug("unsubscribed");
            } else {
                subscriber.onError((Throwable)e);
            }
        }

        private void closeQuietly(State state) {
            if (state.closed.compareAndSet(false, true)) {
                log.debug("closing rs");
                Util.closeQuietly(state.rs);
                log.debug("closing ps");
                Util.closeQuietly(state.ps);
                log.debug("closing con");
                Util.closeQuietlyIfAutoCommit(state.con);
                log.debug("closed");
            }
        }

        private void checkSubscription(Subscriber<? super T> subscriber, State state) {
            if (subscriber.isUnsubscribed()) {
                state.keepGoing = false;
                log.debug("unsubscribing");
            }
        }

        private static class State {
            volatile boolean keepGoing = true;
            volatile Connection con;
            volatile PreparedStatement ps;
            volatile ResultSet rs;
            final AtomicBoolean closed = new AtomicBoolean(false);

            private State() {
            }
        }
    }
}

