package co.cask.cdap.data.stream.service;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.TxCallable;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.stream.StreamSpecification;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Table;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.dataset2.lib.table.MDSKey;
import co.cask.cdap.data2.dataset2.lib.table.MetadataStoreDataset;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionSystemClient;

/* loaded from: input_file:co/cask/cdap/data/stream/service/MDSStreamMetaStore.class */
/**
 * {@link StreamMetaStore} implementation that keeps {@link StreamSpecification}s in a
 * {@link MetadataStoreDataset} layered over the {@code app.meta} table of the system namespace.
 *
 * <p>Each public operation runs inside its own transaction through {@link Transactionals#execute},
 * using a {@link Transactional} that retries on transaction conflicts.
 *
 * <p>Entries are keyed as {@code ("stream", <namespace>, <stream name>)}; see {@link #getKey(StreamId)}.
 */
public final class MDSStreamMetaStore implements StreamMetaStore {
    private static final DatasetId APP_META_INSTANCE_ID = NamespaceId.SYSTEM.dataset("app.meta");
    private static final String TYPE_STREAM = "stream";

    /** First argument handed to {@link RetryStrategies#retryOnConflict}: maximum number of retries. */
    private static final int CONFLICT_MAX_RETRIES = 20;
    /** Second argument handed to {@link RetryStrategies#retryOnConflict}: delay between retries. */
    private static final long CONFLICT_RETRY_DELAY = 100L;

    private final DatasetFramework datasetFramework;
    private final Transactional transactional;

    /**
     * Creates the store and wires up a retry-on-conflict {@link Transactional} backed by a
     * {@link MultiThreadDatasetCache} in the system namespace.
     *
     * @param datasetFramework framework used to look up or create the backing table
     * @param transactionSystemClient transaction client used to run the transactions
     */
    @Inject
    public MDSStreamMetaStore(DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient) {
        this.datasetFramework = datasetFramework;
        MultiThreadDatasetCache datasetCache = new MultiThreadDatasetCache(
            new SystemDatasetInstantiator(this.datasetFramework),
            new TransactionSystemClientAdapter(transactionSystemClient),
            NamespaceId.SYSTEM,
            ImmutableMap.of(),
            null,
            null,
            new MultiThreadTransactionAware[0]);
        this.transactional = Transactions.createTransactionalWithRetry(
            Transactions.createTransactional(datasetCache),
            RetryStrategies.retryOnConflict(CONFLICT_MAX_RETRIES, CONFLICT_RETRY_DELAY));
    }

    /**
     * Returns a {@link MetadataStoreDataset} wrapping the {@code app.meta} table, obtaining (or
     * creating) the table through {@link DatasetsUtil#getOrCreateDataset}.
     *
     * <p>NOTE(review): the decompiler reports this member was originally {@code private}; it is left
     * {@code public} here so the visible interface of the class is unchanged.
     *
     * @param datasetContext dataset context of the current transaction
     * @return a metadata store view over the backing table
     * @throws IOException declared by the underlying dataset lookup
     * @throws DatasetManagementException declared by the underlying dataset lookup
     */
    public MetadataStoreDataset getMetadataStore(DatasetContext datasetContext) throws IOException, DatasetManagementException {
        Table table = DatasetsUtil.getOrCreateDataset(
            datasetContext, this.datasetFramework, APP_META_INSTANCE_ID, Table.class.getName(), DatasetProperties.EMPTY);
        return new MetadataStoreDataset(table);
    }

