package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.data.stream.StreamSpecification;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.kerberos.SecurityUtil;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
import co.cask.cdap.common.security.AuditDetail;
import co.cask.cdap.common.security.AuditPolicy;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamFileWriterFactory;
import co.cask.cdap.data.stream.service.StreamMetricsCollectorFactory;
import co.cask.cdap.data.stream.service.upload.ContentWriterFactory;
import co.cask.cdap.data.stream.service.upload.LengthBasedContentWriterFactory;
import co.cask.cdap.data.stream.service.upload.StreamBodyConsumerFactory;
import co.cask.cdap.data2.metadata.system.AbstractSystemMetadataWriter;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.format.RecordFormats;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.StreamDetail;
import co.cask.cdap.proto.StreamProperties;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.security.Action;
import co.cask.cdap.security.impersonation.Impersonator;
import co.cask.cdap.security.spi.authentication.AuthenticationContext;
import co.cask.cdap.security.spi.authorization.AuthorizationEnforcer;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.BodyConsumer;
import co.cask.http.HandlerContext;
import co.cask.http.HttpResponder;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closeables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.twill.common.Threads;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

@Singleton
@Path("/v3/namespaces/{namespace-id}/streams")
/* loaded from: input_file:co/cask/cdap/data/stream/service/StreamHandler.class */
public final class StreamHandler extends AbstractHttpHandler {
    /** Shared Gson instance with custom adapters registered for {@link StreamProperties} and {@link Schema}. */
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(StreamProperties.class, new StreamPropertiesAdapter()).registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    // Configuration handed to the constructor; read again in init() and by the metrics-context helpers.
    private final CConfiguration cConf;
    // Used for stream listing, creation, truncation, deletion, existence checks and configuration access.
    private final StreamAdmin streamAdmin;
    // Handler-level metrics context; incremented when an async enqueue task is rejected by the executor.
    private final MetricsContext streamHandlerMetricsContext;
    // Cache of metrics contexts keyed by namespace; entries are loaded on demand by the loader built in the constructor.
    private final LoadingCache<NamespaceId, MetricsContext> streamMetricsCollectors;
    // Writer behind the enqueue/asyncEnqueue/batch endpoints; closed in destroy().
    private final ConcurrentStreamWriter streamWriter;
    // Value of the "stream.batch.buffer.threshold" setting; passed to the batch content writer factory.
    private final long batchBufferThreshold;
    // Creates the BodyConsumer returned by the batch endpoint.
    private final StreamBodyConsumerFactory streamBodyConsumerFactory = new StreamBodyConsumerFactory();
    // Queried with the request's namespace before listing or creating streams.
    private final NamespaceQueryAdmin namespaceQueryAdmin;
    // Executor for asynchronous enqueues; assigned in init() (hence not final) and shut down in destroy().
    private ExecutorService asyncExecutor;
    // Notified of the number of bytes received per stream by the metrics collector.
    private final StreamWriterSizeCollector sizeCollector;
    // Passed through to the stream writer and to the batch content writer factory.
    private final Impersonator impersonator;
    // Together with authenticationContext, enforces Action.WRITE on the enqueue, asyncEnqueue and batch endpoints.
    private final AuthorizationEnforcer authorizationEnforcer;
    // Supplies the principal of the current request for authorization checks.
    private final AuthenticationContext authenticationContext;

    /* loaded from: input_file:co/cask/cdap/data/stream/service/StreamHandler$StreamPropertiesAdapter.class */
    private static final class StreamPropertiesAdapter implements JsonSerializer<StreamProperties>, JsonDeserializer<StreamProperties> {
        private StreamPropertiesAdapter() {
        }

