package co.cask.cdap.data2.transaction.queue.inmemory;

import co.cask.cdap.api.data.stream.StreamSpecification;
import co.cask.cdap.common.StreamNotFoundException;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data.stream.service.StreamMetaStore;
import co.cask.cdap.data.view.ViewAdmin;
import co.cask.cdap.data2.audit.AuditPublisher;
import co.cask.cdap.data2.audit.AuditPublishers;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.store.MetadataStore;
import co.cask.cdap.data2.metadata.writer.LineageWriter;
import co.cask.cdap.data2.registry.UsageRegistry;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.proto.StreamProperties;
import co.cask.cdap.proto.ViewSpecification;
import co.cask.cdap.proto.audit.AuditPayload;
import co.cask.cdap.proto.audit.AuditType;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.proto.id.StreamViewId;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;

@Singleton
/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/inmemory/InMemoryStreamAdmin.class */
public class InMemoryStreamAdmin extends InMemoryQueueAdmin implements StreamAdmin {
    private final StreamMetaStore streamMetaStore;
    private final UsageRegistry usageRegistry;
    private final LineageWriter lineageWriter;
    private final MetadataStore metadataStore;
    private final ViewAdmin viewAdmin;
    private AuditPublisher auditPublisher;

    @Inject
    public InMemoryStreamAdmin(InMemoryQueueService inMemoryQueueService, UsageRegistry usageRegistry, LineageWriter lineageWriter, StreamMetaStore streamMetaStore, MetadataStore metadataStore, ViewAdmin viewAdmin) {
        super(inMemoryQueueService);
        this.usageRegistry = usageRegistry;
        this.streamMetaStore = streamMetaStore;
        this.lineageWriter = lineageWriter;
        this.metadataStore = metadataStore;
        this.viewAdmin = viewAdmin;
    }

    @Inject(optional = true)
    public void setAuditPublisher(AuditPublisher auditPublisher) {
        this.auditPublisher = auditPublisher;
    }

    @Override // co.cask.cdap.data2.transaction.queue.inmemory.InMemoryQueueAdmin, co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void dropAllInNamespace(NamespaceId namespaceId) throws Exception {
        this.queueService.resetStreamsWithPrefix(QueueName.prefixForNamedspacedStream(namespaceId.getNamespace()));
        Iterator<StreamSpecification> it = this.streamMetaStore.listStreams(namespaceId).iterator();
        while (it.hasNext()) {
            StreamId stream = namespaceId.stream(it.next().getName());
            this.metadataStore.removeMetadata(stream);
            this.streamMetaStore.removeStream(stream);
        }
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void configureInstances(StreamId streamId, long j, int i) throws Exception {
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void configureGroups(StreamId streamId, Map<Long, Integer> map) throws Exception {
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception {
        return this.streamMetaStore.listStreams(namespaceId);
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public StreamConfig getConfig(StreamId streamId) {
        throw new UnsupportedOperationException("Stream config not supported for non-file based stream.");
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public StreamProperties getProperties(StreamId streamId) {
        throw new UnsupportedOperationException("Stream properties not supported for non-file based stream.");
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void updateConfig(StreamId streamId, StreamProperties streamProperties) throws IOException {
        throw new UnsupportedOperationException("Stream config not supported for non-file based stream.");
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public boolean exists(StreamId streamId) throws Exception {
        return exists(QueueName.fromStream(streamId));
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    @Nullable
    public StreamConfig create(StreamId streamId) throws Exception {
        create(QueueName.fromStream(streamId));
        publishAudit(streamId, AuditType.CREATE);
        return null;
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    @Nullable
    public StreamConfig create(StreamId streamId, @Nullable Properties properties) throws Exception {
        create(QueueName.fromStream(streamId), properties);
        this.streamMetaStore.addStream(streamId, properties != null ? properties.getProperty("stream.description") : null);
        publishAudit(streamId, AuditType.CREATE);
        return null;
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void truncate(StreamId streamId) throws Exception {
        Preconditions.checkArgument(exists(streamId), "Stream '%s' does not exist.", new Object[]{streamId});
        truncate(QueueName.fromStream(streamId));
        publishAudit(streamId, AuditType.TRUNCATE);
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void drop(StreamId streamId) throws Exception {
        Preconditions.checkArgument(exists(streamId), "Stream '%s' does not exist.", new Object[]{streamId});
        this.metadataStore.removeMetadata(streamId);
        drop(QueueName.fromStream(streamId));
        this.streamMetaStore.removeStream(streamId);
        publishAudit(streamId, AuditType.DELETE);
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public boolean createOrUpdateView(StreamViewId streamViewId, ViewSpecification viewSpecification) throws Exception {
        Preconditions.checkArgument(exists(streamViewId.getParent()), "Stream '%s' does not exist.", new Object[]{streamViewId.getStream()});
        return this.viewAdmin.createOrUpdate(streamViewId, viewSpecification);
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void deleteView(StreamViewId streamViewId) throws Exception {
        Preconditions.checkArgument(exists(streamViewId.getParent()), "Stream '%s' does not exist.", new Object[]{streamViewId.getStream()});
        this.viewAdmin.delete(streamViewId);
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public List<StreamViewId> listViews(StreamId streamId) throws Exception {
        Preconditions.checkArgument(exists(streamId), "Stream '%s' does not exist.", new Object[]{streamId});
        return this.viewAdmin.list(streamId);
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public ViewSpecification getView(StreamViewId streamViewId) throws Exception {
        Preconditions.checkArgument(exists(streamViewId.getParent()), "Stream '%s' does not exist.", new Object[]{streamViewId.getStream()});
        return this.viewAdmin.get(streamViewId);
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public boolean viewExists(StreamViewId streamViewId) throws Exception {
        if (exists(streamViewId.getParent())) {
            return this.viewAdmin.exists(streamViewId);
        }
        throw new StreamNotFoundException(streamViewId.getParent());
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void register(Iterable<? extends EntityId> iterable, StreamId streamId) {
        this.usageRegistry.registerAll(iterable, streamId);
    }

    @Override // co.cask.cdap.data2.transaction.stream.StreamAdmin
    public void addAccess(ProgramRunId programRunId, StreamId streamId, AccessType accessType) {
        this.lineageWriter.addAccess(programRunId, streamId, accessType);
        AuditPublishers.publishAccess(this.auditPublisher, streamId, accessType, programRunId);
    }

    private void publishAudit(StreamId streamId, AuditType auditType) {
        AuditPublishers.publishAudit(this.auditPublisher, streamId, auditType, AuditPayload.EMPTY_PAYLOAD);
    }
}
