package com.github.pgasync.impl;

import com.github.pgasync.Connection;
import com.github.pgasync.ConnectionPool;
import com.github.pgasync.ConnectionPoolBuilder;
import com.github.pgasync.ResultSet;
import com.github.pgasync.Row;
import com.github.pgasync.SqlException;
import com.github.pgasync.Transaction;
import com.github.pgasync.impl.conversion.DataConverter;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.Subscribers;

/* loaded from: input_file:com/github/pgasync/impl/PgConnectionPool.class */
public abstract class PgConnectionPool implements ConnectionPool {
    final int poolSize;

    @GuardedBy("lock")
    int currentSize;

    @GuardedBy("lock")
    boolean closed;
    final InetSocketAddress address;
    final String username;
    final String password;
    final String database;
    final DataConverter dataConverter;
    final Func1<Connection, Observable<Connection>> validator;
    final boolean pipeline;
    final ReentrantLock lock = new ReentrantLock();

    @GuardedBy("lock")
    final Condition closingConnectionReleased = this.lock.newCondition();

    @GuardedBy("lock")
    final Queue<Subscriber<? super Connection>> subscribers = new LinkedList();

    @GuardedBy("lock")
    final Queue<Connection> connections = new LinkedList();

    /* loaded from: input_file:com/github/pgasync/impl/PgConnectionPool$ReleasingTransaction.class */
    class ReleasingTransaction implements Transaction {
        final AtomicBoolean released = new AtomicBoolean();
        final Connection txconn;
        final Transaction transaction;

        ReleasingTransaction(Connection connection, Transaction transaction) {
            this.txconn = connection;
            this.transaction = transaction;
        }

        @Override // com.github.pgasync.Transaction
        public Observable<Void> rollback() {
            return this.transaction.rollback().doOnTerminate(this::releaseConnection);
        }

        @Override // com.github.pgasync.Transaction
        public Observable<Void> commit() {
            return this.transaction.commit().doOnTerminate(this::releaseConnection);
        }

        @Override // com.github.pgasync.QueryExecutor
        public Observable<Row> queryRows(String str, Object... objArr) {
            return this.released.get() ? Observable.error(new SqlException("Transaction is already completed")) : this.transaction.queryRows(str, new Object[0]).doOnError(th -> {
                releaseConnection();
            });
        }

        @Override // com.github.pgasync.QueryExecutor
        public Observable<ResultSet> querySet(String str, Object... objArr) {
            return this.released.get() ? Observable.error(new SqlException("Transaction is already completed")) : this.transaction.querySet(str, objArr).doOnError(th -> {
                releaseConnection();
            });
        }

        void releaseConnection() {
            PgConnectionPool.this.release(this.txconn);
            this.released.set(true);
        }
    }

    public PgConnectionPool(ConnectionPoolBuilder.PoolProperties poolProperties) {
        this.address = new InetSocketAddress(poolProperties.getHostname(), poolProperties.getPort());
        this.username = poolProperties.getUsername();
        this.password = poolProperties.getPassword();
        this.database = poolProperties.getDatabase();
        this.poolSize = poolProperties.getPoolSize();
        this.dataConverter = poolProperties.getDataConverter();
        this.validator = poolProperties.getValidator();
        this.pipeline = poolProperties.getUsePipelining();
    }

    @Override // com.github.pgasync.QueryExecutor
    public Observable<Row> queryRows(String str, Object... objArr) {
        return getConnection().doOnNext(this::releaseIfPipelining).flatMap(connection -> {
            return connection.queryRows(str, objArr).doOnTerminate(() -> {
                releaseIfNotPipelining(connection);
            });
        });
    }

    @Override // com.github.pgasync.QueryExecutor
    public Observable<ResultSet> querySet(String str, Object... objArr) {
        return getConnection().doOnNext(this::releaseIfPipelining).flatMap(connection -> {
            return connection.querySet(str, objArr).doOnTerminate(() -> {
                releaseIfNotPipelining(connection);
            });
        });
    }

