package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.stream.StreamEventTypeAdapter;
import co.cask.cdap.common.utils.TimeMathParser;
import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data.stream.MultiLiveStreamFileReader;
import co.cask.cdap.data.stream.StreamEventOffset;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data.stream.StreamFileType;
import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data.stream.TimeRangeReadFilter;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.proto.Id;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.ChunkResponder;
import co.cask.http.HttpResponder;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonWriter;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import org.apache.twill.filesystem.Location;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

@Path("/v3/namespaces/{namespace-id}/streams")
/* loaded from: input_file:co/cask/cdap/data/stream/service/StreamFetchHandler.class */
public final class StreamFetchHandler extends AbstractHttpHandler {
    // Gson instance built with the stream event type adapter registered; used to serialize events.
    private static final Gson GSON = StreamEventTypeAdapter.register(new GsonBuilder()).create();
    // Upper bound on the number of events requested from the reader in a single read call.
    private static final int MAX_EVENTS_PER_READ = 100;
    // Once the response buffer holds at least this many readable bytes it is sent as a chunk.
    private static final int CHUNK_SIZE = 8192;
    // Configuration; read for the stream container instance count and the stream file prefix.
    private final CConfiguration cConf;
    // Supplies the StreamConfig of the stream being fetched.
    private final StreamAdmin streamAdmin;
    // Consulted to check that the requested stream exists.
    private final StreamMetaStore streamMetaStore;

    /**
     * Creates the handler from its injected collaborators.
     *
     * @param cConfiguration configuration, read later for stream container and file settings
     * @param streamAdmin used to obtain the configuration of a stream
     * @param streamMetaStore used to check whether a stream exists
     */
    @Inject
    public StreamFetchHandler(CConfiguration cConfiguration, StreamAdmin streamAdmin, StreamMetaStore streamMetaStore) {
        this.streamMetaStore = streamMetaStore;
        this.streamAdmin = streamAdmin;
        this.cConf = cConfiguration;
    }

    /**
     * Streams the events of a stream that fall inside a time range as a JSON array.
     *
     * <p>When request validation fails, the response is whatever {@code verifyGetEventsRequest}
     * sent (400 or 404). When the first read yields no events, the response is
     * {@code 204 No Content}. Otherwise a chunked {@code 200 OK} response is sent whose body is a
     * JSON array of stream events.
     *
     * @param httpRequest the incoming request (not inspected by this method)
     * @param httpResponder responder used to send the status or the chunked body
     * @param str namespace id taken from the request path
     * @param str2 stream name taken from the request path
     * @param str3 start time expression, parsed by {@link TimeMathParser} into milliseconds
     * @param str4 end time expression, parsed by {@link TimeMathParser} into milliseconds
     * @param i maximum number of events to return
     * @throws Exception if parsing, stream lookup, reading events or writing the response fails
     */
    @GET
    @Path("/{stream}/events")
    public void fetch(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2, @QueryParam("start") @DefaultValue("0") String str3, @QueryParam("end") @DefaultValue("9223372036854775807") String str4, @QueryParam("limit") @DefaultValue("2147483647") int i) throws Exception {
        long requestedStart = TimeMathParser.parseTime(str3, TimeUnit.MILLISECONDS);
        long requestedEnd = TimeMathParser.parseTime(str4, TimeUnit.MILLISECONDS);
        Id.Stream streamId = Id.Stream.from(str, str2);
        if (!verifyGetEventsRequest(streamId, requestedStart, requestedEnd, i, httpResponder)) {
            // The verifier has already sent the error response.
            return;
        }

        StreamConfig config = this.streamAdmin.getConfig(streamId);
        long now = System.currentTimeMillis();
        // Clamp the requested range: nothing older than the stream TTL, nothing newer than now.
        long startTime = Math.max(requestedStart, now - config.getTTL());
        long endTime = Math.min(requestedEnd, now);
        int remaining = i;

        // try-with-resources closes the reader on every exit path and, if both the body and
        // close() fail, keeps the body's exception as primary with the close failure suppressed.
        try (FileReader<StreamEventOffset, Iterable<StreamFileOffset>> reader = createReader(config, startTime)) {
            TimeRangeReadFilter readFilter = new TimeRangeReadFilter(startTime, endTime);
            List<StreamEvent> events = Lists.newArrayListWithCapacity(MAX_EVENTS_PER_READ);

            // Read the first batch before committing to a response status.
            int eventsRead = readEvents(reader, events, remaining, readFilter);
            if (eventsRead <= 0) {
                httpResponder.sendStatus(HttpResponseStatus.NO_CONTENT);
                return;
            }

            // Chunked response: events are written out incrementally instead of being buffered whole.
            ChunkResponder chunkResponder = httpResponder.sendChunkStart(HttpResponseStatus.OK, ImmutableMultimap.of("Content-Type", "application/json; charset=utf-8"));
            ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
            JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new ChannelBufferOutputStream(buffer), Charsets.UTF_8));

