package co.cask.cdap.data2.transaction.queue.inmemory;

import co.cask.cdap.api.data.stream.StreamSpecification;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data.stream.service.StreamMetaStore;
import co.cask.cdap.data2.registry.UsageRegistry;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.StreamProperties;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;

/**
 * In-memory {@link StreamAdmin} built on top of {@link InMemoryQueueAdmin}.
 *
 * <p>Each stream operation converts the {@link Id.Stream} to a {@link QueueName} and hands it to
 * the inherited queue-admin method of the same name. Stream bookkeeping is kept in a
 * {@link StreamMetaStore}, and usage registration is forwarded to a {@link UsageRegistry}.
 */
@Singleton
public class InMemoryStreamAdmin extends InMemoryQueueAdmin implements StreamAdmin {
    /** Message used by every stream-config operation, none of which is supported here. */
    private static final String CONFIG_UNSUPPORTED_MESSAGE =
        "Stream config not supported for non-file based stream.";

    private final StreamMetaStore streamMetaStore;
    private final UsageRegistry usageRegistry;

    /**
     * Creates the admin.
     *
     * @param inMemoryQueueService queue service passed through to {@link InMemoryQueueAdmin}
     * @param usageRegistry registry that receives the calls made through {@link #register}
     * @param streamMetaStore store to which streams are added and from which they are removed
     */
    @Inject
    public InMemoryStreamAdmin(InMemoryQueueService inMemoryQueueService, UsageRegistry usageRegistry, StreamMetaStore streamMetaStore) {
        super(inMemoryQueueService);
        this.streamMetaStore = streamMetaStore;
        this.usageRegistry = usageRegistry;
    }

    /**
     * Resets all streams under the namespace's stream prefix in the queue service, then removes
     * every stream that the meta store lists for that namespace.
     *
     * @param namespace namespace whose streams are dropped
     * @throws Exception if the meta store or the queue service fails
     */
    @Override
    public void dropAllInNamespace(Id.Namespace namespace) throws Exception {
        // The queue-side reset happens first, before the meta store is consulted.
        queueService.resetStreamsWithPrefix(QueueName.prefixForNamedspacedStream(namespace.getId()));
        for (StreamSpecification spec : streamMetaStore.listStreams(namespace)) {
            streamMetaStore.removeStream(Id.Stream.from(namespace, spec.getName()));
        }
    }

    /**
     * No-op in this implementation.
     *
     * @param stream ignored
     * @param j ignored (presumably a consumer group id - confirm against {@link StreamAdmin})
     * @param i ignored (presumably an instance count - confirm against {@link StreamAdmin})
     */
    @Override
    public void configureInstances(Id.Stream stream, long j, int i) throws Exception {
        // Intentionally empty.
    }

    /**
     * No-op in this implementation.
     *
     * @param stream ignored
     * @param map ignored
     */
    @Override
    public void configureGroups(Id.Stream stream, Map<Long, Integer> map) throws Exception {
        // Intentionally empty.
    }

    /**
     * Always fails: this implementation does not support stream configuration.
     *
     * @param stream ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public StreamConfig getConfig(Id.Stream stream) {
        throw new UnsupportedOperationException(CONFIG_UNSUPPORTED_MESSAGE);
    }

    /**
     * Always fails: this implementation does not support stream configuration.
     *
     * @param stream ignored
     * @param streamProperties ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public void updateConfig(Id.Stream stream, StreamProperties streamProperties) throws IOException {
        throw new UnsupportedOperationException(CONFIG_UNSUPPORTED_MESSAGE);
    }

    /**
     * Reports whether the queue backing the given stream exists.
     *
     * @param stream stream to look up
     * @return the result of the inherited queue-level {@code exists} check
     * @throws Exception if the queue-level check fails
     */
    @Override
    public boolean exists(Id.Stream stream) throws Exception {
        QueueName streamQueue = QueueName.fromStream(stream);
        return exists(streamQueue);
    }

    /**
     * Creates the queue backing the given stream.
     *
     * <p>Unlike {@link #create(Id.Stream, Properties)}, this overload does not add the stream to
     * the meta store. NOTE(review): confirm whether that asymmetry is intentional.
     *
     * @param stream stream to create
     * @throws Exception if the queue-level create fails
     */
    @Override
    public void create(Id.Stream stream) throws Exception {
        QueueName streamQueue = QueueName.fromStream(stream);
        create(streamQueue);
    }

    /**
     * Creates the queue backing the given stream and then adds the stream to the meta store.
     *
     * @param stream stream to create
     * @param properties optional properties handed to the queue-level create; may be {@code null}
     * @throws Exception if the queue-level create or the meta store update fails
     */
    @Override
    public void create(Id.Stream stream, @Nullable Properties properties) throws Exception {
        QueueName streamQueue = QueueName.fromStream(stream);
        create(streamQueue, properties);
        streamMetaStore.addStream(stream);
    }

    /**
     * Truncates the queue backing the given stream.
     *
     * @param stream stream to truncate
     * @throws Exception if the queue-level truncate fails
     */
    @Override
    public void truncate(Id.Stream stream) throws Exception {
        QueueName streamQueue = QueueName.fromStream(stream);
        truncate(streamQueue);
    }

    /**
     * Drops the queue backing the given stream and then removes the stream from the meta store.
     *
     * @param stream stream to drop
     * @throws Exception if the queue-level drop or the meta store update fails
     */
    @Override
    public void drop(Id.Stream stream) throws Exception {
        QueueName streamQueue = QueueName.fromStream(stream);
        drop(streamQueue);
        streamMetaStore.removeStream(stream);
    }

    /**
     * Forwards the given owners and stream to {@link UsageRegistry#registerAll}.
     *
     * @param iterable ids to register against the stream
     * @param stream stream being used
     */
    @Override
    public void register(Iterable<? extends Id> iterable, Id.Stream stream) {
        usageRegistry.registerAll(iterable, stream);
    }
}
