package com.twitter.finatra.http.streaming;

import com.twitter.concurrent.AsyncStream;
import com.twitter.finagle.http.Response;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finatra.http.marshalling.MessageBodyManager;
import com.twitter.finatra.http.response.ResponseBuilder;
import com.twitter.finatra.jackson.ScalaObjectMapper;
import com.twitter.finatra.utils.FileResolver;
import com.twitter.io.Buf;
import com.twitter.io.BufReaders;
import com.twitter.io.Bufs;
import com.twitter.io.Reader;
import com.twitter.io.Readers;
import com.twitter.io.StreamTermination$Discarded$;
import com.twitter.io.StreamTermination$FullyRead$;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Function;
import com.twitter.util.Future;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.reflect.Manifest;
import scala.reflect.ManifestFactory;
import scala.runtime.BoxedUnit;

/**
 * Java-side tests for streaming responses produced by
 * {@code ResponseBuilder.streaming(...)} from {@link Reader} and
 * {@link AsyncStream} sources.
 *
 * <p>Each test builds a {@link Response}, drains (or discards) its body reader
 * and checks either the serialized body text or the reader's termination state.
 */
public class StreamingResponseJavaTest extends Assert {
    /** Upper bound for every blocking wait performed by {@link #await(Future)}. */
    private static final Duration AWAIT_TIMEOUT = Duration.apply(5L, TimeUnit.SECONDS);

    /** Builder under test; the body manager and stats receiver are Mockito mocks. */
    private final ResponseBuilder responseBuilder = new ResponseBuilder(
            ScalaObjectMapper.apply(),
            new FileResolver("src/main/webapp/", ""),
            Mockito.mock(MessageBodyManager.class),
            Mockito.mock(StatsReceiver.class),
            true);

    /**
     * Keeps calling {@code read()} on the reader until a read yields an empty option.
     *
     * @param reader the reader to drain
     * @return a future that completes once an empty read has been observed
     */
    private Future<BoxedUnit> burnLoop(Reader<Buf> reader) {
        return reader.read().flatMap(Function.func(chunk -> {
            if (chunk.nonEmpty()) {
                return burnLoop(reader);
            }
            return Future.Unit();
        }));
    }

    /**
     * Blocks on the given future for at most {@link #AWAIT_TIMEOUT}.
     *
     * @param future the future to wait for
     * @return the value the future completed with
     * @throws Exception whatever {@code Await.result} throws for this future
     */
    private <A> A await(Future<A> future) throws Exception {
        return (A) Await.result(future, AWAIT_TIMEOUT);
    }

    /**
     * Reads the whole body of the response and decodes it as UTF-8 text.
     *
     * @param response the response whose body reader is consumed
     * @return the decoded body
     */
    private String readBody(Response response) throws Exception {
        Buf body = (Buf) await(BufReaders.readAll(response.reader()));
        return Buf.decodeString(body, StandardCharsets.UTF_8);
    }

    /**
     * Builds a streaming response from a {@link Reader} and waits for it.
     *
     * @param reader   source of the streamed elements
     * @param manifest manifest describing the element type
     */
    private <A> Response fromReader(Reader<A> reader, Manifest<A> manifest) throws Exception {
        // The cast stays: the ToReader argument may erase the call chain to raw types.
        return (Response) await(
                this.responseBuilder.streaming(reader, ToReader.ReaderIdentity(), manifest).toFutureResponse());
    }

    /**
     * Builds a streaming response from an {@link AsyncStream} and waits for it.
     *
     * @param asyncStream source of the streamed elements
     * @param manifest    manifest describing the element type
     */
    private <A> Response fromStream(AsyncStream<A> asyncStream, Manifest<A> manifest) throws Exception {
        // The cast stays: the ToReader argument may erase the call chain to raw types.
        return (Response) await(
                this.responseBuilder.streaming(asyncStream, ToReader.AsyncStreamToReader(), manifest).toFutureResponse());
    }

