package io.quarkus.resteasy.reactive.server.test.stream;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.restassured.RestAssured;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.sse.SseEventSource;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.jboss.resteasy.reactive.client.impl.MultiInvoker;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Integration tests for the {@code /stream/...} endpoints, exercised through RestAssured, a raw
 * Vert.x HTTP client, the reactive JAX-RS client ({@link MultiInvoker}) and {@link SseEventSource}.
 *
 * <p>Only {@code StreamResource} is added to the test deployment, so the endpoints are presumably
 * declared there. The whole class is disabled on Windows.
 */
@DisabledOnOs(OS.WINDOWS)
public class StreamTestCase {

    /**
     * Base URI of the deployed test application, populated through {@link TestHTTPResource}.
     * The tests append relative paths such as {@code "stream/sse"} directly, so the value is
     * expected to end with a slash.
     */
    @TestHTTPResource
    URI uri;

    /** Test deployment containing only {@code StreamResource}. */
    @RegisterExtension
    static final QuarkusUnitTest config = new QuarkusUnitTest()
            .withApplicationRoot(jar -> jar.addClasses(StreamResource.class));

    /**
     * Issues two sequential requests to {@code /stream/text/stream} on the same keep-alive
     * Vert.x client and fails if a connection is closed after a response has been received.
     *
     * @throws Exception if either request fails, a connection is closed, or the body is not
     *         {@code "foobar"}; the failure surfaces through the future's {@code get()}
     */
    @Test
    public void testStreamingDoesNotCloseConnection() throws Exception {
        Vertx vertx = Vertx.vertx();
        try {
            CompletableFuture<Object> outcome = new CompletableFuture<>();
            HttpClient client = vertx.createHttpClient(new HttpClientOptions()
                    .setKeepAlive(true)
                    .setIdleTimeout(10)
                    .setIdleTimeoutUnit(TimeUnit.SECONDS));
            // The second request is only started once the first body has been verified;
            // the future completes normally after the second body has been verified too.
            sendRequest(outcome, client,
                    () -> sendRequest(outcome, client, () -> outcome.complete(null)));
            outcome.get();
        } finally {
            // Always shut Vert.x down, whether the test passed or failed.
            vertx.close().toCompletionStage().toCompletableFuture().get();
        }
    }

    /**
     * Sends a GET to {@code /stream/text/stream} and verifies that the body is {@code "foobar"}.
     *
     * <p>Every failure (request, connect, body read, failed assertion, closed connection) is
     * reported by completing {@code completableFuture} exceptionally.
     *
     * @param completableFuture future used to report failures back to the waiting test
     * @param httpClient Vert.x client used to issue the request
     * @param runnable continuation invoked after the body has been verified successfully
     */
    private void sendRequest(final CompletableFuture<Object> completableFuture, HttpClient httpClient,
            final Runnable runnable) {
        Objects.requireNonNull(completableFuture, "completableFuture");
        final Handler<Throwable> failure = completableFuture::completeExceptionally;
        httpClient.request(HttpMethod.GET, RestAssured.port, "localhost", "/stream/text/stream")
                .onFailure(failure)
                .onSuccess(request -> {
                    request.end();
                    request.connect()
                            .onFailure(failure)
                            .onSuccess(response -> {
                                // A close of the underlying connection is exactly what this test must detect.
                                response.request().connection().closeHandler(ignored -> completableFuture
                                        .completeExceptionally(new IllegalStateException("Connection was closed")));
                                response.body()
                                        .onFailure(failure)
                                        .onSuccess(buffer -> {
                                            try {
                                                Assertions.assertEquals("foobar",
                                                        buffer.toString(StandardCharsets.US_ASCII));
                                            } catch (Throwable t) {
                                                // The assertion runs inside an asynchronous callback, so the
                                                // failure is handed to the test through the future.
                                                completableFuture.completeExceptionally(t);
                                                // Do not start the follow-up request once the test has failed.
                                                return;
                                            }
                                            runnable.run();
                                        });
                            });
                });
    }

    /**
     * Verifies that every streaming and collecting endpoint variant answers with status 200 and
     * the body {@code "foobar"}.
     */
    @Test
    public void testStreaming() throws Exception {
        String[] paths = {
                "/stream/text/stream",
                "/stream/text/stream/publisher",
                "/stream/text/collect",
                "/stream/byte-arrays/stream",
                "/stream/byte-arrays/collect",
                "/stream/char-arrays/stream",
                "/stream/char-arrays/stream/publisher",
                "/stream/char-arrays/collect",
                "/stream/buffer/stream",
                "/stream/buffer/collect",
        };
        for (String path : paths) {
            RestAssured.get(path).then().statusCode(200).body(Matchers.equalTo("foobar"));
        }
    }

    /**
     * Consumes {@code stream/text/stream} through the reactive client and expects the two items
     * {@code "foo"} and {@code "bar"} within five seconds.
     */
    @Test
    public void testClientStreaming() throws Exception {
        Client client = ClientBuilder.newBuilder().build();
        try {
            Multi<String> multi = client.target(uri.toString() + "stream/text/stream")
                    .request().rx(MultiInvoker.class).get(String.class);
            List<String> items = multi.collect().asList().await().atMost(Duration.ofSeconds(5L));
            Assertions.assertEquals(Arrays.asList("foo", "bar"), items);
        } finally {
            client.close();
        }
    }