    /**
     * Adds a stream without a description; delegates to {@link #addStream(StreamId, String)}.
     *
     * @param streamId id of the stream to record
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data.stream.service.StreamMetaStore
    public void addStream(StreamId streamId) throws Exception {
        addStream(streamId, null);
    }

    /**
     * Writes a {@link StreamSpecification} for the given stream under its metadata key.
     *
     * @param streamId id of the stream to record
     * @param str description of the stream, may be {@code null}
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data.stream.service.StreamMetaStore
    public void addStream(final StreamId streamId, @Nullable final String str) throws Exception {
        Transactionals.execute(this.transactional, new TxRunnable() {
            @Override
            public void run(DatasetContext datasetContext) throws Exception {
                getMetadataStore(datasetContext).write(getKey(streamId), createStreamSpec(streamId, str));
            }
        }, Exception.class);
    }

    /**
     * Reads the first {@link StreamSpecification} stored under the key of the given stream.
     *
     * @param streamId id of the stream to look up
     * @return the stored specification; {@link #streamExists(StreamId)} treats {@code null} as "not present"
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data.stream.service.StreamMetaStore
    public StreamSpecification getStream(final StreamId streamId) throws Exception {
        return Transactionals.execute(this.transactional, new TxCallable<StreamSpecification>() {
            @Override
            public StreamSpecification call(DatasetContext datasetContext) throws Exception {
                return getMetadataStore(datasetContext).getFirst(getKey(streamId), StreamSpecification.class);
            }
        }, Exception.class);
    }

    /**
     * Deletes everything stored under the key of the given stream.
     *
     * @param streamId id of the stream to remove
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data.stream.service.StreamMetaStore
    public void removeStream(final StreamId streamId) throws Exception {
        Transactionals.execute(this.transactional, new TxRunnable() {
            @Override
            public void run(DatasetContext datasetContext) throws Exception {
                getMetadataStore(datasetContext).deleteAll(getKey(streamId));
            }
        }, Exception.class);
    }

    /**
     * Tells whether a specification is stored for the given stream.
     *
     * @param streamId id of the stream to check
     * @return {@code true} if {@link #getStream(StreamId)} returns a non-null specification
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data.stream.service.StreamMetaStore
    public boolean streamExists(StreamId streamId) throws Exception {
        return getStream(streamId) != null;
    }

    /**
     * Lists the specifications stored under the {@code ("stream", <namespace>)} key prefix.
     *
     * @param namespaceId namespace whose streams are listed
     * @return the specifications found under that prefix
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data.stream.service.StreamMetaStore
    public List<StreamSpecification> listStreams(final NamespaceId namespaceId) throws Exception {
        return Transactionals.execute(this.transactional, new TxCallable<List<StreamSpecification>>() {
            @Override
            public List<StreamSpecification> call(DatasetContext datasetContext) throws Exception {
                MDSKey namespacePrefix = new MDSKey.Builder().add(TYPE_STREAM, namespaceId.getEntityName()).build();
                return getMetadataStore(datasetContext).list(namespacePrefix, StreamSpecification.class);
            }
        }, Exception.class);
    }

    /**
     * Lists every specification stored under the {@code "stream"} key prefix, grouped by the
     * namespace encoded in each entry's key.
     *
     * @return an immutable multimap from namespace to the specifications stored for it
     * @throws Exception if the transaction fails
     */
    @Override // co.cask.cdap.data.stream.service.StreamMetaStore
    public Multimap<NamespaceId, StreamSpecification> listStreams() throws Exception {
        return Transactionals.execute(this.transactional, new TxCallable<Multimap<NamespaceId, StreamSpecification>>() {
            @Override
            public Multimap<NamespaceId, StreamSpecification> call(DatasetContext datasetContext) throws Exception {
                MDSKey streamPrefix = new MDSKey.Builder().add(TYPE_STREAM).build();
                Map<MDSKey, StreamSpecification> specsByKey =
                    getMetadataStore(datasetContext).listKV(streamPrefix, StreamSpecification.class);

                ImmutableMultimap.Builder<NamespaceId, StreamSpecification> result = ImmutableMultimap.builder();
                for (Map.Entry<MDSKey, StreamSpecification> entry : specsByKey.entrySet()) {
                    MDSKey.Splitter keyParts = entry.getKey().split();
                    // The first key part is the "stream" type marker; the namespace follows it.
                    keyParts.skipString();
                    result.put(new NamespaceId(keyParts.getString()), entry.getValue());
                }
                return result.build();
            }
        }, Exception.class);
    }

    /**
     * Builds the metadata key {@code ("stream", <namespace>, <stream name>)} for a stream.
     *
     * <p>NOTE(review): the decompiler reports this member was originally {@code private}; it is left
     * {@code public} here so the visible interface of the class is unchanged.
     *
     * @param streamId id of the stream
     * @return the key under which the stream's specification is stored
     */
    public MDSKey getKey(StreamId streamId) {
        return new MDSKey.Builder().add(TYPE_STREAM, streamId.getNamespace(), streamId.getEntityName()).build();
    }

    /**
     * Builds a {@link StreamSpecification} carrying the stream's name and the given description.
     *
     * <p>NOTE(review): the decompiler reports this member was originally {@code private}; it is left
     * {@code public} here so the visible interface of the class is unchanged.
     *
     * @param streamId id of the stream; its entity name becomes the specification name
     * @param str description to set; {@link #addStream(StreamId)} passes {@code null}
     * @return the new specification
     */
    public StreamSpecification createStreamSpec(StreamId streamId, @Nullable String str) {
        return new StreamSpecification.Builder().setName(streamId.getEntityName()).setDescription(str).create();
    }
}
