/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.websocket;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

public abstract class WebSocketTestSuite
extends PulsarTestSuite {
    private static final Logger log = LoggerFactory.getLogger(WebSocketTestSuite.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void testWebSocket(String url) throws Exception {
        String tenant = "websocket-test-" + WebSocketTestSuite.randomName(10);
        String namespace = tenant + "/ns1";
        String topic = namespace + "/topic-" + WebSocketTestSuite.randomName(5);
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build();
        try {
            admin.tenants().createTenant(tenant, (TenantInfo)new TenantInfoImpl(Collections.emptySet(), Collections.singleton(this.pulsarCluster.getClusterName())));
            admin.namespaces().createNamespace(namespace, Collections.singleton(this.pulsarCluster.getClusterName()));
            log.debug("Using url {}", (Object)url);
            WebSocketConsumer consumer = new WebSocketConsumer(url, topic);
            try {
                log.debug("Created ws consumer");
                WebSocketPublisher publisher = new WebSocketPublisher(url, topic);
                try {
                    log.debug("Created ws publisher");
                    publisher.send("SGVsbG8gV29ybGQ=");
                    log.debug("Sent message through publisher");
                    Map<String, Object> response = publisher.getResponse();
                    Assert.assertEquals((Object)response.get("result"), (Object)"ok", (String)("Bad response: " + response));
                    log.debug("Publisher received response {}", response);
                    String received = consumer.getPayloadFromResponse();
                    log.debug("Consumer received message {} ", (Object)received);
                    Assert.assertEquals((String)received, (String)"SGVsbG8gV29ybGQ=");
                }
                finally {
                    if (Collections.singletonList(publisher).get(0) != null) {
                        publisher.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(consumer).get(0) != null) {
                    consumer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(admin).get(0) != null) {
                admin.close();
            }
        }
    }

    @WebSocket
    protected static class WebSocketConsumer
    extends Client {
        WebSocketConsumer(String url, String topic) throws Exception {
            super(url + "/ws/v2/consumer/persistent/" + topic + "/" + PulsarTestBase.randomName(8));
        }

        String getPayloadFromResponse() throws Exception {
            Map<String, Object> response = this.getResponse();
            return String.valueOf(response.get("payload"));
        }
    }

    @WebSocket
    protected static class WebSocketPublisher
    extends Client {
        WebSocketPublisher(String url, String topic) throws Exception {
            super(url + "/ws/v2/producer/persistent/" + topic);
        }

        void send(String payload) throws IOException {
            this.sendText("{\n  \"payload\": \"" + payload + "\",\n  \"properties\": {\"key1\": \"value1\", \"key2\": \"value2\"},\n  \"context\": \"1\"\n}");
        }
    }

    @WebSocket
    public static class Client
    extends WebSocketAdapter
    implements AutoCloseable {
        final BlockingQueue<String> incomingMessages = new ArrayBlockingQueue<String>(10);
        private final WebSocketClient client;

        Client(String webSocketUri) throws Exception {
            HttpClient httpClient = new HttpClient();
            this.client = new WebSocketClient(httpClient);
            this.client.start();
            this.client.connect((Object)this, URI.create(webSocketUri)).get();
        }

        void sendText(String payload) throws IOException {
            this.getSession().getRemote().sendString(payload);
        }

        public void onWebSocketText(String s) {
            this.incomingMessages.add(s);
        }

        Map<String, Object> getResponse() throws Exception {
            String response = this.incomingMessages.poll(5L, TimeUnit.SECONDS);
            if (response == null) {
                Assert.fail((String)"Did not get websocket response within timeout");
            }
            return (Map)ObjectMapperFactory.getMapper().getObjectMapper().readValue(response, (TypeReference)new TypeReference<Map<String, Object>>(){});
        }

        @Override
        public void close() throws Exception {
            this.client.stop();
        }
    }
}

