package co.cask.cdap.test.internal;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.stream.StreamEventTypeAdapter;
import co.cask.cdap.data.stream.service.StreamFetchHandler;
import co.cask.cdap.data.stream.service.StreamHandler;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.Id;
import co.cask.cdap.test.StreamManager;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;

/* loaded from: input_file:co/cask/cdap/test/internal/DefaultStreamManager.class */
public class DefaultStreamManager implements StreamManager {
    private static final Gson GSON = StreamEventTypeAdapter.register(new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter())).create();
    private static final Type STREAM_EVENT_LIST_TYPE = new TypeToken<List<StreamEvent>>() { // from class: co.cask.cdap.test.internal.DefaultStreamManager.1
    }.getType();
    private final Id.Stream streamId;
    private final StreamHandler streamHandler;
    private final StreamFetchHandler streamFetchHandler;

    @Inject
    public DefaultStreamManager(StreamHandler streamHandler, StreamFetchHandler streamFetchHandler, @Assisted("streamId") Id.Stream stream) {
        this.streamHandler = streamHandler;
        this.streamFetchHandler = streamFetchHandler;
        this.streamId = stream;
    }

    public void createStream() throws IOException {
        DefaultHttpRequest defaultHttpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, String.format("/v3/namespaces/%s/streams/%s", this.streamId.getNamespaceId(), this.streamId.getId()));
        MockResponder mockResponder = new MockResponder();
        try {
            this.streamHandler.create(defaultHttpRequest, mockResponder, this.streamId.getNamespaceId(), this.streamId.getId());
            if (mockResponder.getStatus() != HttpResponseStatus.OK) {
                throw new IOException("Failed to create stream. Status = " + mockResponder.getStatus());
            }
        } catch (Exception e) {
            Throwables.propagateIfPossible(e, IOException.class);
            throw Throwables.propagate(e);
        }
    }

    public void send(String str) throws IOException {
        send(Charsets.UTF_8.encode(str));
    }

    public void send(byte[] bArr) throws IOException {
        send(bArr, 0, bArr.length);
    }

    public void send(byte[] bArr, int i, int i2) throws IOException {
        send(ByteBuffer.wrap(bArr, i, i2));
    }

    public void send(ByteBuffer byteBuffer) throws IOException {
        send((Map<String, String>) ImmutableMap.of(), byteBuffer);
    }

    public void send(Map<String, String> map, String str) throws IOException {
        send(map, Charsets.UTF_8.encode(str));
    }

    public void send(Map<String, String> map, byte[] bArr) throws IOException {
        send(map, bArr, 0, bArr.length);
    }

    public void send(Map<String, String> map, byte[] bArr, int i, int i2) throws IOException {
        send(map, ByteBuffer.wrap(bArr, i, i2));
    }

    public void send(Map<String, String> map, ByteBuffer byteBuffer) throws IOException {
        DefaultHttpRequest defaultHttpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, String.format("/v3/namespaces/%s/streams/%s", this.streamId.getNamespaceId(), this.streamId.getId()));
        for (Map.Entry<String, String> entry : map.entrySet()) {
            defaultHttpRequest.setHeader(this.streamId.getId() + "." + entry.getKey(), entry.getValue());
        }
        ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(byteBuffer);
        defaultHttpRequest.setContent(wrappedBuffer);
        defaultHttpRequest.setHeader("Content-Length", Integer.valueOf(wrappedBuffer.readableBytes()));
        MockResponder mockResponder = new MockResponder();
        try {
            this.streamHandler.enqueue(defaultHttpRequest, mockResponder, this.streamId.getNamespaceId(), this.streamId.getId());
            if (mockResponder.getStatus() != HttpResponseStatus.OK) {
                throw new IOException("Failed to write to stream. Status = " + mockResponder.getStatus());
            }
        } catch (Exception e) {
            Throwables.propagateIfPossible(e, IOException.class);
            throw Throwables.propagate(e);
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
    }

    public List<StreamEvent> getEvents(long j, long j2, int i) throws IOException {
        return getEvents(this.streamId, j, j2, i);
    }

    private List<StreamEvent> getEvents(Id.Stream stream, long j, long j2, int i) throws IOException {
        DefaultHttpRequest defaultHttpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, String.format("/v3/namespaces/%s/streams/%s/events?start=%d&end=%d&limit=%d", stream.getNamespaceId(), stream.getId(), Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i)));
        MockResponder mockResponder = new MockResponder();
        try {
            this.streamFetchHandler.fetch(defaultHttpRequest, mockResponder, stream.getNamespaceId(), stream.getId(), j, j2, i);
            if (mockResponder.getStatus() != HttpResponseStatus.OK) {
                throw new IOException("Failed to read from stream. Status = " + mockResponder.getStatus());
            }
            return (List) mockResponder.decodeResponseContent(STREAM_EVENT_LIST_TYPE, GSON);
        } catch (Exception e) {
            Throwables.propagateIfPossible(e, IOException.class);
            throw Throwables.propagate(e);
        }
    }
}