            jsonWriter.beginArray();
            while (remaining > 0 && eventsRead > 0) {
                remaining -= eventsRead;
                for (StreamEvent event : events) {
                    GSON.toJson(event, StreamEvent.class, jsonWriter);
                    // Flush so the buffer's readable byte count reflects the event just written.
                    jsonWriter.flush();
                    if (buffer.readableBytes() >= CHUNK_SIZE) {
                        // The buffer is cleared and reused right after, so hand over a copy.
                        chunkResponder.sendChunk(buffer.copy());
                        buffer.clear();
                    }
                }
                events.clear();
                if (remaining > 0) {
                    eventsRead = readEvents(reader, events, remaining, readFilter);
                }
            }
            jsonWriter.endArray();
            jsonWriter.close();

            // Send whatever is left; no copy needed because the buffer is not reused afterwards.
            if (buffer.readable()) {
                chunkResponder.sendChunk(buffer);
            }
            Closeables.closeQuietly(chunkResponder);
        }
    }

    /**
     * Reads one batch of events into {@code list}.
     *
     * <p>A read that returns {@code 0} while the filter reports itself active is retried after
     * resetting the filter; any other result is returned as is.
     *
     * @param fileReader reader to pull events from
     * @param list collection that receives the events
     * @param i number of events still wanted; each read is capped by {@code getReadLimit(i)}
     * @param timeRangeReadFilter filter passed to every read
     * @return the result of the last read call
     * @throws IOException if the reader fails
     * @throws InterruptedException if the read is interrupted
     */
    private int readEvents(FileReader<StreamEventOffset, Iterable<StreamFileOffset>> fileReader, List<StreamEvent> list, int i, TimeRangeReadFilter timeRangeReadFilter) throws IOException, InterruptedException {
        final int batchLimit = getReadLimit(i);
        for (;;) {
            int eventsRead = fileReader.read(list, batchLimit, 0L, TimeUnit.SECONDS, timeRangeReadFilter);
            if (eventsRead != 0 || !timeRangeReadFilter.isActive()) {
                return eventsRead;
            }
            // Nothing was read but the filter is still active: reset it and try again.
            timeRangeReadFilter.reset();
        }
    }

    /**
     * Validates a fetch request and sends the error response itself when validation fails.
     *
     * <p>Argument problems are answered with {@code 400 Bad Request} and a message; a stream
     * unknown to the meta store is answered with {@code 404 Not Found}.
     *
     * @param stream id of the requested stream
     * @param j start time
     * @param j2 end time
     * @param i requested event limit
     * @param httpResponder responder used for error responses
     * @return {@code true} if the request is valid and nothing was sent, {@code false} otherwise
     * @throws Exception if the existence check fails
     */
    private boolean verifyGetEventsRequest(Id.Stream stream, long j, long j2, int i, HttpResponder httpResponder) throws Exception {
        // First failing argument check wins; the order of the checks is significant.
        String rejection = null;
        if (j < 0) {
            rejection = "Start time must be >= 0";
        } else if (j2 < 0) {
            rejection = "End time must be >= 0";
        } else if (j >= j2) {
            rejection = "Start time must be smaller than end time";
        } else if (i <= 0) {
            rejection = "Cannot request for <=0 events";
        }

        if (rejection != null) {
            httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, rejection);
            return false;
        }

        // Only consult the meta store once the arguments are known to be sane.
        if (!this.streamMetaStore.streamExists(stream)) {
            httpResponder.sendStatus(HttpResponseStatus.NOT_FOUND);
            return false;
        }
        return true;
    }

    /**
     * Picks the partition directory from which reading should start for a given time.
     *
     * <p>Among the partition directories of the generation, the one with the largest start time
     * that is {@code <= j} is preferred; if none qualifies, the one with the smallest start time
     * that is {@code >= j} is returned instead.
     *
     * <p>NOTE(review): the "latest before" candidate must have a start time strictly greater
     * than 0, so a partition whose start time is 0 is never chosen by that rule — confirm this
     * is intended.
     *
     * @param streamConfig configuration providing the stream's base location
     * @param j time to start reading from
     * @param i stream generation whose directory is listed
     * @return the chosen partition location, or {@code null} if no partition qualifies
     * @throws IOException if listing the generation directory fails
     */
    private Location getStartPartitionLocation(StreamConfig streamConfig, long j, int i) throws IOException {
        Location latestBefore = null;
        long latestBeforeStart = 0L;
        Location earliestAfter = null;
        long earliestAfterStart = Long.MAX_VALUE;

        Location generationLocation = StreamUtils.createGenerationLocation(streamConfig.getLocation(), i);
        for (Location candidate : generationLocation.list()) {
            // Only partition directories are of interest.
            if (!candidate.isDirectory() || !StreamUtils.isPartition(candidate.getName())) {
                continue;
            }
            long startTime = StreamUtils.getPartitionStartTime(candidate.getName());
            if (startTime <= j && startTime > latestBeforeStart) {
                latestBeforeStart = startTime;
                latestBefore = candidate;
            }
            if (startTime >= j && startTime < earliestAfterStart) {
                earliestAfterStart = startTime;
                earliestAfter = candidate;
            }
        }

        if (latestBefore != null) {
            return latestBefore;
        }
        return earliestAfter;
    }

    /**
     * Builds an initialized reader over the stream's event files, starting at the partition
     * selected for the given time.
     *
     * <p>One file offset (position 0) is created per configured stream container instance. If no
     * start partition is found, an empty reader is returned instead.
     *
     * @param streamConfig configuration of the stream to read
     * @param j time to start reading from
     * @return an initialized multi-file reader, or the empty reader when there is no partition
     * @throws IOException if locating the partition or initializing the reader fails
     */
    private FileReader<StreamEventOffset, Iterable<StreamFileOffset>> createReader(StreamConfig streamConfig, long j) throws IOException {
        int generation = StreamUtils.getGeneration(streamConfig);
        Location partition = getStartPartitionLocation(streamConfig, j, generation);
        if (partition == null) {
            return createEmptyReader();
        }

        int instances = this.cConf.getInt("stream.container.instances");
        String filePrefix = this.cConf.get("stream.file.prefix");

        // One event file per container instance, each named "<prefix>.<instance>".
        List<StreamFileOffset> sources = Lists.newArrayList();
        for (int instance = 0; instance < instances; instance++) {
            String fileName = filePrefix + '.' + instance;
            Location eventFile = StreamUtils.createStreamLocation(partition, fileName, 0, StreamFileType.EVENT);
            sources.add(new StreamFileOffset(eventFile, 0L, generation));
        }

        MultiLiveStreamFileReader reader = new MultiLiveStreamFileReader(streamConfig, sources);
        reader.initialize();
        return reader;
    }

    /**
     * Creates a reader that never yields data.
     *
     * <p>Both {@code read} variants return {@code -1} without touching the target collection,
     * {@code initialize} and {@code close} do nothing, and {@code getPosition} is unsupported.
     *
     * @param <T> element type the reader would produce
     * @param <P> position type the reader would report
     * @return a stateless, always-empty reader
     */
    private <T, P> FileReader<T, P> createEmptyReader() {
        return new FileReader<T, P>() {
            /** Nothing to set up. */
            @Override
            public void initialize() throws IOException {
            }

            /** Always reports {@code -1}; {@code target} is left untouched. */
            @Override
            public int read(Collection<? super T> target, int maxEvents, long timeout, TimeUnit unit) throws IOException, InterruptedException {
                return -1;
            }

            /** Always reports {@code -1}; {@code target} is left untouched and the filter is ignored. */
            @Override
            public int read(Collection<? super T> target, int maxEvents, long timeout, TimeUnit unit, ReadFilter filter) throws IOException, InterruptedException {
                return -1;
            }

            /** Nothing to release. */
            @Override
            public void close() throws IOException {
            }

            /** An empty reader has no position to report. */
            @Override
            public P getPosition() {
                throw new UnsupportedOperationException("Position not supported for empty FileReader");
            }
        };
    }

    /**
     * Caps the number of events requested in a single read.
     *
     * @param i number of events still wanted
     * @return {@code i}, or {@code MAX_EVENTS_PER_READ} if {@code i} is larger than that
     */
    private int getReadLimit(int i) {
        return Math.min(i, MAX_EVENTS_PER_READ);
    }
}