    /**
     * Creates a reader over an unbounded sequence in which every element is the
     * previous element with {@code buf} appended, starting from {@code buf} itself.
     */
    private Reader<Buf> infiniteReader(Buf buf) {
        return Readers.fromSeq(Stream.iterate(buf, previous -> previous.concat(buf)));
    }

    @Test
    public void emptyReader() throws Exception {
        Response response = fromReader(Readers.newEmptyReader(), ManifestFactory.Object());

        assertEquals("[]", readBody(response));
        // Expected value goes first so a failure message reads correctly.
        assertEquals(StreamTermination$FullyRead$.MODULE$, await(response.reader().onClose()));
    }

    @Test
    public void serdeReaderofString() throws Exception {
        Response response = fromReader(
                Readers.fromSeq(Arrays.asList("first", "second", "third")),
                ManifestFactory.classType(String.class));

        assertEquals("[\"first\",\"second\",\"third\"]", readBody(response));
    }

    @Test
    public void serdeReaderofObject() throws Exception {
        Response response = fromReader(
                Readers.fromSeq(Arrays.asList(new FooClass(1, "first"), new FooClass(2, "second"))),
                ManifestFactory.classType(FooClass.class));

        assertEquals("[{\"v1\":1,\"v2\":\"first\"},{\"v1\":2,\"v2\":\"second\"}]", readBody(response));
    }

    @Test
    public void readerWriteFailureWithReaderDicardedException() throws Exception {
        Response response = fromReader(infiniteReader(Bufs.UTF_8.apply("foo")), ManifestFactory.classType(Buf.class));

        response.reader().discard();
        // The future returned by burnLoop is not awaited; only onClose() is asserted on.
        burnLoop(response.reader());

        assertEquals(StreamTermination$Discarded$.MODULE$, await(response.reader().onClose()));
    }

    @Test
    public void serdeAsyncStreamOfString() throws Exception {
        List<String> values = Arrays.asList("first", "second", "third");
        Iterator<String> scalaValues =
                (Iterator<String>) JavaConverters.asScalaIteratorConverter(values.iterator()).asScala();

        Response response = fromStream(
                AsyncStream.fromSeq(scalaValues.toSeq()),
                ManifestFactory.classType(String.class));

        assertEquals("[\"first\",\"second\",\"third\"]", readBody(response));
    }

    @Test
    public void constructJsonStreamWithAffix() throws Exception {
        List<String> drinks = Arrays.asList("coke", "sprite", "coffee", "tea", "fanta");
        List<Lunch> lunches = new ArrayList<>();
        for (String drink : drinks) {
            lunches.add(new Lunch(drink, drink.length() * 2, drink.length()));
        }

        // JSON prefix, the streamed array of lunches, then the JSON suffix.
        Reader<Buf> prefix = Readers.newBufReader(Bufs.utf8Buf("{\"options\":"), Integer.MAX_VALUE);
        Reader<Buf> options = this.responseBuilder
                .streaming(Readers.fromSeq(lunches), ToReader.ReaderIdentity(), ManifestFactory.classType(Lunch.class))
                .toBufReader();
        Reader<Buf> suffix = Readers.newBufReader(Bufs.utf8Buf(",\"date\": \"02/12/2020\"}"), Integer.MAX_VALUE);

        Response response = fromReader(
                Readers.concat(Arrays.asList(prefix, options, suffix)),
                ManifestFactory.classType(Buf.class));

        assertEquals(
                "{\"options\":[{\"drink\":\"coke\",\"protein\":8,\"carbs\":4},{\"drink\":\"sprite\",\"protein\":12,\"carbs\":6},{\"drink\":\"coffee\",\"protein\":12,\"carbs\":6},{\"drink\":\"tea\",\"protein\":6,\"carbs\":3},{\"drink\":\"fanta\",\"protein\":10,\"carbs\":5}],\"date\": \"02/12/2020\"}",
                readBody(response));
    }
}
