package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.metrics.MetricsCollectionService;
import co.cask.cdap.common.metrics.MetricsCollector;
import co.cask.cdap.data.format.RecordFormats;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamFileWriterFactory;
import co.cask.cdap.data.stream.service.StreamMetricsCollectorFactory;
import co.cask.cdap.data.stream.service.upload.BufferedContentWriterFactory;
import co.cask.cdap.data.stream.service.upload.ContentWriterFactory;
import co.cask.cdap.data.stream.service.upload.FileContentWriterFactory;
import co.cask.cdap.data.stream.service.upload.StreamBodyConsumerFactory;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.explore.client.ExploreFacade;
import co.cask.cdap.gateway.auth.Authenticator;
import co.cask.cdap.gateway.handlers.AuthenticatedHttpHandler;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.StreamProperties;
import co.cask.http.BodyConsumer;
import co.cask.http.HandlerContext;
import co.cask.http.HttpResponder;
import com.google.common.base.CharMatcher;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closeables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.twill.common.Threads;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/v2/streams")
/* loaded from: input_file:co/cask/cdap/data/stream/service/StreamHandler.class */
public final class StreamHandler extends AuthenticatedHttpHandler {
    // Logger shared by all instances of this handler.
    private static final Logger LOG = LoggerFactory.getLogger(StreamHandler.class);
    // Gson instance with custom adapters registered for StreamProperties (serialization) and Schema.
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(StreamProperties.class, new StreamPropertiesAdapter()).registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    // Configuration passed to the constructor; also read in init() and by the metrics-context helpers.
    private final CConfiguration cConf;
    // Used to create and truncate streams and to read/update their configuration.
    private final StreamAdmin streamAdmin;
    // Collector for handler-level metrics (the "collect.async.reject" counter is emitted through it).
    private final MetricsCollector streamHandlerMetricsCollector;
    // Parent collector from which one child collector per stream is derived.
    private final MetricsCollector streamMetricsCollector;
    // Writer through which all enqueue requests go; closed in destroy().
    private final ConcurrentStreamWriter streamWriter;
    // Used to enable/disable exploration of a stream; only consulted when exploreEnabled is true.
    private final ExploreFacade exploreFacade;
    // Value of the "explore.enabled" configuration key.
    private final boolean exploreEnabled;
    // Value of "stream.batch.buffer.threshold"; batch uploads whose declared length exceeds it are not buffered.
    private final long batchBufferThreshold;
    // Creates the BodyConsumer returned by the batch endpoint.
    private final StreamBodyConsumerFactory streamBodyConsumerFactory;
    // Executor for asynchronous enqueues. Not final: it is created in init() and shut down in destroy().
    private ExecutorService asyncExecutor;
    // Used for stream existence checks and for registering newly created streams.
    private final StreamMetaStore streamMetaStore;
    // Notified of the number of bytes received for each stream.
    private final StreamWriterSizeCollector sizeCollector;

    /* loaded from: input_file:co/cask/cdap/data/stream/service/StreamHandler$StreamPropertiesAdapter.class */
    private static final class StreamPropertiesAdapter implements JsonSerializer<StreamProperties> {
        private StreamPropertiesAdapter() {
        }

