/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.pgclient;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgTestBase;
import io.vertx.pgclient.pubsub.PgChannel;
import io.vertx.pgclient.pubsub.PgSubscriber;
import io.vertx.sqlclient.ProxyServer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class PubSubTest
extends PgTestBase {
    Vertx vertx;
    PgSubscriber subscriber;

    @Override
    @Before
    public void setup() throws Exception {
        super.setup();
        this.vertx = Vertx.vertx();
    }

    @After
    public void teardown(TestContext ctx) {
        if (this.subscriber != null) {
            this.subscriber.close();
        }
        this.vertx.close(ctx.asyncAssertSuccess());
    }

    @Test
    public void testNotify(TestContext ctx) {
        this.testNotify(ctx, "the_channel");
    }

    @Test
    public void testNotifyChannelRequiresQuotedID(TestContext ctx) {
        this.testNotify(ctx, "The.Channel");
    }

    public void testNotify(TestContext ctx, String channelName) {
        String quotedChannelName = "\"" + channelName.replace("\"", "\"\"") + "\"";
        Async async = ctx.async(2);
        PgConnection.connect((Vertx)this.vertx, (PgConnectOptions)this.options, (Handler)ctx.asyncAssertSuccess(conn -> conn.query("LISTEN " + quotedChannelName).execute(ctx.asyncAssertSuccess(result1 -> {
            conn.notificationHandler(notification -> {
                ctx.assertEquals((Object)channelName, (Object)notification.getChannel());
                ctx.assertEquals((Object)"the message", (Object)notification.getPayload());
                async.countDown();
            });
            conn.query("NOTIFY " + quotedChannelName + ", 'the message'").execute(ctx.asyncAssertSuccess(result2 -> async.countDown()));
        }))));
    }

    @Test
    public void testConnect(TestContext ctx) {
        this.testConnect(ctx, "channel1", "channel2");
    }

    @Test
    public void testConnectChannelRequiresQuotedID(TestContext ctx) {
        this.testConnect(ctx, "Channel.Test.1", "Channel.Test.2");
    }

    private void testConnect(TestContext ctx, String channel1Name, String channel2Name) {
        String quotedChannel1Name = "\"" + channel1Name.replace("\"", "\"\"") + "\"";
        String quotedChannel2Name = "\"" + channel2Name.replace("\"", "\"\"") + "\"";
        this.subscriber = PgSubscriber.subscriber((Vertx)this.vertx, (PgConnectOptions)this.options);
        Async notifiedLatch = ctx.async();
        PgChannel sub1 = this.subscriber.channel(channel1Name);
        PgChannel sub2 = this.subscriber.channel(channel2Name);
        sub1.handler(notif -> {
            ctx.assertEquals((Object)"msg1", notif);
            notifiedLatch.countDown();
        });
        sub2.handler(notif -> {
            ctx.assertEquals((Object)"msg2", notif);
            notifiedLatch.countDown();
        });
        Async connectLatch = ctx.async();
        this.subscriber.connect(ctx.asyncAssertSuccess(v -> connectLatch.complete()));
        connectLatch.awaitSuccess(10000L);
        this.subscriber.actualConnection().query("NOTIFY " + quotedChannel1Name + ", 'msg1'").execute(ctx.asyncAssertSuccess());
        this.subscriber.actualConnection().query("NOTIFY " + quotedChannel2Name + ", 'msg2'").execute(ctx.asyncAssertSuccess());
        notifiedLatch.awaitSuccess(10000L);
    }

    @Test
    public void testSubscribe(TestContext ctx) {
        this.testSubscribe(ctx, "the_channel");
    }

    @Test
    public void testSubscribeChannelRequiresQuotedID(TestContext ctx) {
        this.testSubscribe(ctx, "The.Channel");
    }

    @Test
    public void testSubscribeChannelContainsQuotes(TestContext ctx) {
        this.testSubscribe(ctx, "\"The\".\"Channel\"");
    }

    @Test
    public void testSubscribeChannelExceedsLengthLimit(TestContext ctx) {
        char[] channelNameChars = new char[68];
        Arrays.fill(channelNameChars, 0, 63, 'a');
        Arrays.fill(channelNameChars, 63, channelNameChars.length, 'b');
        String channelName = new String(channelNameChars);
        this.testSubscribe(ctx, channelName);
    }

    public void testSubscribe(TestContext ctx, String channelName) {
        String quotedChannelName = "\"" + channelName.replace("\"", "\"\"") + "\"";
        this.subscriber = PgSubscriber.subscriber((Vertx)this.vertx, (PgConnectOptions)this.options);
        Async connectLatch = ctx.async();
        this.subscriber.connect(ctx.asyncAssertSuccess(v -> connectLatch.complete()));
        connectLatch.awaitSuccess(10000L);
        PgChannel channel = this.subscriber.channel(channelName);
        Async subscribedLatch = ctx.async();
        ctx.assertEquals((Object)channel, (Object)channel.subscribeHandler(v -> subscribedLatch.complete()));
        Async notifiedLatch = ctx.async();
        channel.handler(notif -> {
            ctx.assertEquals((Object)"msg", notif);
            notifiedLatch.countDown();
        });
        subscribedLatch.awaitSuccess(10000L);
        this.subscriber.actualConnection().query("NOTIFY " + quotedChannelName + ", 'msg'").execute(ctx.asyncAssertSuccess());
        notifiedLatch.awaitSuccess(10000L);
    }

    @Test
    public void testSubscribeNotifyWithUnquotedId(TestContext ctx) {
        this.subscriber = PgSubscriber.subscriber((Vertx)this.vertx, (PgConnectOptions)this.options);
        Async connectLatch = ctx.async();
        this.subscriber.connect(ctx.asyncAssertSuccess(v -> connectLatch.complete()));
        connectLatch.awaitSuccess(10000L);
        PgChannel channel = this.subscriber.channel("the_channel");
        Async subscribedLatch = ctx.async();
        ctx.assertEquals((Object)channel, (Object)channel.subscribeHandler(v -> subscribedLatch.complete()));
        Async notifiedLatch = ctx.async();
        channel.handler(notif -> {
            ctx.assertEquals((Object)"msg", notif);
            notifiedLatch.countDown();
        });
        subscribedLatch.awaitSuccess(10000L);
        this.subscriber.actualConnection().query("NOTIFY The_Channel, 'msg'").execute(ctx.asyncAssertSuccess());
        notifiedLatch.awaitSuccess(10000L);
    }

    @Test
    public void testUnsubscribe(TestContext ctx) {
        this.testUnsubscribe(ctx, "the_channel");
    }

    @Test
    public void testUnsubscribeChannelRequiresQuotedID(TestContext ctx) {
        this.testUnsubscribe(ctx, "The.Channel");
    }

    public void testUnsubscribe(TestContext ctx, String channelName) {
        this.subscriber = PgSubscriber.subscriber((Vertx)this.vertx, (PgConnectOptions)this.options);
        Async connectLatch = ctx.async();
        this.subscriber.connect(ctx.asyncAssertSuccess(v -> connectLatch.complete()));
        connectLatch.awaitSuccess(10000L);
        PgChannel sub = this.subscriber.channel("the_channel");
        Async endLatch = ctx.async();
        sub.endHandler(v -> endLatch.complete());
        Async subscribedLatch = ctx.async();
        sub.subscribeHandler(v -> subscribedLatch.complete());
        sub.handler(notif -> {});
        subscribedLatch.awaitSuccess(10000L);
        sub.handler(null);
        endLatch.awaitSuccess(10000L);
    }

    @Test
    public void testReconnectImmediately(TestContext ctx) {
        this.testReconnect(ctx, 0L, "the_channel");
    }

    @Test
    public void testReconnectImmediatelyChannelRequiresQuotedID(TestContext ctx) {
        this.testReconnect(ctx, 0L, "The.Channel");
    }

    @Test
    public void testReconnectWithDelay(TestContext ctx) {
        this.testReconnect(ctx, 100L, "the_channel");
    }

    @Test
    public void testReconnectWithDelayChannelRequiresQuotedID(TestContext ctx) {
        this.testReconnect(ctx, 100L, "The.Channel");
    }

    public void testReconnect(TestContext ctx, long delay, String channelName) {
        ProxyServer proxy = ProxyServer.create((Vertx)this.vertx, (int)this.options.getPort(), (String)this.options.getHost());
        AtomicReference connRef = new AtomicReference();
        proxy.proxyHandler(conn -> {
            connRef.set(conn);
            conn.connect();
        });
        Async listenLatch = ctx.async();
        proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v -> {
            this.options.setPort(8080).setHost("localhost");
            listenLatch.complete();
        }));
        listenLatch.awaitSuccess(10000L);
        this.subscriber = PgSubscriber.subscriber((Vertx)this.vertx, (PgConnectOptions)this.options);
        PgChannel sub = this.subscriber.channel(channelName);
        Async connect1Latch = ctx.async();
        Async connect2Latch = ctx.async();
        Async connect3Latch = ctx.async();
        AtomicInteger times = new AtomicInteger();
        sub.subscribeHandler(v -> {
            switch (times.getAndIncrement()) {
                case 0: {
                    connect1Latch.complete();
                    break;
                }
                case 1: {
                    connect2Latch.complete();
                    break;
                }
                case 2: {
                    connect3Latch.complete();
                }
            }
        });
        this.subscriber.connect(ar -> {});
        sub.handler(notif -> {});
        connect1Latch.awaitSuccess(10000L);
        AtomicInteger count = new AtomicInteger();
        this.subscriber.reconnectPolicy(retries -> {
            ctx.assertEquals((Object)0, retries);
            ctx.assertFalse(this.subscriber.closed());
            if (count.getAndIncrement() < 2) {
                return delay;
            }
            return -1L;
        });
        Async closeLatch = ctx.async();
        this.subscriber.closeHandler(v -> closeLatch.complete());
        ((ProxyServer.Connection)connRef.get()).close();
        connect2Latch.awaitSuccess(10000L);
        ((ProxyServer.Connection)connRef.get()).close();
        connect3Latch.awaitSuccess(10000L);
        ((ProxyServer.Connection)connRef.get()).close();
        closeLatch.awaitSuccess(10000L);
        ctx.assertEquals((Object)3, (Object)count.get());
        ctx.assertTrue(this.subscriber.closed());
    }

    @Test
    public void testClose(TestContext ctx) {
        this.testClose(ctx, "the_channel");
    }

    @Test
    public void testCloseChannelRequiresQuotedID(TestContext ctx) {
        this.testClose(ctx, "The.Channel");
    }

    public void testClose(TestContext ctx, String channelName) {
        PgSubscriber subscriber = PgSubscriber.subscriber((Vertx)this.vertx, (PgConnectOptions)this.options);
        PgChannel sub = subscriber.channel(channelName);
        Async endLatch = ctx.async();
        sub.endHandler(v -> endLatch.complete());
        sub.handler(notif -> {});
        Async connectLatch = ctx.async();
        subscriber.connect(ctx.asyncAssertSuccess(v -> connectLatch.complete()));
        connectLatch.awaitSuccess(10000L);
        Async closeLatch = ctx.async();
        subscriber.closeHandler(v -> closeLatch.complete());
        subscriber.close();
        endLatch.awaitSuccess(10000L);
        closeLatch.awaitSuccess(10000L);
    }
}

