package co.cask.cdap.data.stream.service.upload;

import co.cask.common.io.ByteBufferInputStream;
import co.cask.http.AbstractHttpResponder;
import co.cask.http.BodyConsumer;
import co.cask.http.ChunkResponder;
import co.cask.http.HttpResponder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/data/stream/service/upload/StreamBodyConsumerTestBase.class */
public abstract class StreamBodyConsumerTestBase {

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    /* loaded from: input_file:co/cask/cdap/data/stream/service/upload/StreamBodyConsumerTestBase$ContentInfo.class */
    protected interface ContentInfo {
        InputSupplier<? extends InputStream> getContentSupplier();

        boolean verify(Map<String, String> map, InputSupplier<? extends InputStream> inputSupplier) throws IOException;
    }

    /* loaded from: input_file:co/cask/cdap/data/stream/service/upload/StreamBodyConsumerTestBase$FileContentInfo.class */
    protected abstract class FileContentInfo implements ContentInfo {
        private final File file;

        /* JADX INFO: Access modifiers changed from: protected */
        public FileContentInfo(File file) {
            this.file = file;
        }

        @Override // co.cask.cdap.data.stream.service.upload.StreamBodyConsumerTestBase.ContentInfo
        public InputSupplier<? extends InputStream> getContentSupplier() {
            return Files.newInputStreamSupplier(this.file);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/service/upload/StreamBodyConsumerTestBase$TestContentWriter.class */
    public static class TestContentWriter implements ContentWriter {
        private final List<ChannelBuffer> contents = Lists.newLinkedList();
        private final CountDownLatch completion = new CountDownLatch(1);
        private int events;

        public void append(ByteBuffer byteBuffer, boolean z) throws IOException {
            this.contents.add(z ? ChannelBuffers.wrappedBuffer(byteBuffer) : ChannelBuffers.copiedBuffer(byteBuffer));
            this.events++;
        }

        public void appendAll(Iterator<ByteBuffer> it, boolean z) throws IOException {
            while (it.hasNext()) {
                append(it.next(), z);
            }
        }

        public void close() throws IOException {
            this.completion.countDown();
        }

        public boolean waitForClose(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.completion.await(j, timeUnit);
        }

        public ByteBuffer getContent() {
            return ChannelBuffers.wrappedBuffer((ChannelBuffer[]) this.contents.toArray(new ChannelBuffer[this.contents.size()])).toByteBuffer();
        }

        public void cancel() {
            this.contents.clear();
            this.completion.countDown();
        }

        public int getEvents() {
            return this.events;
        }
    }

    /* loaded from: input_file:co/cask/cdap/data/stream/service/upload/StreamBodyConsumerTestBase$TestHttpResponder.class */
    private static class TestHttpResponder extends AbstractHttpResponder {
        private final AtomicReference<HttpResponseStatus> responseStatus;

        private TestHttpResponder() {
            this.responseStatus = new AtomicReference<>();
        }

        public ChunkResponder sendChunkStart(HttpResponseStatus httpResponseStatus, Multimap<String, String> multimap) {
            return null;
        }

        public void sendContent(HttpResponseStatus httpResponseStatus, ChannelBuffer channelBuffer, String str, Multimap<String, String> multimap) {
            this.responseStatus.compareAndSet(null, httpResponseStatus);
        }

        public void sendFile(File file, Multimap<String, String> multimap) {
        }

        public HttpResponseStatus getResponseStatus() {
            return this.responseStatus.get();
        }
    }

    protected abstract ContentInfo generateFile(int i) throws IOException;

    protected abstract BodyConsumer createBodyConsumer(ContentWriterFactory contentWriterFactory);

    @Test
    public void testChunkedContent() throws Exception {
        ContentInfo generateFile = generateFile(1000);
        final HashMap newHashMap = Maps.newHashMap();
        final TestContentWriter testContentWriter = new TestContentWriter();
        BodyConsumer createBodyConsumer = createBodyConsumer(new ContentWriterFactory() { // from class: co.cask.cdap.data.stream.service.upload.StreamBodyConsumerTestBase.1
            public String getStream() {
                return "test-stream";
            }

            public ContentWriter create(Map<String, String> map) throws IOException {
                newHashMap.putAll(map);
                return testContentWriter;
            }
        });
        TestHttpResponder testHttpResponder = new TestHttpResponder();
        sendChunks(generateFile.getContentSupplier(), 10, createBodyConsumer, testHttpResponder);
        Assert.assertTrue(testContentWriter.waitForClose(5L, TimeUnit.SECONDS));
        Assert.assertEquals(HttpResponseStatus.OK, testHttpResponder.getResponseStatus());
        Assert.assertEquals(1000, testContentWriter.getEvents());
        Assert.assertTrue(generateFile.verify(newHashMap, new InputSupplier<InputStream>() { // from class: co.cask.cdap.data.stream.service.upload.StreamBodyConsumerTestBase.2
            /* renamed from: getInput, reason: merged with bridge method [inline-methods] */
            public InputStream m10getInput() throws IOException {
                return new ByteBufferInputStream(testContentWriter.getContent().duplicate());
            }
        }));
    }

    private void sendChunks(InputSupplier<? extends InputStream> inputSupplier, int i, BodyConsumer bodyConsumer, HttpResponder httpResponder) throws IOException {
        InputStream inputStream = (InputStream) inputSupplier.getInput();
        try {
            try {
                byte[] bArr = new byte[i];
                for (int read = inputStream.read(bArr); read >= 0; read = inputStream.read(bArr)) {
                    bodyConsumer.chunk(ChannelBuffers.copiedBuffer(bArr, 0, read), httpResponder);
                }
                bodyConsumer.finished(httpResponder);
                inputStream.close();
            } catch (Exception e) {
                bodyConsumer.handleError(e);
                inputStream.close();
            }
        } catch (Throwable th) {
            inputStream.close();
            throw th;
        }
    }
}
