/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.pgclient;

import io.netty.channel.EventLoop;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.ContextInternal;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.Repeat;
import io.vertx.ext.unit.junit.RepeatRule;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgPool;
import io.vertx.pgclient.PgPoolTestBase;
import io.vertx.pgclient.impl.PgSocketConnection;
import io.vertx.pgclient.spi.PgDriver;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.ProxyServer;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.impl.SqlConnectionInternal;
import io.vertx.sqlclient.spi.ConnectionFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.junit.Rule;
import org.junit.Test;

public class PgPoolTest
extends PgPoolTestBase {
    @Rule
    public RepeatRule rule = new RepeatRule();
    private Set<PgPool> pools = new HashSet<PgPool>();

    @Override
    public void tearDown(TestContext ctx) {
        int size = this.pools.size();
        if (size > 0) {
            Async async = ctx.async(size);
            Set<PgPool> pools = this.pools;
            this.pools = new HashSet<PgPool>();
            pools.forEach(pool -> pool.close(ar -> async.countDown()));
            async.awaitSuccess(20000L);
        }
        super.tearDown(ctx);
    }

    @Override
    protected PgPool createPool(PgConnectOptions connectOptions, PoolOptions poolOptions) {
        PgPool pool = PgPool.pool((Vertx)this.vertx, (PgConnectOptions)connectOptions, (PoolOptions)poolOptions);
        this.pools.add(pool);
        return pool;
    }

    @Test
    public void testClosePool(TestContext ctx) {
        Async async = ctx.async();
        PgPool pool = this.createPool(this.options, new PoolOptions().setMaxSize(1).setMaxWaitQueueSize(0));
        pool.getConnection(ctx.asyncAssertSuccess(conn -> conn.close(ctx.asyncAssertSuccess(v1 -> pool.close(v2 -> async.complete())))));
        async.await(4000000L);
    }

    @Test
    public void testReconnectQueued(TestContext ctx) {
        Async async = ctx.async();
        ProxyServer proxy = ProxyServer.create((Vertx)this.vertx, (int)this.options.getPort(), (String)this.options.getHost());
        AtomicReference proxyConn = new AtomicReference();
        proxy.proxyHandler(conn -> {
            proxyConn.set(conn);
            conn.connect();
        });
        proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v1 -> {
            PgPool pool = this.createPool(new PgConnectOptions(this.options).setPort(8080).setHost("localhost"), 1);
            pool.getConnection(ctx.asyncAssertSuccess(conn -> ((ProxyServer.Connection)proxyConn.get()).close()));
            pool.getConnection(ctx.asyncAssertSuccess(conn -> conn.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v2 -> async.complete()))));
        }));
    }

    @Test
    public void testAuthFailure(TestContext ctx) {
        Async async = ctx.async();
        PgPool pool = this.createPool(new PgConnectOptions(this.options).setPassword("wrong"), 1);
        pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertFailure(v2 -> async.complete()));
    }

    @Test
    public void testConnectionFailure(TestContext ctx) {
        Async async = ctx.async();
        ProxyServer proxy = ProxyServer.create((Vertx)this.vertx, (int)this.options.getPort(), (String)this.options.getHost());
        AtomicReference proxyConn = new AtomicReference();
        proxy.proxyHandler(conn -> {
            proxyConn.set(conn);
            conn.connect();
        });
        PgPool pool = this.createPool(new PgConnectOptions(this.options).setPort(8080).setHost("localhost"), new PoolOptions().setMaxSize(1).setMaxWaitQueueSize(0));
        pool.getConnection(ctx.asyncAssertFailure(err -> proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v1 -> pool.getConnection(ctx.asyncAssertSuccess(conn -> async.complete()))))));
    }

    @Test
    public void testRunWithExisting(TestContext ctx) {
        Async async = ctx.async();
        this.vertx.runOnContext(v -> {
            try {
                PgPool.pool((PoolOptions)new PoolOptions());
                ctx.fail();
            }
            catch (IllegalStateException ignore) {
                async.complete();
            }
        });
    }

    @Test
    public void testRunStandalone(TestContext ctx) {
        Async async = ctx.async();
        PgPool pool = this.createPool(new PgConnectOptions(this.options), new PoolOptions());
        pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v -> async.complete()));
        async.await(4000L);
    }

    @Test
    public void testMaxWaitQueueSize(TestContext ctx) {
        Async async = ctx.async();
        PgPool pool = this.createPool(this.options, new PoolOptions().setMaxSize(1).setMaxWaitQueueSize(0));
        pool.getConnection(ctx.asyncAssertSuccess(v -> pool.getConnection(ctx.asyncAssertFailure(err -> v.close(ctx.asyncAssertSuccess(vv -> async.complete()))))));
        async.await(4000000L);
    }

    @Test
    public void testConcurrentMultipleConnection(TestContext ctx) {
        PgPool pool = this.createPool(new PgConnectOptions(this.options).setCachePreparedStatements(true), 2);
        int numRequests = 2;
        Async async = ctx.async(numRequests);
        for (int i = 0; i < numRequests; ++i) {
            pool.preparedQuery("SELECT * FROM Fortune WHERE id=$1").execute(Tuple.of((Object)1), ctx.asyncAssertSuccess(results -> {
                ctx.assertEquals((Object)1, (Object)results.size());
                Tuple row = (Tuple)results.iterator().next();
                ctx.assertEquals((Object)1, (Object)row.getInteger(0));
                ctx.assertEquals((Object)"fortune: No such file or directory", (Object)row.getString(1));
                async.countDown();
            }));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUseAvailableResources(TestContext ctx) {
        int poolSize = 10;
        Async async = ctx.async(poolSize + 1);
        PgPool pool = PgPool.pool((PgConnectOptions)this.options, (PoolOptions)new PoolOptions().setMaxSize(poolSize));
        AtomicReference ctrlConnRef = new AtomicReference();
        PgConnection.connect((Vertx)this.vertx, (PgConnectOptions)this.options, (Handler)ctx.asyncAssertSuccess(ctrlConn -> {
            ctrlConnRef.set(ctrlConn);
            for (int i = 0; i < poolSize; ++i) {
                this.vertx.setTimer((long)(10 * (i + 1)), l -> pool.query("select pg_sleep(5)").execute(ctx.asyncAssertSuccess(res -> async.countDown())));
            }
            this.vertx.setTimer((long)(10 * poolSize + 50), event -> ctrlConn.query("select count(*) as cnt from pg_stat_activity where application_name like '%vertx%'").execute(ctx.asyncAssertSuccess(rows -> {
                Integer count = ((Row)rows.iterator().next()).getInteger("cnt");
                ctx.assertEquals((Object)(poolSize + 1), (Object)count);
                async.countDown();
            })));
        }));
        try {
            async.await();
        }
        finally {
            PgConnection ctrlConn2 = (PgConnection)ctrlConnRef.get();
            if (ctrlConn2 != null) {
                ctrlConn2.close();
            }
            pool.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEventLoopSize(TestContext ctx) {
        int num = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE;
        int size = num * 2;
        PgPool pool = PgPool.pool((PgConnectOptions)this.options, (PoolOptions)new PoolOptions().setMaxSize(size).setEventLoopSize(2));
        Set eventLoops = Collections.synchronizedSet(new HashSet());
        Async async = ctx.async(size);
        for (int i = 0; i < size; ++i) {
            pool.getConnection(ctx.asyncAssertSuccess(conn -> {
                PgSocketConnection c = (PgSocketConnection)((SqlConnectionInternal)conn).unwrap().unwrap();
                EventLoop eventLoop = ((ContextInternal)c.context()).nettyEventLoop();
                eventLoops.add(eventLoop);
                async.countDown();
            }));
        }
        try {
            async.await();
        }
        finally {
            pool.close();
        }
        ctx.assertEquals((Object)2, (Object)eventLoops.size());
    }

    @Test
    public void testPipelining(TestContext ctx) {
        AtomicLong latency = new AtomicLong(0L);
        ProxyServer proxy = ProxyServer.create((Vertx)this.vertx, (int)this.options.getPort(), (String)this.options.getHost());
        proxy.proxyHandler(conn -> {
            conn.clientHandler(buff -> {
                long delay = latency.get();
                if (delay == 0L) {
                    conn.serverSocket().write(buff);
                } else {
                    this.vertx.setTimer(delay, id -> conn.serverSocket().write(buff));
                }
            });
            conn.connect();
        });
        Async latch = ctx.async();
        proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> latch.complete()));
        latch.awaitSuccess(20000L);
        this.options.setPort(8080);
        this.options.setHost("localhost");
        int num = 3;
        Async async = ctx.async(num);
        SqlClient pool = PgPool.client((PgConnectOptions)this.options, (PoolOptions)new PoolOptions().setMaxSize(1));
        AtomicLong start = new AtomicLong();
        pool.query("select 1").execute(ctx.asyncAssertSuccess(res1 -> {
            start.set(System.currentTimeMillis());
            latency.set(1000L);
            for (int i = 0; i < num; ++i) {
                pool.query("select 1").execute(ctx.asyncAssertSuccess(res2 -> async.countDown()));
            }
        }));
        async.awaitSuccess(20000L);
        long elapsed = System.currentTimeMillis() - start.get();
        ctx.assertTrue(elapsed < 2000L, "Was expecting pipelined latency " + elapsed + " < 2000");
    }

    @Test
    public void testCannotAcquireConnectionOnPipelinedPool(TestContext ctx) {
        PgPool pool = (PgPool)PgPool.client((PgConnectOptions)this.options, (PoolOptions)new PoolOptions().setMaxSize(1));
        pool.getConnection(ctx.asyncAssertFailure());
    }

    @Test
    public void testPoolIdleTimeout(TestContext ctx) {
        ProxyServer proxy = ProxyServer.create((Vertx)this.vertx, (int)this.options.getPort(), (String)this.options.getHost());
        AtomicReference proxyConn = new AtomicReference();
        int pooleCleanerPeriod = 100;
        int idleTimeout = 3000;
        Async latch = ctx.async();
        proxy.proxyHandler(conn -> {
            proxyConn.set(conn);
            long now = System.currentTimeMillis();
            conn.clientCloseHandler(v -> {
                long lifetime = System.currentTimeMillis() - now;
                int delta = 500;
                int lowerBound = idleTimeout - pooleCleanerPeriod - delta;
                int upperBound = idleTimeout + pooleCleanerPeriod + delta;
                ctx.assertTrue(lifetime >= (long)lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
                ctx.assertTrue(lifetime <= (long)upperBound, "Was expecting connection to be closed in less than " + upperBound + ": " + lifetime);
                latch.complete();
            });
            conn.connect();
        });
        Async listenLatch = ctx.async();
        proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
        listenLatch.awaitSuccess(20000L);
        this.poolOptions.setPoolCleanerPeriod(pooleCleanerPeriod).setIdleTimeout(idleTimeout).setIdleTimeoutUnit(TimeUnit.MILLISECONDS);
        this.options.setPort(8080);
        this.options.setHost("localhost");
        PgPool pool = this.createPool(this.options, this.poolOptions);
        pool.getConnection().flatMap(SqlClient::close).onComplete(ctx.asyncAssertSuccess());
    }

    @Test
    public void testPoolConnectTimeout(TestContext ctx) {
        Async async = ctx.async(2);
        ProxyServer proxy = ProxyServer.create((Vertx)this.vertx, (int)this.options.getPort(), (String)this.options.getHost());
        List connections = Collections.synchronizedList(new ArrayList());
        proxy.proxyHandler(conn -> {
            connections.add(conn);
            async.countDown();
        });
        Async listenLatch = ctx.async();
        proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
        listenLatch.awaitSuccess(20000L);
        this.poolOptions.setConnectionTimeout(1).setConnectionTimeoutUnit(TimeUnit.SECONDS);
        this.options.setPort(8080);
        this.options.setHost("localhost");
        PgPool pool = this.createPool(this.options, this.poolOptions);
        long now = System.currentTimeMillis();
        pool.getConnection(ctx.asyncAssertFailure(err -> {
            ctx.assertTrue(System.currentTimeMillis() - now > 900L);
            async.countDown();
        }));
        async.awaitSuccess(20000L);
        connections.forEach(conn -> conn.clientSocket().close());
    }

    @Test
    @Repeat(value=50)
    public void testNoConnectionLeaks(TestContext ctx) {
        Async killConnections = ctx.async();
        PgConnection.connect((Vertx)this.vertx, (PgConnectOptions)this.options, (Handler)ctx.asyncAssertSuccess(conn -> {
            Collector collector = Collectors.mapping(row -> row.getInteger(0), Collectors.toList());
            String sql = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = $1";
            PreparedQuery preparedQuery = conn.preparedQuery(sql).collecting(collector);
            Tuple params = Tuple.of((Object)this.options.getDatabase());
            preparedQuery.execute(params).compose(cf -> conn.close()).onComplete(ctx.asyncAssertSuccess(v -> killConnections.complete()));
        }));
        killConnections.awaitSuccess();
        String sql = "SELECT pg_backend_pid() AS pid, (SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE '%vertx%') AS cnt";
        int idleTimeout = 50;
        this.poolOptions.setMaxSize(1).setIdleTimeout(idleTimeout).setIdleTimeoutUnit(TimeUnit.MILLISECONDS).setPoolCleanerPeriod(5);
        PgPool pool = this.createPool(this.options, this.poolOptions);
        Async async = ctx.async();
        AtomicInteger pid = new AtomicInteger();
        this.vertx.getOrCreateContext().runOnContext(v -> pool.query(sql).execute(ctx.asyncAssertSuccess(rs1 -> {
            Row row1 = (Row)rs1.iterator().next();
            pid.set(row1.getInteger("pid"));
            ctx.assertEquals((Object)1, (Object)row1.getInteger("cnt"));
            this.vertx.setTimer((long)(2 * idleTimeout), l -> pool.query(sql).execute(ctx.asyncAssertSuccess(rs2 -> {
                Row row2 = (Row)rs2.iterator().next();
                ctx.assertEquals((Object)1, (Object)row2.getInteger("cnt"));
                ctx.assertNotEquals((Object)pid.get(), (Object)row2.getInteger("pid"));
                async.complete();
            })));
        })));
        async.awaitSuccess();
    }

    @Test
    public void testConnectionHook1(TestContext ctx) {
        Async async = ctx.async(2);
        Handler hook = f -> this.vertx.setTimer(1000L, id -> f.close().onComplete(ctx.asyncAssertSuccess(v -> async.countDown())));
        PgPool pool = this.createPool(this.options, new PoolOptions().setMaxSize(1)).connectHandler(hook);
        pool.getConnection(ctx.asyncAssertSuccess(conn -> conn.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v2 -> async.countDown()))));
    }

    @Test
    public void testConnectionHook2(TestContext ctx) {
        Async async = ctx.async(2);
        Handler hook = f -> this.vertx.setTimer(1000L, id -> f.close().onComplete(ctx.asyncAssertSuccess(v -> async.countDown())));
        PgPool pool = this.createPool(this.options, new PoolOptions().setMaxSize(1)).connectHandler(hook);
        pool.query("SELECT id, randomnumber from WORLD").execute(ctx.asyncAssertSuccess(v2 -> async.countDown()));
    }

    @Test
    public void testConnectionClosedInHook(TestContext ctx) {
        Async async = ctx.async(2);
        ProxyServer proxy = ProxyServer.create((Vertx)this.vertx, (int)this.options.getPort(), (String)this.options.getHost());
        AtomicReference proxyConn = new AtomicReference();
        proxy.proxyHandler(conn -> {
            proxyConn.set(conn);
            conn.connect();
        });
        proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v1 -> {
            Handler hook = f -> {
                f.closeHandler(v -> async.countDown());
                ((ProxyServer.Connection)proxyConn.get()).close();
            };
            PgPool pool = this.createPool(new PgConnectOptions(this.options).setPort(8080).setHost("localhost"), new PoolOptions().setMaxSize(1)).connectHandler(hook);
            pool.getConnection(ctx.asyncAssertFailure(conn -> async.countDown()));
        }));
    }

    @Test
    public void testConnectionClosedInProvider1(TestContext ctx) {
        this.testConnectionClosedInProvider(ctx, true);
    }

    @Test
    public void testConnectionClosedInProvider2(TestContext ctx) {
        this.testConnectionClosedInProvider(ctx, false);
    }

    private void testConnectionClosedInProvider(TestContext ctx, boolean immediately) {
        Async async = ctx.async(2);
        ProxyServer proxy = ProxyServer.create((Vertx)this.vertx, (int)this.options.getPort(), (String)this.options.getHost());
        AtomicReference proxyConn = new AtomicReference();
        proxy.proxyHandler(conn -> {
            proxyConn.set(conn);
            conn.connect();
        });
        proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v1 -> {
            PgConnectOptions options = new PgConnectOptions(this.options).setPort(8080).setHost("localhost");
            ConnectionFactory factory = PgDriver.INSTANCE.createConnectionFactory(this.vertx, (SqlConnectOptions)options);
            PgPool pool = this.createPool(options, new PoolOptions().setMaxSize(1));
            pool.connectionProvider(context -> {
                Future fut = factory.connect(context);
                if (immediately) {
                    return fut.map(conn -> {
                        conn.close();
                        return conn;
                    });
                }
                return fut.flatMap(conn -> conn.close().map(conn));
            });
            pool.getConnection(ctx.asyncAssertFailure(conn -> this.vertx.runOnContext(v -> {
                ctx.assertEquals((Object)0, (Object)pool.size());
                async.complete();
            })));
        }));
    }
}

