package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.AbstractNamespaceClient;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamFileWriterFactory;
import co.cask.cdap.data.stream.service.StreamMetricsCollectorFactory;
import co.cask.cdap.data.stream.service.upload.ContentWriterFactory;
import co.cask.cdap.data.stream.service.upload.LengthBasedContentWriterFactory;
import co.cask.cdap.data.stream.service.upload.StreamBodyConsumerFactory;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.format.RecordFormats;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.StreamProperties;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.BodyConsumer;
import co.cask.http.HandlerContext;
import co.cask.http.HttpResponder;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closeables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.twill.common.Threads;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Path("/v3/namespaces/{namespace-id}/streams")
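/*
 * HTTP handler for the stream REST endpoints rooted at /v3/namespaces/{namespace-id}/streams:
 * reading and updating stream properties, creating, truncating and deleting streams, and
 * writing events synchronously, asynchronously or in batch.
 */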
public final class StreamHandler extends AbstractHttpHandler {
    /** Logger for this handler. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamHandler.class);
    /** Gson instance with custom type adapters registered for {@link StreamProperties} and {@link Schema}. */
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(StreamProperties.class, new StreamPropertiesAdapter()).registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    /** Configuration; read for thread counts, queue size, batch buffer threshold and the instance id metric tag. */
    private final CConfiguration cConf;
    /** Used to check stream existence, read/update stream config, and create, truncate and drop streams. */
    private final StreamAdmin streamAdmin;
    /** Handler-level metrics context; used to count rejected asynchronous enqueue tasks. */
    private final MetricsContext streamHandlerMetricsContext;
    /** Per-namespace metrics contexts, loaded on first use. */
    private final LoadingCache<Id.Namespace, MetricsContext> streamMetricsCollectors;
    /** Writer through which all events are enqueued; closed in {@code destroy}. */
    private final ConcurrentStreamWriter streamWriter;
    /** Value of the "stream.batch.buffer.threshold" setting, handed to the batch content writer factory. */
    private final long batchBufferThreshold;
    /** Creates the {@link BodyConsumer} returned by the batch endpoint. */
    private final StreamBodyConsumerFactory streamBodyConsumerFactory = new StreamBodyConsumerFactory();
    /** Used by the create endpoint to look up the target namespace. */
    private final AbstractNamespaceClient namespaceClient;
    /** Executor for asynchronous enqueues; assigned in {@code init} (so null before it) and shut down in {@code destroy}. */
    private ExecutorService asyncExecutor;
    /** Notified with the number of bytes received for each stream. */
    private final StreamWriterSizeCollector sizeCollector;

    /* loaded from: input_file:co/cask/cdap/data/stream/service/StreamHandler$StreamPropertiesAdapter.class */
    private static final class StreamPropertiesAdapter implements JsonSerializer<StreamProperties>, JsonDeserializer<StreamProperties> {
        private StreamPropertiesAdapter() {
        }

        public JsonElement serialize(StreamProperties streamProperties, Type type, JsonSerializationContext jsonSerializationContext) {
            JsonObject jsonObject = new JsonObject();
            if (streamProperties.getTTL() != null) {
                jsonObject.addProperty("ttl", Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(streamProperties.getTTL().longValue())));
            }
            if (streamProperties.getFormat() != null) {
                jsonObject.add("format", jsonSerializationContext.serialize(streamProperties.getFormat(), FormatSpecification.class));
            }
            if (streamProperties.getNotificationThresholdMB() != null) {
                jsonObject.addProperty("notification.threshold.mb", streamProperties.getNotificationThresholdMB());
            }
            return jsonObject;
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public StreamProperties m38deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
            JsonObject asJsonObject = jsonElement.getAsJsonObject();
            Long valueOf = asJsonObject.has("ttl") ? Long.valueOf(TimeUnit.SECONDS.toMillis(asJsonObject.get("ttl").getAsLong())) : null;
            FormatSpecification formatSpecification = null;
            if (asJsonObject.has("format")) {
                formatSpecification = (FormatSpecification) jsonDeserializationContext.deserialize(asJsonObject.get("format"), FormatSpecification.class);
            }
            return new StreamProperties(valueOf, formatSpecification, asJsonObject.has("notification.threshold.mb") ? Integer.valueOf(asJsonObject.get("notification.threshold.mb").getAsInt()) : null);
        }
    }

    /**
     * Wires the handler together from its injected collaborators.
     *
     * @param cConfiguration configuration; supplies "stream.batch.buffer.threshold" and "stream.worker.threads"
     * @param streamCoordinatorClient passed through to the {@link ConcurrentStreamWriter}
     * @param streamAdmin stream administration operations
     * @param streamFileWriterFactory passed through to the {@link ConcurrentStreamWriter}
     * @param metricsCollectionService source of the handler-level and per-namespace metrics contexts
     * @param streamWriterSizeCollector notified with the bytes received per stream
     * @param abstractNamespaceClient used to look up namespaces when creating streams
     */
    @Inject
    public StreamHandler(CConfiguration cConfiguration, StreamCoordinatorClient streamCoordinatorClient, StreamAdmin streamAdmin, StreamFileWriterFactory streamFileWriterFactory, final MetricsCollectionService metricsCollectionService, StreamWriterSizeCollector streamWriterSizeCollector, AbstractNamespaceClient abstractNamespaceClient) {
        // cConf must be set first: the metrics-context tag helpers below read it.
        this.cConf = cConfiguration;
        this.streamAdmin = streamAdmin;
        this.sizeCollector = streamWriterSizeCollector;
        this.namespaceClient = abstractNamespaceClient;
        this.batchBufferThreshold = cConfiguration.getLong("stream.batch.buffer.threshold");
        this.streamHandlerMetricsContext = metricsCollectionService.getContext(getStreamHandlerMetricsContext());

        // Metrics contexts are created per namespace the first time they are requested.
        CacheLoader<Id.Namespace, MetricsContext> perNamespaceLoader = new CacheLoader<Id.Namespace, MetricsContext>() {
            @Override
            public MetricsContext load(Id.Namespace namespace) {
                return metricsCollectionService.getContext(getStreamMetricsContext(namespace));
            }
        };
        this.streamMetricsCollectors = CacheBuilder.newBuilder().build(perNamespaceLoader);

        int workerThreads = cConfiguration.getInt("stream.worker.threads");
        StreamMetricsCollectorFactory collectorFactory = createStreamMetricsCollectorFactory();
        this.streamWriter = new ConcurrentStreamWriter(streamCoordinatorClient, streamAdmin, streamFileWriterFactory, workerThreads, collectorFactory);
    }

    /**
     * Creates the executor used for asynchronous enqueues.
     *
     * <p>The pool has a fixed size of "stream.async.worker.threads" daemon threads and a bounded
     * queue holding "stream.async.queue.size" tasks per thread. Idle threads, core threads
     * included, are released after 60 seconds.
     *
     * @param handlerContext context supplied by the HTTP service
     */
    @Override
    public void init(HandlerContext handlerContext) {
        super.init(handlerContext);
        int workerThreads = this.cConf.getInt("stream.async.worker.threads");
        int queueCapacity = this.cConf.getInt("stream.async.queue.size") * workerThreads;
        // Typed queue instead of the raw ArrayBlockingQueue, avoiding an unchecked conversion.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(workerThreads, workerThreads, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), Threads.createDaemonThreadFactory("async-exec-%d"), createAsyncRejectedExecutionHandler());
        executor.allowCoreThreadTimeOut(true);
        this.asyncExecutor = executor;
    }

    /**
     * Releases the resources held by this handler: closes the stream writer (ignoring any error
     * on close) and shuts down the asynchronous executor.
     *
     * @param handlerContext context supplied by the HTTP service
     */
    @Override
    public void destroy(HandlerContext handlerContext) {
        Closeables.closeQuietly(this.streamWriter);
        // asyncExecutor is only assigned in init(); guard so that destroying a handler that was
        // never initialized does not fail with a NullPointerException.
        ExecutorService executor = this.asyncExecutor;
        if (executor != null) {
            // NOTE(review): shutdownNow() does not run tasks still waiting in the queue, so
            // accepted asynchronous enqueues may be dropped - confirm this is intended.
            executor.shutdownNow();
        }
    }

    /**
     * Responds with the properties (TTL, format, notification threshold) of an existing stream
     * as JSON.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the response
     * @param str the namespace id from the path
     * @param str2 the stream name from the path
     * @throws Exception {@link NotFoundException} if the stream does not exist, or any failure reading its config
     */
    @GET
    @Path("/{stream}")
    public void getInfo(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        Id.Stream streamId = Id.Stream.from(str, str2);
        checkStreamExists(streamId);

        StreamConfig streamConfig = this.streamAdmin.getConfig(streamId);
        Long ttl = Long.valueOf(streamConfig.getTTL());
        Integer thresholdMb = Integer.valueOf(streamConfig.getNotificationThresholdMB());
        StreamProperties properties = new StreamProperties(ttl, streamConfig.getFormat(), thresholdMb);

        httpResponder.sendJson(HttpResponseStatus.OK, properties, StreamProperties.class, GSON);
    }

    /**
     * Creates a stream in the given namespace and responds with 200 OK.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the response
     * @param str the namespace id from the path
     * @param str2 the stream name from the path
     * @throws Exception {@link BadRequestException} when an {@link IllegalArgumentException} is raised
     *     while building the stream id or creating the stream; any failure from the namespace lookup
     */
    @Path("/{stream}")
    @PUT
    public void create(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        // The lookup result is discarded; presumably this call fails when the namespace does not
        // exist - verify against AbstractNamespaceClient.
        this.namespaceClient.get(Id.Namespace.from(str));
        try {
            Id.Stream streamId = Id.Stream.from(str, str2);
            this.streamAdmin.create(streamId);
            httpResponder.sendStatus(HttpResponseStatus.OK);
        } catch (IllegalArgumentException invalidArgument) {
            throw new BadRequestException(invalidArgument);
        }
    }

    /**
     * Writes the request body as a single event to the stream and responds with 200 OK.
     * An {@link IOException} during the write is logged and reported as 500 with the exception
     * message as the response body.
     *
     * @param httpRequest the incoming request; its body is the event payload
     * @param httpResponder used to send the response
     * @param str the namespace id from the path
     * @param str2 the stream name from the path; also the prefix of headers copied onto the event
     * @throws Exception any non-I/O failure from the writer
     */
    @POST
    @Path("/{stream}")
    public void enqueue(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        try {
            Id.Stream streamId = Id.Stream.from(str, str2);
            Map<String, String> eventHeaders = getHeaders(httpRequest, str2);
            this.streamWriter.enqueue(streamId, eventHeaders, httpRequest.getContent().toByteBuffer());
            httpResponder.sendStatus(HttpResponseStatus.OK);
        } catch (IOException writeFailure) {
            LOG.error("Failed to write to stream {}", str2, writeFailure);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, writeFailure.getMessage());
        }
    }

    /**
     * Hands the request body to the stream writer for asynchronous writing on the async executor
     * and responds with 202 Accepted without waiting for the write.
     *
     * @param httpRequest the incoming request; its body is the event payload
     * @param httpResponder used to send the response
     * @param str the namespace id from the path
     * @param str2 the stream name from the path; also the prefix of headers copied onto the event
     * @throws Exception any failure raised while submitting the event
     */
    @POST
    @Path("/{stream}/async")
    public void asyncEnqueue(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        Id.Stream streamId = Id.Stream.from(str, str2);
        Map<String, String> eventHeaders = getHeaders(httpRequest, str2);
        this.streamWriter.asyncEnqueue(streamId, eventHeaders, httpRequest.getContent().toByteBuffer(), this.asyncExecutor);
        httpResponder.sendStatus(HttpResponseStatus.ACCEPTED);
    }

    /**
     * Starts a batch upload to an existing stream by returning a {@link BodyConsumer} for the
     * request body.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the 406 response when the upload is not supported
     * @param str the namespace id from the path
     * @param str2 the stream name from the path
     * @return the consumer for the request body, or {@code null} after a 406 Not Acceptable
     *     response was sent because an {@link UnsupportedOperationException} was raised
     * @throws Exception {@link NotFoundException} if the stream does not exist, or any other failure
     */
    @POST
    @Path("/{stream}/batch")
    public BodyConsumer batch(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        Id.Stream streamId = Id.Stream.from(str, str2);
        checkStreamExists(streamId);

        BodyConsumer consumer;
        try {
            ContentWriterFactory writerFactory = createContentWriterFactory(streamId, httpRequest);
            consumer = this.streamBodyConsumerFactory.create(httpRequest, writerFactory);
        } catch (UnsupportedOperationException unsupported) {
            httpResponder.sendString(HttpResponseStatus.NOT_ACCEPTABLE, unsupported.getMessage());
            consumer = null;
        }
        return consumer;
    }

    /**
     * Truncates an existing stream and responds with 200 OK.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the response
     * @param str the namespace id from the path
     * @param str2 the stream name from the path
     * @throws Exception {@link NotFoundException} if the stream does not exist, or any failure from truncation
     */
    @POST
    @Path("/{stream}/truncate")
    public void truncate(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        Id.Stream streamId = Id.Stream.from(str, str2);
        checkStreamExists(streamId);

        this.streamAdmin.truncate(streamId);
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Drops an existing stream and responds with 200 OK.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the response
     * @param str the namespace id from the path
     * @param str2 the stream name from the path
     * @throws Exception {@link NotFoundException} if the stream does not exist, or any failure from the drop
     */
    @Path("/{stream}")
    @DELETE
    public void delete(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        Id.Stream streamId = Id.Stream.from(str, str2);
        checkStreamExists(streamId);

        this.streamAdmin.drop(streamId);
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Updates the properties of an existing stream from the JSON request body and responds with
     * 200 OK. When the body fails validation, the validator has already sent the error response
     * and nothing is updated.
     *
     * @param httpRequest the incoming request carrying the JSON properties
     * @param httpResponder used to send the response
     * @param str the namespace id from the path
     * @param str2 the stream name from the path
     * @throws Exception {@link NotFoundException} if the stream does not exist, or any failure from the update
     */
    @Path("/{stream}/properties")
    @PUT
    public void setConfig(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        Id.Stream streamId = Id.Stream.from(str, str2);
        checkStreamExists(streamId);

        StreamProperties validated = getAndValidateConfig(httpRequest, httpResponder);
        if (validated != null) {
            this.streamAdmin.updateConfig(streamId, validated);
            httpResponder.sendStatus(HttpResponseStatus.OK);
        }
    }

    /**
     * Verifies that the given stream exists.
     *
     * @param stream the stream to look up
     * @throws Exception {@link NotFoundException} if the stream does not exist, or any failure from the lookup
     */
    private void checkStreamExists(Id.Stream stream) throws Exception {
        boolean present = this.streamAdmin.exists(stream);
        if (present) {
            return;
        }
        throw new NotFoundException(stream);
    }

    /**
     * Builds the factory that produces one metrics collector per stream.
     *
     * <p>Each collector reports to a child of the stream's namespace metrics context tagged with
     * {@code "str"} = stream id. Positive byte counts are added to "collect.bytes" and forwarded
     * to the size collector; positive event counts are added to "collect.events".
     *
     * @return the collector factory
     */
    private StreamMetricsCollectorFactory createStreamMetricsCollectorFactory() {
        return new StreamMetricsCollectorFactory() {
            @Override
            public StreamMetricsCollectorFactory.StreamMetricsCollector createMetricsCollector(final Id.Stream stream) {
                MetricsContext namespaceContext = StreamHandler.this.streamMetricsCollectors.getUnchecked(stream.getNamespace());
                final MetricsContext streamContext = namespaceContext.childContext("str", stream.getId());
                return new StreamMetricsCollectorFactory.StreamMetricsCollector() {
                    @Override
                    public void emitMetrics(long bytes, long events) {
                        // Zero or negative counts are not reported.
                        if (bytes > 0) {
                            streamContext.increment("collect.bytes", bytes);
                            StreamHandler.this.sizeCollector.received(stream, bytes);
                        }
                        if (events > 0) {
                            streamContext.increment("collect.events", events);
                        }
                    }
                };
            }
        };
    }

    /**
     * Returns the metrics tags for handler-level metrics: the same tags as a per-namespace
     * context, but bound to the system namespace.
     *
     * @return immutable tag map with keys "ns", "cmp", "hnd" and "ins"
     */
    private Map<String, String> getStreamHandlerMetricsContext() {
        return getStreamMetricsContext(Id.Namespace.SYSTEM);
    }

    /**
     * Returns the metrics tags for stream metrics emitted in the given namespace.
     *
     * @param namespace namespace whose id becomes the "ns" tag
     * @return immutable tag map: "ns" = namespace id, "cmp" = "gateway", "hnd" = "stream_rest",
     *     "ins" = the "stream.container.instance.id" setting (default "0")
     */
    public Map<String, String> getStreamMetricsContext(Id.Namespace namespace) {
        String instanceId = this.cConf.get("stream.container.instance.id", "0");
        return ImmutableMap.<String, String>builder()
            .put("ns", namespace.getId())
            .put("cmp", "gateway")
            .put("hnd", "stream_rest")
            .put("ins", instanceId)
            .build();
    }

    /**
     * Parses the JSON request body into {@link StreamProperties} and validates it.
     *
     * <p>Validation rules: a TTL, when present, must not be negative; a format, when present,
     * must have a name and must be instantiable with its schema and settings; a notification
     * threshold, when present, must be greater than zero. On any failure a 400 Bad Request
     * response is sent through {@code httpResponder} and {@code null} is returned, so the caller
     * must not send a second response.
     *
     * @param httpRequest request whose body holds the JSON properties
     * @param httpResponder used to send the 400 response on failure
     * @return the validated properties, with the format replaced by one carrying the schema of
     *     the initialized format, or {@code null} if a 400 response has been sent
     */
    private StreamProperties getAndValidateConfig(HttpRequest httpRequest, HttpResponder httpResponder) {
        final String invalidConfigMessage = "Invalid stream configuration. Please check that the configuration is a valid JSON Object with a valid schema.";
        try {
            // Decode explicitly as UTF-8 rather than with the platform default charset, which
            // would corrupt non-ASCII content on hosts configured with another default.
            StreamProperties requested = GSON.fromJson(new InputStreamReader(new ChannelBufferInputStream(httpRequest.getContent()), StandardCharsets.UTF_8), StreamProperties.class);
            if (requested == null) {
                // Gson returns null for an empty body; reject it explicitly instead of relying
                // on a NullPointerException being caught below.
                httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, invalidConfigMessage);
                return null;
            }

            Long ttl = requested.getTTL();
            // NOTE(review): zero passes this check although the message says "positive" -
            // confirm which of the two is intended.
            if (ttl != null && ttl.longValue() < 0) {
                httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "TTL value should be positive.");
                return null;
            }

            FormatSpecification format = requested.getFormat();
            if (format != null) {
                String formatName = format.getName();
                if (formatName == null) {
                    httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "A format name must be specified.");
                    return null;
                }
                try {
                    // Instantiating the format both validates it and yields the schema to store.
                    format = new FormatSpecification(formatName, RecordFormats.createInitializedFormat(format).getSchema(), format.getSettings());
                } catch (UnsupportedTypeException e) {
                    LOG.debug("Format {} does not support the requested schema", formatName, e);
                    httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "Format " + formatName + " does not support the requested schema.");
                    return null;
                } catch (Exception e) {
                    LOG.debug("Unable to instantiate format {}", formatName, e);
                    httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "Invalid format, unable to instantiate format " + formatName);
                    return null;
                }
            }

            Integer notificationThresholdMB = requested.getNotificationThresholdMB();
            if (notificationThresholdMB != null && notificationThresholdMB.intValue() <= 0) {
                httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, "Threshold value should be greater than zero.");
                return null;
            }
            return new StreamProperties(ttl, format, notificationThresholdMB);
        } catch (Exception e) {
            // Malformed JSON, wrong member types, or any other unexpected validation failure.
            LOG.debug("Invalid stream configuration in request body", e);
            httpResponder.sendString(HttpResponseStatus.BAD_REQUEST, invalidConfigMessage);
            return null;
        }
    }

    /**
     * Builds the policy applied when the asynchronous executor cannot accept a task.
     *
     * <p>While the executor is running, the rejection is counted under "collect.async.reject"
     * and the task is run directly in the thread that invoked the handler. Once the executor has
     * been shut down, the task is silently discarded.
     *
     * @return the rejection handler
     */
    private RejectedExecutionHandler createAsyncRejectedExecutionHandler() {
        return new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                    StreamHandler.this.streamHandlerMetricsContext.increment("collect.async.reject", 1L);
                    task.run();
                }
            }
        };
    }

    /**
     * Collects the request headers addressed to the given stream, starting from an empty map.
     *
     * @param httpRequest request whose headers are inspected
     * @param str stream name used as the header prefix
     * @return immutable map of the matching headers with the prefix removed
     */
    private Map<String, String> getHeaders(HttpRequest httpRequest, String str) {
        ImmutableMap.Builder<String, String> emptyBuilder = ImmutableMap.<String, String>builder();
        return getHeaders(httpRequest, str, emptyBuilder);
    }

    /**
     * Adds every request header whose name starts with {@code "<str>."} to the builder, keyed by
     * the header name with that prefix removed, and builds the map.
     *
     * @param httpRequest request whose headers are inspected
     * @param str stream name used as the header prefix
     * @param builder builder to add to; may already contain entries
     * @return immutable map of the builder's prior entries plus the matching headers
     */
    private Map<String, String> getHeaders(HttpRequest httpRequest, String str, ImmutableMap.Builder<String, String> builder) {
        String prefix = str + ".";
        // Typed entries replace the raw Map.Entry and its casts; with the raw type getValue()
        // is an Object and cannot be put into a Builder<String, String>.
        for (Map.Entry<String, String> header : httpRequest.getHeaders()) {
            String headerName = header.getKey();
            if (headerName.startsWith(prefix)) {
                builder.put(headerName.substring(prefix.length()), header.getValue());
            }
        }
        // NOTE(review): ImmutableMap.Builder.build() rejects duplicate keys, so a repeated
        // prefixed header (or one colliding with a pre-populated entry) presumably fails here
        // with IllegalArgumentException - confirm whether that should map to a 400 response.
        return builder.build();
    }

    /**
     * Creates the content writer factory for a batch upload to the given stream.
     *
     * <p>The event headers consist of {@code "content.type"}, taken from the request's
     * {@code Content-Type} header (empty string when absent), plus every request header prefixed
     * with the stream id.
     *
     * @param stream the target stream
     * @param httpRequest the batch upload request
     * @return a factory bound to the stream's config, the shared writer and the batch buffer threshold
     * @throws IOException if the stream config cannot be read
     */
    private ContentWriterFactory createContentWriterFactory(Id.Stream stream, HttpRequest httpRequest) throws IOException {
        StreamConfig streamConfig = this.streamAdmin.getConfig(stream);
        // The explicit <String, String> witness is required: a chained builder() call would
        // otherwise be inferred as Builder<Object, Object>, which getHeaders does not accept.
        ImmutableMap.Builder<String, String> headerBuilder = ImmutableMap.<String, String>builder().put("content.type", HttpHeaders.getHeader(httpRequest, "Content-Type", ""));
        Map<String, String> eventHeaders = getHeaders(httpRequest, stream.getId(), headerBuilder);
        return new LengthBasedContentWriterFactory(streamConfig, this.streamWriter, eventHeaders, this.batchBufferThreshold);
    }
}