    /**
     * Subscribes to the infinite stream, cancels immediately, and then checks that
     * {@code stream/infinite/stream-was-cancelled} answers {@code "OK"}.
     */
    @Test
    public void testInfiniteStreamClosedByClientImmediately() throws Exception {
        Client client = ClientBuilder.newBuilder().build();
        try {
            Multi<String> multi = client.target(uri.toString() + "stream/infinite/stream")
                    .request().rx(MultiInvoker.class).get(String.class);
            Cancellable subscription = multi.subscribe().with(item -> System.err.println("Received " + item));
            subscription.cancel();
            // Pause before querying the cancellation status endpoint.
            Thread.sleep(2000L);
            String status = client.target(uri.toString() + "stream/infinite/stream-was-cancelled")
                    .request().get(String.class);
            Assertions.assertEquals("OK", status);
        } finally {
            client.close();
        }
    }

    /**
     * Subscribes to the infinite stream, waits (up to 30 seconds) for two items, cancels, and
     * then checks that {@code stream/infinite/stream-was-cancelled} answers {@code "OK"}.
     */
    @Test
    public void testInfiniteStreamClosedByClientAfterRegistration() throws Exception {
        Client client = ClientBuilder.newBuilder().build();
        try {
            Multi<String> multi = client.target(uri.toString() + "stream/infinite/stream")
                    .request().rx(MultiInvoker.class).get(String.class);
            CountDownLatch twoItemsReceived = new CountDownLatch(2);
            Cancellable subscription = multi.subscribe().with(item -> {
                System.err.println("Received " + item);
                twoItemsReceived.countDown();
            });
            Assertions.assertTrue(twoItemsReceived.await(30L, TimeUnit.SECONDS));
            subscription.cancel();
            // Pause before querying the cancellation status endpoint.
            Thread.sleep(2000L);
            String status = client.target(uri.toString() + "stream/infinite/stream-was-cancelled")
                    .request().get(String.class);
            Assertions.assertEquals("OK", status);
        } finally {
            client.close();
        }
    }

    /**
     * Expects {@code stream/sse} to deliver the events {@code a}, {@code b}, {@code c} without
     * errors and to signal completion within 20 seconds.
     */
    @Test
    public void testSse() throws InterruptedException {
        Client client = ClientBuilder.newBuilder().build();
        try (SseEventSource eventSource = SseEventSource.target(client.target(uri.toString() + "stream/sse"))
                .build()) {
            CountDownLatch completed = new CountDownLatch(1);
            List<Throwable> errors = new CopyOnWriteArrayList<>();
            List<String> data = new CopyOnWriteArrayList<>();
            eventSource.register(
                    event -> data.add(event.readData()),
                    errors::add,
                    completed::countDown);
            eventSource.open();
            Assertions.assertTrue(completed.await(20L, TimeUnit.SECONDS));
            Assertions.assertEquals(Arrays.asList("a", "b", "c"), data);
            Assertions.assertEquals(0, errors.size());
        } finally {
            client.close();
        }
    }

    /**
     * Expects {@code stream/sse/throws} to deliver no events and exactly one error, and to signal
     * completion within 20 seconds.
     */
    @Test
    public void testSseThrows() throws InterruptedException {
        Client client = ClientBuilder.newBuilder().build();
        try (SseEventSource eventSource = SseEventSource
                .target(client.target(uri.toString() + "stream/sse/throws")).build()) {
            CountDownLatch completed = new CountDownLatch(1);
            List<Throwable> errors = new CopyOnWriteArrayList<>();
            List<String> data = new CopyOnWriteArrayList<>();
            eventSource.register(
                    event -> data.add(event.readData()),
                    errors::add,
                    completed::countDown);
            eventSource.open();
            Assertions.assertTrue(completed.await(20L, TimeUnit.SECONDS));
            Assertions.assertEquals(0, data.size());
            Assertions.assertEquals(1, errors.size());
        } finally {
            client.close();
        }
    }

    /**
     * Expects {@code stream/sse/raw} to deliver three events whose data, id and name match the
     * expected values, without errors, and to signal completion within 20 seconds.
     */
    @Test
    public void testSseForMultiWithOutboundSseEvent() throws InterruptedException {
        Client client = ClientBuilder.newBuilder().build();
        try (SseEventSource eventSource = SseEventSource
                .target(client.target(uri.toString() + "stream/sse/raw")).build()) {
            CountDownLatch completed = new CountDownLatch(1);
            List<Throwable> errors = new CopyOnWriteArrayList<>();
            List<String> data = new CopyOnWriteArrayList<>();
            List<String> ids = new CopyOnWriteArrayList<>();
            List<String> names = new CopyOnWriteArrayList<>();
            eventSource.register(
                    event -> {
                        data.add(event.readData());
                        ids.add(event.getId());
                        names.add(event.getName());
                    },
                    errors::add,
                    completed::countDown);
            eventSource.open();
            Assertions.assertTrue(completed.await(20L, TimeUnit.SECONDS));
            Assertions.assertEquals(Arrays.asList("uno", "dos", "tres"), data);
            Assertions.assertEquals(Arrays.asList("one", "two", "three"), ids);
            Assertions.assertEquals(Arrays.asList("eins", "zwei", "drei"), names);
            Assertions.assertEquals(0, errors.size());
        } finally {
            client.close();
        }
    }
}