    @Override // com.github.pgasync.TransactionExecutor
    public Observable<Transaction> begin() {
        return getConnection().flatMap(connection -> {
            return connection.begin().doOnError(th -> {
                release(connection);
            }).map(transaction -> {
                return new ReleasingTransaction(connection, transaction);
            });
        });
    }

    @Override // com.github.pgasync.Listenable
    public Observable<String> listen(String str) {
        return getConnection().lift(subscriber -> {
            Action1 action1 = connection -> {
                connection.listen(str).doOnSubscribe(() -> {
                    release(connection);
                }).onErrorResumeNext(th -> {
                    return listen(str);
                }).subscribe(subscriber);
            };
            subscriber.getClass();
            return Subscribers.create(action1, subscriber::onError);
        });
    }

    @Override // com.github.pgasync.Db, java.lang.AutoCloseable
    public void close() throws Exception {
        this.lock.lock();
        try {
            this.closed = true;
            while (!this.subscribers.isEmpty()) {
                Subscriber<? super Connection> poll = this.subscribers.poll();
                if (poll != null) {
                    poll.onError(new SqlException("Connection pool is closing"));
                }
            }
            while (this.currentSize > 0) {
                try {
                    Connection poll2 = this.connections.poll();
                    if (poll2 != null) {
                        this.currentSize--;
                        poll2.close();
                    } else if (this.closingConnectionReleased.await(10L, TimeUnit.SECONDS)) {
                        break;
                    }
                } catch (InterruptedException e) {
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // com.github.pgasync.ConnectionPool
    public Observable<Connection> getConnection() {
        return Observable.create(subscriber -> {
            this.lock.lock();
            try {
                if (this.closed) {
                    this.lock.unlock();
                    subscriber.onError(new SqlException("Connection pool is closed"));
                    if (0 != 0) {
                        this.lock.unlock();
                        return;
                    }
                    return;
                }
                Connection poll = this.connections.poll();
                if (poll != null) {
                    this.lock.unlock();
                    subscriber.onNext(poll);
                    subscriber.onCompleted();
                    if (0 != 0) {
                        this.lock.unlock();
                        return;
                    }
                    return;
                }
                if (!tryIncreaseSize()) {
                    this.subscribers.add(subscriber);
                    if (1 != 0) {
                        this.lock.unlock();
                        return;
                    }
                    return;
                }
                this.lock.unlock();
                new PgConnection(openStream(this.address), this.dataConverter).connect(this.username, this.password, this.database).doOnError(th -> {
                    release(null);
                }).subscribe(subscriber);
                if (0 != 0) {
                    this.lock.unlock();
                }
            } catch (Throwable th2) {
                if (1 != 0) {
                    this.lock.unlock();
                }
                throw th2;
            }
        }).flatMap(connection -> {
            return ((Observable) this.validator.call(connection)).doOnError(th -> {
                release(connection);
            });
        }).retry(this.poolSize + 1);
    }

    private boolean tryIncreaseSize() {
        if (this.currentSize >= this.poolSize) {
            return false;
        }
        this.currentSize++;
        return true;
    }

    private void releaseIfPipelining(Connection connection) {
        if (this.pipeline) {
            release(connection);
        }
    }

    private void releaseIfNotPipelining(Connection connection) {
        if (this.pipeline) {
            return;
        }
        release(connection);
    }

    @Override // com.github.pgasync.ConnectionPool
    public void release(Connection connection) {
        boolean z = connection == null || !((PgConnection) connection).isConnected();
        this.lock.lock();
        try {
            if (!this.subscribers.isEmpty()) {
                Subscriber<? super Connection> poll = this.subscribers.poll();
                this.lock.unlock();
                poll.onNext(connection);
                poll.onCompleted();
                return;
            }
            if (z) {
                this.currentSize--;
            } else {
                this.connections.add(connection);
            }
            if (this.closed) {
                this.closingConnectionReleased.signalAll();
            }
        } finally {
            this.lock.unlock();
        }
    }

    protected abstract PgProtocolStream openStream(InetSocketAddress inetSocketAddress);
}