        public JsonElement serialize(StreamProperties streamProperties, Type type, JsonSerializationContext jsonSerializationContext) {
            JsonObject jsonObject = new JsonObject();
            if (streamProperties.getTTL() != null) {
                jsonObject.addProperty(AbstractSystemMetadataWriter.TTL_KEY, Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(streamProperties.getTTL().longValue())));
            }
            if (streamProperties.getFormat() != null) {
                jsonObject.add("format", jsonSerializationContext.serialize(streamProperties.getFormat(), FormatSpecification.class));
            }
            if (streamProperties.getNotificationThresholdMB() != null) {
                jsonObject.addProperty("notification.threshold.mb", streamProperties.getNotificationThresholdMB());
            }
            if (streamProperties.getDescription() != null) {
                jsonObject.addProperty(AbstractSystemMetadataWriter.DESCRIPTION_KEY, streamProperties.getDescription());
            }
            if (streamProperties.getOwnerPrincipal() != null) {
                jsonObject.addProperty("principal", streamProperties.getOwnerPrincipal());
            }
            return jsonObject;
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public StreamProperties m38deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
            JsonObject asJsonObject = jsonElement.getAsJsonObject();
            Long valueOf = asJsonObject.has(AbstractSystemMetadataWriter.TTL_KEY) ? Long.valueOf(TimeUnit.SECONDS.toMillis(asJsonObject.get(AbstractSystemMetadataWriter.TTL_KEY).getAsLong())) : null;
            FormatSpecification formatSpecification = null;
            if (asJsonObject.has("format")) {
                formatSpecification = (FormatSpecification) jsonDeserializationContext.deserialize(asJsonObject.get("format"), FormatSpecification.class);
            }
            return new StreamProperties(valueOf, formatSpecification, asJsonObject.has("notification.threshold.mb") ? Integer.valueOf(asJsonObject.get("notification.threshold.mb").getAsInt()) : null, asJsonObject.has(AbstractSystemMetadataWriter.DESCRIPTION_KEY) ? asJsonObject.get(AbstractSystemMetadataWriter.DESCRIPTION_KEY).getAsString() : null, asJsonObject.has("principal") ? asJsonObject.get("principal").getAsString() : null);
        }
    }

    /**
     * Creates the handler from its injected collaborators.
     *
     * <p>Besides storing the collaborators, this reads the batch buffer threshold and the number of stream
     * worker threads from the configuration, obtains the handler-level metrics context, sets up the cache
     * of per-namespace metrics contexts and constructs the {@link ConcurrentStreamWriter}.
     */
    @Inject
    StreamHandler(CConfiguration cConfiguration, StreamCoordinatorClient streamCoordinatorClient, StreamAdmin streamAdmin, StreamFileWriterFactory streamFileWriterFactory, final MetricsCollectionService metricsCollectionService, StreamWriterSizeCollector streamWriterSizeCollector, NamespaceQueryAdmin namespaceQueryAdmin, Impersonator impersonator, AuthorizationEnforcer authorizationEnforcer, AuthenticationContext authenticationContext) {
        // Plain collaborator assignments first; cConf must be set before the metrics-context helpers run.
        this.cConf = cConfiguration;
        this.streamAdmin = streamAdmin;
        this.sizeCollector = streamWriterSizeCollector;
        this.namespaceQueryAdmin = namespaceQueryAdmin;
        this.impersonator = impersonator;
        this.authorizationEnforcer = authorizationEnforcer;
        this.authenticationContext = authenticationContext;

        this.batchBufferThreshold = cConfiguration.getLong("stream.batch.buffer.threshold");
        this.streamHandlerMetricsContext = metricsCollectionService.getContext(getStreamHandlerMetricsContext());

        // Metrics contexts are created per namespace the first time they are requested.
        CacheLoader<NamespaceId, MetricsContext> perNamespaceLoader = new CacheLoader<NamespaceId, MetricsContext>() {
            @Override
            public MetricsContext load(NamespaceId namespaceId) {
                return metricsCollectionService.getContext(StreamHandler.this.getStreamMetricsContext(namespaceId));
            }
        };
        this.streamMetricsCollectors = CacheBuilder.newBuilder().build(perNamespaceLoader);

        int workerThreads = cConfiguration.getInt("stream.worker.threads");
        StreamMetricsCollectorFactory collectorFactory = createStreamMetricsCollectorFactory();
        this.streamWriter = new ConcurrentStreamWriter(streamCoordinatorClient, streamAdmin, streamFileWriterFactory, workerThreads, collectorFactory, impersonator);
    }

    /**
     * Initializes the handler and creates the executor used for asynchronous enqueues.
     *
     * <p>The pool has a fixed size taken from {@code stream.async.worker.threads}, a bounded queue of
     * {@code stream.async.queue.size} slots per worker thread, daemon threads, and lets idle core threads
     * time out after 60 seconds.
     *
     * @param handlerContext the context passed on to the superclass
     */
    @Override
    public void init(HandlerContext handlerContext) {
        super.init(handlerContext);
        int workerThreads = this.cConf.getInt("stream.async.worker.threads");
        int queueCapacity = this.cConf.getInt("stream.async.queue.size") * workerThreads;
        // Explicit element type instead of a raw queue, so the constructor call is type-checked.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(workerThreads, workerThreads, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), Threads.createDaemonThreadFactory("async-exec-%d"), createAsyncRejectedExecutionHandler());
        executor.allowCoreThreadTimeOut(true);
        this.asyncExecutor = executor;
    }

    /**
     * Releases the handler's resources: quietly closes the stream writer and stops the async executor.
     *
     * @param handlerContext the handler context (unused)
     */
    @Override
    public void destroy(HandlerContext handlerContext) {
        Closeables.closeQuietly(this.streamWriter);
        // The executor only exists once init() has completed; guard so that tearing down a handler that
        // was never (fully) initialized does not fail with a NullPointerException.
        ExecutorService executor = this.asyncExecutor;
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    /**
     * Responds with a JSON list containing one {@link StreamDetail} per stream in the namespace.
     *
     * <p>The namespace is first looked up through the namespace query admin; the result is discarded, so
     * the call presumably serves as an existence check (it is expected to throw for an unknown namespace).
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the JSON response
     * @param str the namespace id taken from the request path
     * @throws Exception if the namespace lookup or the stream listing fails
     */
    @GET
    @Path("/")
    public void listStreams(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str) throws Exception {
        NamespaceId namespaceId = new NamespaceId(str);
        this.namespaceQueryAdmin.get(namespaceId);

        List<StreamSpecification> specifications = this.streamAdmin.listStreams(namespaceId);
        List<StreamDetail> details = new ArrayList<StreamDetail>(specifications.size());
        for (StreamSpecification specification : specifications) {
            details.add(new StreamDetail(specification.getName()));
        }
        httpResponder.sendJson(HttpResponseStatus.OK, details);
    }

    /**
     * Responds with the properties of an existing stream, serialized as {@link StreamProperties} JSON.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the JSON response
     * @param str the namespace id taken from the request path
     * @param str2 the stream name taken from the request path
     * @throws Exception if the stream id is invalid, the stream does not exist, or the lookup fails
     */
    @GET
    @Path("/{stream}")
    public void getInfo(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        final StreamId streamId = validateAndGetStreamId(str, str2);
        checkStreamExists(streamId);
        httpResponder.sendJson(
            HttpResponseStatus.OK,
            this.streamAdmin.getProperties(streamId),
            StreamProperties.class,
            GSON);
    }

    /**
     * Creates a stream, optionally configured from a JSON body.
     *
     * <p>When the request carries a readable body, it is parsed and validated as {@link StreamProperties}
     * and each non-null property is copied into the creation {@link Properties} under its own key.
     *
     * @param httpRequest the incoming request, whose body may hold the stream configuration
     * @param httpResponder used to send the status response
     * @param str the namespace id taken from the request path
     * @param str2 the stream name taken from the request path
     * @throws Exception if the namespace lookup, id validation, body validation or creation fails
     */
    @Path("/{stream}")
    @PUT
    @AuditPolicy({AuditDetail.REQUEST_BODY})
    public void create(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        this.namespaceQueryAdmin.get(new NamespaceId(str));
        StreamId streamId = validateAndGetStreamId(str, str2);

        Properties creationProps = new Properties();
        if (httpRequest.getContent().readable()) {
            StreamProperties config = getAndValidateConfig(httpRequest);

            Long ttl = config.getTTL();
            if (ttl != null) {
                creationProps.put("stream.event.ttl", Long.toString(ttl.longValue()));
            }

            Integer thresholdMb = config.getNotificationThresholdMB();
            if (thresholdMb != null) {
                creationProps.put("stream.notification.threshold", Integer.toString(thresholdMb.intValue()));
            }

            String description = config.getDescription();
            if (description != null) {
                creationProps.put("stream.description", description);
            }

            FormatSpecification format = config.getFormat();
            if (format != null) {
                creationProps.put("stream.format.specification", GSON.toJson(format));
            }

            String ownerPrincipal = config.getOwnerPrincipal();
            if (ownerPrincipal != null) {
                creationProps.put("principal", ownerPrincipal);
            }
        }

        this.streamAdmin.create(streamId, creationProps);
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Writes the request body to the stream as a single event and responds with 200 OK.
     *
     * <p>The caller's principal must pass the {@link Action#WRITE} authorization check on the stream.
     * Request headers prefixed with the stream name are attached to the event.
     *
     * @param httpRequest the incoming request whose content becomes the event body
     * @param httpResponder used to send the status response
     * @param str the namespace id taken from the request path
     * @param str2 the stream name taken from the request path
     * @throws Exception if the id is invalid, authorization fails, or the write fails
     */
    @POST
    @Path("/{stream}")
    public void enqueue(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        StreamId streamId = validateAndGetStreamId(str, str2);
        this.authorizationEnforcer.enforce(streamId, this.authenticationContext.getPrincipal(), Action.WRITE);

        Map<String, String> eventHeaders = getHeaders(httpRequest, str2);
        this.streamWriter.enqueue(streamId, eventHeaders, httpRequest.getContent().toByteBuffer());
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Hands the request body to the stream writer for asynchronous writing and responds with 202 Accepted.
     *
     * <p>The caller's principal must pass the {@link Action#WRITE} authorization check on the stream.
     * The write is submitted with the handler's async executor; the response is sent without waiting for it.
     *
     * @param httpRequest the incoming request whose content becomes the event body
     * @param httpResponder used to send the status response
     * @param str the namespace id taken from the request path
     * @param str2 the stream name taken from the request path
     * @throws Exception if the id is invalid, authorization fails, or the submission fails
     */
    @POST
    @Path("/{stream}/async")
    public void asyncEnqueue(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        StreamId streamId = validateAndGetStreamId(str, str2);
        this.authorizationEnforcer.enforce(streamId, this.authenticationContext.getPrincipal(), Action.WRITE);

        Map<String, String> eventHeaders = getHeaders(httpRequest, str2);
        this.streamWriter.asyncEnqueue(streamId, eventHeaders, httpRequest.getContent().toByteBuffer(), this.asyncExecutor);
        httpResponder.sendStatus(HttpResponseStatus.ACCEPTED);
    }

    /**
     * Starts a batch upload to an existing stream.
     *
     * <p>The stream must exist and the caller's principal must pass the {@link Action#WRITE} check.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to report an unsupported upload
     * @param str the namespace id taken from the request path
     * @param str2 the stream name taken from the request path
     * @return the consumer that receives the request body, or {@code null} after a 406 Not Acceptable
     *     response has been sent because creating the consumer threw {@link UnsupportedOperationException}
     * @throws Exception if the id is invalid, the stream is missing, or authorization fails
     */
    @POST
    @Path("/{stream}/batch")
    public BodyConsumer batch(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        StreamId streamId = validateAndGetStreamId(str, str2);
        checkStreamExists(streamId);
        this.authorizationEnforcer.enforce(streamId, this.authenticationContext.getPrincipal(), Action.WRITE);

        BodyConsumer consumer;
        try {
            consumer = this.streamBodyConsumerFactory.create(httpRequest, createContentWriterFactory(streamId, httpRequest));
        } catch (UnsupportedOperationException unsupported) {
            httpResponder.sendString(HttpResponseStatus.NOT_ACCEPTABLE, unsupported.getMessage());
            consumer = null;
        }
        return consumer;
    }

    /**
     * Truncates an existing stream and responds with 200 OK.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the status response
     * @param str the namespace id taken from the request path
     * @param str2 the stream name taken from the request path
     * @throws Exception if the id is invalid, the stream does not exist, or truncation fails
     */
    @POST
    @Path("/{stream}/truncate")
    public void truncate(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        final StreamId target = validateAndGetStreamId(str, str2);
        checkStreamExists(target);

        this.streamAdmin.truncate(target);
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Deletes an existing stream and responds with 200 OK.
     *
     * <p>The stream is closed on the writer before it is dropped through the stream admin.
     *
     * @param httpRequest the incoming request
     * @param httpResponder used to send the status response
     * @param str the namespace id taken from the request path
     * @param str2 the stream name taken from the request path
     * @throws Exception if the id is invalid, the stream does not exist, or closing/dropping fails
     */
    @Path("/{stream}")
    @DELETE
    public void delete(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        final StreamId target = validateAndGetStreamId(str, str2);
        checkStreamExists(target);

        // Close the writer side first, then remove the stream itself.
        this.streamWriter.close(target);
        this.streamAdmin.drop(target);
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Updates the configuration of an existing stream from the JSON request body and responds with 200 OK.
     *
     * @param httpRequest the incoming request whose body holds the new configuration
     * @param httpResponder used to send the status response
     * @param str the namespace id taken from the request path
     * @param str2 the stream name taken from the request path
     * @throws Exception if the id is invalid, the stream does not exist, the body fails validation,
     *     or the update fails
     */
    @Path("/{stream}/properties")
    @PUT
    @AuditPolicy({AuditDetail.REQUEST_BODY})
    public void setConfig(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("namespace-id") String str, @PathParam("stream") String str2) throws Exception {
        final StreamId target = validateAndGetStreamId(str, str2);
        checkStreamExists(target);

        StreamProperties newConfig = getAndValidateConfig(httpRequest);
        this.streamAdmin.updateConfig(target, newConfig);
        httpResponder.sendStatus(HttpResponseStatus.OK);
    }

    /**
     * Verifies that the given stream exists.
     *
     * @param streamId the stream to look up
     * @throws NotFoundException if the stream admin reports that the stream does not exist
     * @throws Exception if the existence check itself fails
     */
    private void checkStreamExists(StreamId streamId) throws Exception {
        boolean present = this.streamAdmin.exists(streamId);
        if (present) {
            return;
        }
        throw new NotFoundException(streamId);
    }

    /**
     * Builds the factory that produces per-stream metrics collectors.
     *
     * <p>Each collector reports to a child of the stream's namespace metrics context, tagged with the
     * stream name. Positive byte counts are also forwarded to the size collector.
     *
     * @return a factory creating one collector per stream id
     */
    private StreamMetricsCollectorFactory createStreamMetricsCollectorFactory() {
        return new StreamMetricsCollectorFactory() {
            @Override
            public StreamMetricsCollectorFactory.StreamMetricsCollector createMetricsCollector(final StreamId streamId) {
                MetricsContext namespaceContext = StreamHandler.this.streamMetricsCollectors.getUnchecked(streamId.getParent());
                final MetricsContext streamContext = namespaceContext.childContext("str", streamId.getEntityName());

                return new StreamMetricsCollectorFactory.StreamMetricsCollector() {
                    @Override
                    public void emitMetrics(long bytes, long events) {
                        // Non-positive counts are not reported.
                        if (bytes > 0) {
                            streamContext.increment("collect.bytes", bytes);
                            StreamHandler.this.sizeCollector.received(streamId, bytes);
                        }
                        if (events > 0) {
                            streamContext.increment("collect.events", events);
                        }
                    }
                };
            }
        };
    }

    /**
     * Returns the metrics tags for the handler itself: the same tags as a per-namespace context, with the
     * namespace fixed to {@link NamespaceId#SYSTEM}.
     *
     * @return an immutable map with the {@code ns}, {@code cmp}, {@code hnd} and {@code ins} tags
     */
    private Map<String, String> getStreamHandlerMetricsContext() {
        return getStreamMetricsContext(NamespaceId.SYSTEM);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the metrics tags for streams in the given namespace.
     *
     * @param namespaceId the namespace whose name becomes the {@code ns} tag
     * @return an immutable map with the {@code ns}, {@code cmp}, {@code hnd} and {@code ins} tags; the
     *     instance tag comes from {@code stream.container.instance.id} and defaults to {@code "0"}
     */
    public Map<String, String> getStreamMetricsContext(NamespaceId namespaceId) {
        ImmutableMap.Builder<String, String> tags = ImmutableMap.builder();
        tags.put("ns", namespaceId.getNamespace());
        tags.put("cmp", "gateway");
        tags.put("hnd", "stream_rest");
        tags.put("ins", this.cConf.get("stream.container.instance.id", "0"));
        return tags.build();
    }

    /**
     * Parses the request body as {@link StreamProperties} JSON and validates it.
     *
     * <p>Validation rejects a negative TTL, a format without a name or that cannot be instantiated, a
     * notification threshold that is not greater than zero, and an owner principal that fails Kerberos
     * principal validation. When a format is given, it is re-created with the schema reported by the
     * initialized record format.
     *
     * <p>Every failure inside this method, including the validation errors above, is reported as a
     * {@link BadRequestException} whose message embeds the message of the underlying failure.
     *
     * @param httpRequest the request whose content holds the JSON configuration
     * @return the validated properties
     * @throws BadRequestException if the body is empty, is not valid JSON, or fails validation
     */
    private StreamProperties getAndValidateConfig(HttpRequest httpRequest) throws BadRequestException {
        try {
            // Decode the JSON body as UTF-8 explicitly instead of relying on the platform default charset.
            InputStreamReader reader = new InputStreamReader(new ChannelBufferInputStream(httpRequest.getContent()), StandardCharsets.UTF_8);
            StreamProperties streamProperties = (StreamProperties) GSON.fromJson(reader, StreamProperties.class);
            if (streamProperties == null) {
                // Gson yields null for an empty body or a literal JSON null; report that explicitly
                // instead of failing later with a NullPointerException ("Cause: null").
                throw new BadRequestException("Stream configuration must not be empty.");
            }
            Long ttl = streamProperties.getTTL();
            if (ttl != null && ttl.longValue() < 0) {
                throw new BadRequestException("Invalid TTL " + ttl + ". TTL value should be positive.");
            }
            FormatSpecification format = streamProperties.getFormat();
            if (format != null) {
                String name = format.getName();
                if (name == null) {
                    throw new BadRequestException("A format name must be specified.");
                }
                try {
                    // Instantiate the format so that the stored specification carries the schema it reports.
                    format = new FormatSpecification(name, RecordFormats.createInitializedFormat(format).getSchema(), format.getSettings());
                } catch (UnsupportedTypeException e) {
                    throw new BadRequestException("Format " + name + " does not support the requested schema.");
                } catch (Exception e2) {
                    throw new BadRequestException("Invalid format, unable to instantiate format " + name);
                }
            }
            Integer notificationThresholdMB = streamProperties.getNotificationThresholdMB();
            if (notificationThresholdMB != null && notificationThresholdMB.intValue() <= 0) {
                throw new BadRequestException("Invalid threshold " + notificationThresholdMB + ". Threshold value should be greater than zero.");
            }
            String ownerPrincipal = streamProperties.getOwnerPrincipal();
            if (ownerPrincipal != null) {
                SecurityUtil.validateKerberosPrincipal(ownerPrincipal);
            }
            return new StreamProperties(ttl, format, notificationThresholdMB, streamProperties.getDescription(), ownerPrincipal);
        } catch (Exception e3) {
            throw new BadRequestException("Invalid stream configuration. Please check that the configuration is a valid JSON Object with a valid schema. Cause: " + e3.getMessage());
        }
    }

    /**
     * Builds the rejection policy for the async executor.
     *
     * <p>A task rejected while the executor is shut down is dropped. Otherwise the rejection is counted
     * on the handler metrics context and the task is run directly by the thread that invokes the policy.
     *
     * @return the rejection handler installed on the async executor
     */
    private RejectedExecutionHandler createAsyncRejectedExecutionHandler() {
        return new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                if (!threadPoolExecutor.isShutdown()) {
                    StreamHandler.this.streamHandlerMetricsContext.increment("collect.async.reject", 1L);
                    runnable.run();
                }
            }
        };
    }

    /**
     * Collects the stream-specific request headers into a new map.
     *
     * @param httpRequest the request to read headers from
     * @param str the stream name; only headers named {@code <stream>.<key>} are kept
     * @return an immutable map of the matching headers keyed by the part after the prefix
     */
    private Map<String, String> getHeaders(HttpRequest httpRequest, String str) {
        ImmutableMap.Builder<String, String> emptyBuilder = ImmutableMap.builder();
        return getHeaders(httpRequest, str, emptyBuilder);
    }

    /**
     * Adds the stream-specific request headers to {@code builder} and builds the map.
     *
     * <p>A header is stream-specific when its name starts with {@code str + "."}; the prefix is stripped
     * from the resulting key. The prefix match is case-sensitive.
     *
     * @param httpRequest the request to read headers from
     * @param str the stream name used as header prefix
     * @param builder a builder that may already contain entries; matching headers are appended to it
     * @return the immutable map built from {@code builder}
     */
    private Map<String, String> getHeaders(HttpRequest httpRequest, String str, ImmutableMap.Builder<String, String> builder) {
        String prefix = str + ".";
        int prefixLength = prefix.length();
        // Typed entries: with a raw Map.Entry the value is an Object and cannot be put into the builder.
        for (Map.Entry<String, String> header : httpRequest.getHeaders()) {
            String headerName = header.getKey();
            if (headerName.startsWith(prefix)) {
                builder.put(headerName.substring(prefixLength), header.getValue());
            }
        }
        // NOTE(review): ImmutableMap.Builder#build rejects duplicate keys, so a repeated stream header
        // (or one colliding with a pre-populated entry) would presumably fail here - confirm whether
        // that is the intended handling.
        return builder.build();
    }

    /**
     * Creates the content writer factory used for a batch upload to the given stream.
     *
     * <p>The event headers consist of a {@code content.type} entry holding the request's
     * {@code Content-Type} header (empty string when absent) plus the stream-specific request headers.
     *
     * @param streamId the stream being written to
     * @param httpRequest the batch request
     * @return a length-based content writer factory bound to the stream's configuration
     * @throws IOException if the stream configuration cannot be read
     */
    private ContentWriterFactory createContentWriterFactory(StreamId streamId, HttpRequest httpRequest) throws IOException {
        // The explicit <String, String> witness is required: a bare builder() in a call chain is inferred
        // as Builder<Object, Object>, which getHeaders does not accept.
        return new LengthBasedContentWriterFactory(
            this.streamAdmin.getConfig(streamId),
            this.streamWriter,
            getHeaders(
                httpRequest,
                streamId.getEntityName(),
                ImmutableMap.<String, String>builder().put("content.type", HttpHeaders.getHeader(httpRequest, "Content-Type", ""))),
            this.batchBufferThreshold,
            this.impersonator);
    }

    /**
     * Builds a {@link StreamId} from the path parameters, translating construction failures into a
     * client error.
     *
     * @param str the namespace id
     * @param str2 the stream name
     * @return the stream id
     * @throws BadRequestException wrapping the {@link IllegalArgumentException} or
     *     {@link NullPointerException} thrown while constructing the id
     */
    private StreamId validateAndGetStreamId(String str, String str2) throws BadRequestException {
        final StreamId streamId;
        try {
            streamId = new StreamId(str, str2);
        } catch (IllegalArgumentException | NullPointerException invalid) {
            throw new BadRequestException(invalid);
        }
        return streamId;
    }
}