        public JsonElement serialize(StreamProperties streamProperties, Type type, JsonSerializationContext jsonSerializationContext) {
            JsonObject jsonObject = new JsonObject();
            jsonObject.addProperty("name", streamProperties.getName());
            jsonObject.addProperty("ttl", Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(streamProperties.getTTL().longValue())));
            jsonObject.add("format", jsonSerializationContext.serialize(streamProperties.getFormat(), FormatSpecification.class));
            jsonObject.addProperty("threshold", streamProperties.getThreshold());
            return jsonObject;
        }
    }

    @Inject
    public StreamHandler(CConfiguration cConfiguration, Authenticator authenticator, StreamCoordinatorClient streamCoordinatorClient, StreamAdmin streamAdmin, StreamMetaStore streamMetaStore, StreamFileWriterFactory streamFileWriterFactory, MetricsCollectionService metricsCollectionService, ExploreFacade exploreFacade, StreamWriterSizeCollector streamWriterSizeCollector) {
        super(authenticator);
        this.cConf = cConfiguration;
        this.streamAdmin = streamAdmin;
        this.streamMetaStore = streamMetaStore;
        this.exploreFacade = exploreFacade;
        this.sizeCollector = streamWriterSizeCollector;
        this.exploreEnabled = cConfiguration.getBoolean("explore.enabled");
        this.batchBufferThreshold = cConfiguration.getLong("stream.batch.buffer.threshold");
        this.streamBodyConsumerFactory = new StreamBodyConsumerFactory();
        this.streamHandlerMetricsCollector = metricsCollectionService.getCollector(getStreamHandlerMetricsContext());
        this.streamMetricsCollector = metricsCollectionService.getCollector(getStreamMetricsContext());
        this.streamWriter = new ConcurrentStreamWriter(streamCoordinatorClient, streamAdmin, streamMetaStore, streamFileWriterFactory, cConfiguration.getInt("stream.worker.threads"), createStreamMetricsCollectorFactory());
    }

    /**
     * Initializes the handler and creates the executor used for asynchronous enqueues.
     *
     * <p>The pool has {@code stream.async.worker.threads} threads, all of which may time out
     * after 60 seconds of idleness, and a bounded queue holding
     * {@code stream.async.queue.size} entries per thread.
     *
     * @param handlerContext the context supplied by the HTTP service
     */
    public void init(HandlerContext handlerContext) {
        super.init(handlerContext);
        int workerThreads = this.cConf.getInt("stream.async.worker.threads");
        int queueCapacity = this.cConf.getInt("stream.async.queue.size") * workerThreads;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            workerThreads, workerThreads, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(queueCapacity),
            Threads.createDaemonThreadFactory("async-exec-%d"),
            createAsyncRejectedExecutionHandler());
        // Core and maximum sizes are equal, so idle threads are only released if core threads may time out.
        executor.allowCoreThreadTimeOut(true);
        this.asyncExecutor = executor;
    }

    /**
     * Releases the resources held by this handler: closes the stream writer and stops the
     * asynchronous executor.
     *
     * @param handlerContext the context supplied by the HTTP service (unused)
     */
    public void destroy(HandlerContext handlerContext) {
        Closeables.closeQuietly(this.streamWriter);
        // The executor is only created in init(); guard against destroy() being called without it.
        if (this.asyncExecutor != null) {
            this.asyncExecutor.shutdownNow();
        }
    }

    /**
     * Returns the properties of a stream as JSON.
     *
     * <p>Responds with 404 when the stream is not known for the authenticated account,
     * otherwise with 200 and the serialized {@link StreamProperties}.
     */
    @GET
    @Path("/{stream}/info")
    public void getInfo(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) throws Exception {
        String accountId = getAuthenticatedAccountId(httpRequest);
        if (this.streamMetaStore.streamExists(accountId, str)) {
            StreamConfig streamConfig = this.streamAdmin.getConfig(str);
            StreamProperties properties = new StreamProperties(streamConfig.getName(), streamConfig.getTTL(), streamConfig.getFormat(), streamConfig.getNotificationThresholdMB().intValue());
            httpResponder.sendJson(HttpResponseStatus.OK, properties, StreamProperties.class, GSON);
        } else {
            httpResponder.sendStatus(HttpResponseStatus.NOT_FOUND);
        }
    }

    /**
     * Creates a stream and registers it for the authenticated account.
     *
     * <p>Responds with 400 when the name contains characters rejected by
     * {@code isValidName}. A failure to enable exploration is logged but does not fail the
     * request; the response is still 200.
     */
    @Path("/{stream}")
    @PUT
    public void create(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) throws Exception {
        // Authentication is resolved before the name is validated.
        String accountId = getAuthenticatedAccountId(httpRequest);
        if (isValidName(str)) {
            this.streamAdmin.create(str);
            this.streamMetaStore.addStream(accountId, str);
            if (this.exploreEnabled) {
                try {
                    this.exploreFacade.enableExploreStream(str);
                } catch (Exception exploreFailure) {
                    // Best effort: the stream itself has been created at this point.
                    LOG.error(String.format("Cannot enable exploration of stream %s: %s", str, exploreFailure.getMessage()), exploreFailure);
                }
            }
            httpResponder.sendStatus(HttpResponseStatus.OK);
        } else {
            httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "Stream name can only contains alphanumeric, '-' and '_' characters only.");
        }
    }

    /**
     * Writes the request body to the stream as a single event.
     *
     * <p>Responds with 200 on success, 404 when an {@link IllegalArgumentException} is raised
     * while processing the request, and 500 (with the exception message) on an
     * {@link IOException}.
     */
    @POST
    @Path("/{stream}")
    public void enqueue(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) throws Exception {
        try {
            String accountId = getAuthenticatedAccountId(httpRequest);
            Map<String, String> eventHeaders = getHeaders(httpRequest, str);
            this.streamWriter.enqueue(accountId, str, eventHeaders, httpRequest.getContent().toByteBuffer());
            httpResponder.sendStatus(HttpResponseStatus.OK);
        } catch (IllegalArgumentException notFound) {
            httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Stream does not exists");
        } catch (IOException writeFailure) {
            LOG.error("Failed to write to stream {}", str, writeFailure);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, writeFailure.getMessage());
        }
    }

    /**
     * Hands the request body to the stream writer for asynchronous writing on the handler's
     * executor and responds with 202 without waiting for the write.
     *
     * <p>An {@link IllegalArgumentException} is answered with 404, matching the synchronous
     * enqueue endpoint.
     */
    @POST
    @Path("/{stream}/async")
    public void asyncEnqueue(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) throws Exception {
        try {
            this.streamWriter.asyncEnqueue(getAuthenticatedAccountId(httpRequest), str, getHeaders(httpRequest, str), httpRequest.getContent().toByteBuffer(), this.asyncExecutor);
            httpResponder.sendStatus(HttpResponseStatus.ACCEPTED);
        } catch (IllegalArgumentException e) {
            // NOTE(review): presumably the writer signals an unknown stream this way, as the
            // synchronous endpoint assumes - confirm against ConcurrentStreamWriter.
            httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Stream does not exists");
        }
    }

    /**
     * Starts a batch upload to a stream.
     *
     * @return the consumer that will receive the request body, or {@code null} when a response
     *         has already been sent (404 for an unknown stream, 406 when creating the consumer
     *         raises {@link UnsupportedOperationException})
     */
    @POST
    @Path("/{stream}/batch")
    public BodyConsumer batch(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) throws Exception {
        String accountId = getAuthenticatedAccountId(httpRequest);
        if (this.streamMetaStore.streamExists(accountId, str)) {
            try {
                return this.streamBodyConsumerFactory.create(httpRequest, createContentWriterFactory(accountId, str, httpRequest));
            } catch (UnsupportedOperationException unsupported) {
                httpResponder.sendString(HttpResponseStatus.NOT_ACCEPTABLE, unsupported.getMessage());
                return null;
            }
        }
        httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Stream does not exists");
        return null;
    }

    /**
     * Truncates a stream.
     *
     * <p>Responds with 404 when the stream is not known for the authenticated account or when
     * truncation fails with an {@link IOException}, and with 200 on success.
     */
    @POST
    @Path("/{stream}/truncate")
    public void truncate(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) throws Exception {
        if (!this.streamMetaStore.streamExists(getAuthenticatedAccountId(httpRequest), str)) {
            httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Stream does not exists");
            return;
        }
        try {
            this.streamAdmin.truncate(str);
            httpResponder.sendStatus(HttpResponseStatus.OK);
        } catch (IOException e) {
            // The response cannot distinguish an I/O failure from a missing stream, so record the
            // cause here instead of dropping it silently. The response itself is left unchanged.
            LOG.error("Failed to truncate stream {}", str, e);
            httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Stream does not exists");
        }
    }

    /**
     * Updates the configuration of a stream from the JSON request body.
     *
     * <p>Responds with 404 when the stream is not known for the authenticated account or an
     * {@link IOException} occurs, and with 200 once the configuration has been updated.
     * Validation failures are answered by {@code getAndValidateConfig}. When exploration is
     * enabled and the schema changed, exploration of the stream is disabled and re-enabled.
     */
    @Path("/{stream}/config")
    @PUT
    public void setConfig(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stream") String str) throws Exception {
        if (!this.streamMetaStore.streamExists(getAuthenticatedAccountId(httpRequest), str)) {
            httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Stream does not exist.");
            return;
        }
        try {
            StreamConfig config = this.streamAdmin.getConfig(str);
            StreamConfig andValidateConfig = getAndValidateConfig(config, httpRequest, httpResponder);
            if (andValidateConfig == null) {
                // A response has already been sent by getAndValidateConfig.
                return;
            }
            this.streamAdmin.updateConfig(andValidateConfig);

            // Null-safe comparison: the current format may carry no schema, in which case calling
            // equals() on it would throw a NullPointerException after the update was applied.
            Schema currentSchema = config.getFormat().getSchema();
            Schema requestedSchema = andValidateConfig.getFormat().getSchema();
            boolean schemaChanged = currentSchema == null ? requestedSchema != null : !currentSchema.equals(requestedSchema);
            if (this.exploreEnabled && schemaChanged) {
                this.exploreFacade.disableExploreStream(str);
                this.exploreFacade.enableExploreStream(str);
            }
            httpResponder.sendStatus(HttpResponseStatus.OK);
        } catch (IOException e) {
            // Record the cause; the response alone does not reveal that an I/O failure occurred.
            LOG.error("Failed to update configuration of stream {}", str, e);
            httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "Stream " + str + " does not exist.");
        }
    }

    /**
     * Creates the factory that supplies one metrics collector per stream.
     *
     * <p>Each collector is a child of the stream metrics collector tagged with the stream name.
     * It increments {@code collect.bytes} and {@code collect.events} for positive amounts and
     * forwards the byte count to the size collector.
     */
    private StreamMetricsCollectorFactory createStreamMetricsCollectorFactory() {
        return new StreamMetricsCollectorFactory() {
            @Override
            public StreamMetricsCollectorFactory.StreamMetricsCollector createMetricsCollector(final String streamName) {
                final MetricsCollector perStreamCollector = StreamHandler.this.streamMetricsCollector.childCollector("str", streamName);
                return new StreamMetricsCollectorFactory.StreamMetricsCollector() {
                    @Override
                    public void emitMetrics(long byteCount, long eventCount) {
                        // Zero or negative amounts are not reported.
                        if (byteCount > 0) {
                            perStreamCollector.increment("collect.bytes", byteCount);
                            StreamHandler.this.sizeCollector.received(streamName, byteCount);
                        }
                        if (eventCount > 0) {
                            perStreamCollector.increment("collect.events", eventCount);
                        }
                    }
                };
            }
        };
    }

    /**
     * Builds the metrics context for handler-level metrics: namespace {@code system},
     * component {@code gateway}, handler {@code stream.rest} and the container instance id
     * (defaulting to {@code "0"}).
     */
    private Map<String, String> getStreamHandlerMetricsContext() {
        String instanceId = this.cConf.get("stream.container.instance.id", "0");
        return ImmutableMap.<String, String>builder()
            .put("ns", "system")
            .put("cmp", "gateway")
            .put("hnd", "stream.rest")
            .put("ins", instanceId)
            .build();
    }

    /**
     * Builds the metrics context for per-stream metrics: namespace {@code default},
     * component {@code gateway}, handler {@code stream.rest} and the container instance id
     * (defaulting to {@code "0"}).
     */
    private Map<String, String> getStreamMetricsContext() {
        String instanceId = this.cConf.get("stream.container.instance.id", "0");
        return ImmutableMap.<String, String>builder()
            .put("ns", "default")
            .put("cmp", "gateway")
            .put("hnd", "stream.rest")
            .put("ins", instanceId)
            .build();
    }

    /**
     * Parses the JSON request body into stream properties, validates them, and merges them with
     * the current configuration. Properties absent from the request keep their current value.
     *
     * <p>On any validation failure a 400 response is sent through {@code httpResponder} and
     * {@code null} is returned; the caller must not send another response in that case.
     *
     * @param streamConfig the current configuration of the stream
     * @param httpRequest the request whose body holds the requested properties
     * @param httpResponder used to report validation errors
     * @return the merged configuration, or {@code null} if a 400 response was sent
     */
    private StreamConfig getAndValidateConfig(StreamConfig streamConfig, HttpRequest httpRequest, HttpResponder httpResponder) {
        try {
            // Decode the body explicitly as UTF-8 rather than with the platform default charset.
            StreamProperties requested = GSON.fromJson(new InputStreamReader(new ChannelBufferInputStream(httpRequest.getContent()), java.nio.charset.Charset.forName("UTF-8")), StreamProperties.class);

            // TTL: supplied in seconds, stored in milliseconds. Zero is accepted; only negative
            // values are rejected.
            long ttlMillis;
            Long requestedTtl = requested.getTTL();
            if (requestedTtl == null) {
                ttlMillis = streamConfig.getTTL();
            } else if (requestedTtl.longValue() < 0) {
                httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "TTL value should be positive.");
                return null;
            } else {
                ttlMillis = TimeUnit.SECONDS.toMillis(requestedTtl.longValue());
            }

            // Format: must be named and instantiable. The resulting specification takes its schema
            // from the initialized format rather than directly from the request.
            FormatSpecification effectiveFormat;
            FormatSpecification requestedFormat = requested.getFormat();
            if (requestedFormat == null) {
                effectiveFormat = streamConfig.getFormat();
            } else {
                String formatName = requestedFormat.getName();
                if (formatName == null) {
                    httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "A format name must be specified.");
                    return null;
                }
                try {
                    effectiveFormat = new FormatSpecification(formatName, RecordFormats.createInitializedFormat(requestedFormat).getSchema(), requestedFormat.getSettings());
                } catch (UnsupportedTypeException e) {
                    // Must precede the generic handler below, otherwise this clause is unreachable.
                    httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "Format " + formatName + " does not support the requested schema.");
                    return null;
                } catch (Exception e) {
                    httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "Invalid format, unable to instantiate format " + formatName);
                    return null;
                }
            }

            // Notification threshold: must be strictly positive when supplied.
            Integer threshold = requested.getThreshold();
            if (threshold == null) {
                threshold = streamConfig.getNotificationThresholdMB();
            } else if (threshold.intValue() <= 0) {
                httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "Threshold value should be greater than zero.");
                return null;
            }

            return new StreamConfig(streamConfig.getName(), streamConfig.getPartitionDuration(), streamConfig.getIndexInterval(), ttlMillis, streamConfig.getLocation(), effectiveFormat, threshold);
        } catch (Exception e) {
            // Covers malformed JSON as well as an empty body (for which Gson returns null).
            httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "Invalid stream configuration. Please check that the configuration is a valid JSON Object with a valid schema.");
            return null;
        }
    }

    /**
     * Creates the handler invoked when the asynchronous executor rejects a task.
     *
     * <p>Unless the executor has been shut down, the rejection is counted in the
     * {@code collect.async.reject} metric and the task is run directly on the thread that
     * invoked this handler. Tasks rejected after shutdown are dropped.
     */
    private RejectedExecutionHandler createAsyncRejectedExecutionHandler() {
        return new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                    StreamHandler.this.streamHandlerMetricsCollector.increment("collect.async.reject", 1L);
                    task.run();
                }
            }
        };
    }

    /**
     * Checks that every character of the name is an ASCII letter, an ASCII digit,
     * {@code '-'} or {@code '_'}.
     *
     * @param str the stream name to check
     * @return {@code true} if no other character occurs in the name
     */
    private boolean isValidName(String str) {
        CharMatcher upperCase = CharMatcher.inRange('A', 'Z');
        CharMatcher lowerCase = CharMatcher.inRange('a', 'z');
        CharMatcher digit = CharMatcher.inRange('0', '9');
        CharMatcher separator = CharMatcher.is('-').or(CharMatcher.is('_'));
        CharMatcher allowed = upperCase.or(lowerCase).or(separator).or(digit);
        return allowed.matchesAllOf(str);
    }

    /**
     * Collects the stream-specific headers of a request, starting from an empty builder.
     *
     * @param httpRequest the request to read headers from
     * @param str the stream name used as header prefix
     * @return the matching headers with the prefix removed
     */
    private Map<String, String> getHeaders(HttpRequest httpRequest, String str) {
        ImmutableMap.Builder<String, String> emptyBuilder = ImmutableMap.builder();
        return getHeaders(httpRequest, str, emptyBuilder);
    }

    /**
     * Adds every request header whose name starts with {@code "<stream>."} to the given builder,
     * with that prefix stripped from the name, and builds the resulting map.
     *
     * @param httpRequest the request to read headers from
     * @param str the stream name used as header prefix
     * @param builder builder that may already hold entries; the matching headers are added to it
     * @return the immutable map built from {@code builder}
     */
    private Map<String, String> getHeaders(HttpRequest httpRequest, String str, ImmutableMap.Builder<String, String> builder) {
        String prefix = str + ".";
        // Typed entries: with a raw Map.Entry the value is an Object and does not fit the
        // Builder<String, String>.
        for (Map.Entry<String, String> header : httpRequest.getHeaders()) {
            String headerName = header.getKey();
            if (headerName.startsWith(prefix)) {
                builder.put(headerName.substring(prefix.length()), header.getValue());
            }
        }
        // NOTE(review): ImmutableMap.Builder#build rejects duplicate keys with an
        // IllegalArgumentException, so a request repeating the same prefixed header fails here -
        // confirm whether that is intended.
        return builder.build();
    }

    /**
     * Chooses the content writer factory for a batch upload.
     *
     * <p>When the request declares a content length that does not exceed the batch buffer
     * threshold, a {@link BufferedContentWriterFactory} is used; when the length is unknown or
     * larger, a {@link FileContentWriterFactory} is used. The request's {@code Content-Type}
     * (empty string if absent) is passed along under the {@code content.type} key together with
     * the stream-specific headers.
     *
     * @param str the authenticated account id
     * @param str2 the stream name
     * @param httpRequest the upload request
     * @return the factory to create content writers with
     * @throws IOException declared for the stream configuration lookup
     */
    private ContentWriterFactory createContentWriterFactory(String str, String str2, HttpRequest httpRequest) throws IOException {
        long contentLength = HttpHeaders.getContentLength(httpRequest, -1L);
        String contentType = HttpHeaders.getHeader(httpRequest, "Content-Type", "");
        // The explicit type witness is required: a chained put() on an untyped builder() would be
        // inferred as Builder<Object, Object> and not be accepted by getHeaders.
        ImmutableMap.Builder<String, String> headerBuilder = ImmutableMap.<String, String>builder().put("content.type", contentType);
        Map<String, String> headers = getHeaders(httpRequest, str2, headerBuilder);

        boolean fitsInBuffer = contentLength >= 0 && contentLength <= this.batchBufferThreshold;
        if (fitsInBuffer) {
            return new BufferedContentWriterFactory(str, str2, this.streamWriter, headers);
        }
        return new FileContentWriterFactory(str, this.streamAdmin.getConfig(str2), this.streamWriter, headers);
    }
}
