package co.cask.cdap.data2.metadata.store;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.InstanceNotFoundException;
import co.cask.cdap.api.metadata.MetadataEntity;
import co.cask.cdap.api.metadata.MetadataScope;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.metadata.MetadataRecordV2;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.common.utils.ProjectInfo;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.audit.AuditPublisher;
import co.cask.cdap.data2.audit.AuditPublishers;
import co.cask.cdap.data2.audit.payload.builder.MetadataPayloadBuilder;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DynamicDatasetCache;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.metadata.dataset.Metadata;
import co.cask.cdap.data2.metadata.dataset.MetadataChange;
import co.cask.cdap.data2.metadata.dataset.MetadataDataset;
import co.cask.cdap.data2.metadata.dataset.MetadataDatasetDefinition;
import co.cask.cdap.data2.metadata.dataset.MetadataEntry;
import co.cask.cdap.data2.metadata.dataset.SearchRequest;
import co.cask.cdap.data2.metadata.dataset.SearchResults;
import co.cask.cdap.data2.metadata.dataset.SortInfo;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.proto.audit.AuditPayload;
import co.cask.cdap.proto.audit.AuditType;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.metadata.MetadataSearchResponseV2;
import co.cask.cdap.proto.metadata.MetadataSearchResultRecordV2;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

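/**
 * {@link MetadataStore} implementation that keeps metadata in {@link MetadataDataset} instances living in the
 * system namespace, one dataset per {@link MetadataScope}. Write operations are refused while the target entity
 * still has data in the older (V1) datasets, and every mutation is reported through the audit publisher.
 */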
public class DefaultMetadataStore implements MetadataStore {
    /** Batch size handed to {@code MetadataDataset.rebuildIndexes} on every call made by {@code rebuildIndex}. */
    private static final int BATCH_SIZE = 1000;
    /** Prefix of the tag that records which CDAP version last upgraded a metadata dataset. */
    private static final String VERSION_TAG_PREFIX = "cdap.version:";
    /** Runs every dataset operation in a transaction; built in the constructor with retry-on-conflict. */
    private final Transactional transactional;
    /** Dataset framework used to check for, create, update and delete the metadata dataset instances. */
    private final DatasetFramework dsFramework;
    /** Dataset cache backing {@link #transactional}; invalidated by {@code deleteDatasets}. */
    private final DynamicDatasetCache datasetCache;
    /** Optional audit sink, injected through {@code setAuditPublisher}; stays {@code null} if never injected. */
    private AuditPublisher auditPublisher;
    /** Cached "V1 system dataset still exists" flag; starts true and is refreshed by {@code isMigrationCompleted}. */
    private transient boolean hasV1SystemDs = true;
    /** Cached "V1 business dataset still exists" flag; starts true and is refreshed by {@code isMigrationCompleted}. */
    private transient boolean hasV1BusinessDs = true;
    private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataStore.class);
    /** Shared empty property map used when an audit record carries tags only. */
    private static final Map<String, String> EMPTY_PROPERTIES = ImmutableMap.of();
    /** Shared empty tag set used when an audit record carries properties only. */
    private static final Set<String> EMPTY_TAGS = ImmutableSet.of();
    /** V1 dataset holding {@link MetadataScope#USER} metadata. */
    private static final DatasetId BUSINESS_METADATA_INSTANCE_ID = NamespaceId.SYSTEM.dataset("business.metadata");
    /** V1 dataset holding {@link MetadataScope#SYSTEM} metadata. */
    private static final DatasetId SYSTEM_METADATA_INSTANCE_ID = NamespaceId.SYSTEM.dataset("system.metadata");
    /** V2 dataset holding {@link MetadataScope#USER} metadata. */
    private static final DatasetId V2_BUSINESS_METADATA_INSTANCE_ID = NamespaceId.SYSTEM.dataset("v2.business");
    /** V2 dataset holding {@link MetadataScope#SYSTEM} metadata. */
    private static final DatasetId V2_SYSTEM_METADATA_INSTANCE_ID = NamespaceId.SYSTEM.dataset("v2.system");
    /** Tag placed on a metadata dataset's own entity to mark that it still has to be upgraded. */
    private static final String NEEDS_UPGRADE_TAG = "cdap.metadatadataset.needs_upgrade";
    /** Singleton set form of {@link #NEEDS_UPGRADE_TAG}, ready to pass to the tag add/remove methods. */
    private static final Set<String> NEEDS_UPGRADE_TAG_SET = Collections.singleton(NEEDS_UPGRADE_TAG);
    /**
     * Orders scored search hits from highest to lowest score. Uses {@link Integer#compare(int, int)} rather than
     * subtracting the scores, so the result stays correct for any pair of int values.
     */
    private static final Comparator<Map.Entry<MetadataEntity, Integer>> SEARCH_RESULT_DESC_SCORE_COMPARATOR =
        (left, right) -> Integer.compare(right.getValue(), left.getValue());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data2/metadata/store/DefaultMetadataStore$DATASET.class */
    public enum DATASET {
        V1,
        V2
    }

    @Inject
    DefaultMetadataStore(TransactionSystemClient transactionSystemClient, DatasetFramework datasetFramework) {
        this.datasetCache = new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), new TransactionSystemClientAdapter(transactionSystemClient), NamespaceId.SYSTEM, Collections.emptyMap(), null, null, new MultiThreadTransactionAware[0]);
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(this.datasetCache), RetryStrategies.retryOnConflict(20, 100L));
        this.dsFramework = datasetFramework;
    }

    /**
     * Test helper: invalidates the dataset cache and deletes both V2 metadata dataset instances. A dataset that
     * does not exist is skipped silently; any other failure propagates immediately.
     *
     * @throws Exception if the framework fails to delete an instance for a reason other than it being absent
     */
    @VisibleForTesting
    void deleteDatasets() throws Exception {
        this.datasetCache.invalidate();
        DatasetId[] v2Instances = {V2_BUSINESS_METADATA_INSTANCE_ID, V2_SYSTEM_METADATA_INSTANCE_ID};
        for (DatasetId instance : v2Instances) {
            try {
                this.dsFramework.deleteInstance(instance);
            } catch (InstanceNotFoundException ignored) {
                // Nothing to delete for this instance; carry on with the next one.
            }
        }
    }

    /**
     * Optionally injected setter for the publisher that receives metadata-change audit events.
     *
     * @param auditPublisher publisher handed to {@code AuditPublishers.publishAudit} by every mutation
     */
    @Inject(optional = true)
    public void setAuditPublisher(AuditPublisher auditPublisher) {
        this.auditPublisher = auditPublisher;
    }

    /**
     * Stores the given properties for an entity in the V2 dataset of the scope and publishes an audit event.
     * The audit additions contain only keys whose value is new or changed; the audit deletions contain the old
     * values of the keys that were overwritten.
     *
     * @param metadataScope scope whose dataset is written
     * @param metadataEntity entity the properties belong to
     * @param map property keys and values to store
     * @throws ServiceUnavailableException if the entity still has unmigrated V1 metadata
     */
    @Override
    public void setProperties(MetadataScope metadataScope, MetadataEntity metadataEntity, Map<String, String> map) {
        checkWriteAllowed(metadataScope, metadataEntity);
        MetadataChange change = execute(mds -> {
            return mds.setProperty(metadataEntity, map);
        }, metadataScope, DATASET.V2);
        MetadataRecordV2 previous = new MetadataRecordV2(
            metadataEntity, metadataScope, change.getExisting().getProperties(), change.getExisting().getTags());

        Map<String, String> previousProperties = previous.getProperties();
        ImmutableMap.Builder<String, String> additions = ImmutableMap.builder();
        ImmutableMap.Builder<String, String> deletions = ImmutableMap.builder();
        for (Map.Entry<String, String> requested : map.entrySet()) {
            String key = requested.getKey();
            String newValue = requested.getValue();
            String oldValue = previousProperties.get(key);
            if (oldValue != null && oldValue.equals(newValue)) {
                // Value is unchanged, so it is neither an addition nor a deletion.
                continue;
            }
            if (oldValue != null) {
                deletions.put(key, oldValue);
            }
            additions.put(key, newValue);
        }

        MetadataRecordV2 added = new MetadataRecordV2(metadataEntity, metadataScope, additions.build(), EMPTY_TAGS);
        MetadataRecordV2 deleted = new MetadataRecordV2(metadataEntity, metadataScope, deletions.build(), EMPTY_TAGS);
        publishAudit(previous, added, deleted);
    }

    /**
     * Checks whether the V1 dataset of the scope still holds any property or tag for the entity.
     *
     * @param metadataScope scope whose V1 dataset is inspected
     * @param metadataEntity entity to look up
     * @return {@code true} if at least one property or tag is found in V1
     */
    private boolean hasEntityInV1(MetadataScope metadataScope, MetadataEntity metadataEntity) {
        Boolean presentInV1 = execute(mds -> {
            // Tags are only read when no properties were found.
            return !mds.getProperties(metadataEntity).isEmpty() || !mds.getTags(metadataEntity).isEmpty();
        }, metadataScope, DATASET.V1);
        return presentInV1;
    }

    /**
     * Tells whether both V1 metadata datasets are gone, which is how this class recognizes a finished migration.
     * Once a V1 dataset has been seen missing, its flag stays {@code false} and the framework is no longer asked
     * about it.
     *
     * <p>The result reflects the flags <em>after</em> they were refreshed, so the call that first observes both
     * datasets missing already reports completion instead of a stale {@code false}.
     *
     * @return {@code true} if neither V1 dataset exists any more
     */
    private boolean isMigrationCompleted() {
        if (!this.hasV1SystemDs && !this.hasV1BusinessDs) {
            return true;
        }
        try {
            // Only re-check datasets that were still present the last time we looked.
            if (this.hasV1SystemDs) {
                this.hasV1SystemDs = this.dsFramework.hasInstance(getMetadataDatasetInstance(MetadataScope.SYSTEM));
            }
            if (this.hasV1BusinessDs) {
                this.hasV1BusinessDs = this.dsFramework.hasInstance(getMetadataDatasetInstance(MetadataScope.USER));
            }
        } catch (DatasetManagementException e) {
            throw Throwables.propagate(e);
        }
        return !this.hasV1SystemDs && !this.hasV1BusinessDs;
    }

    /**
     * Stores a single property for an entity in the V2 dataset of the scope and publishes an audit event whose
     * additions hold just that property. The deletions record is built with entity and scope only, even when an
     * existing value is overwritten.
     *
     * @param metadataScope scope whose dataset is written
     * @param metadataEntity entity the property belongs to
     * @param str property key
     * @param str2 property value
     * @throws ServiceUnavailableException if the entity still has unmigrated V1 metadata
     */
    @Override
    public void setProperty(MetadataScope metadataScope, MetadataEntity metadataEntity, String str, String str2) {
        checkWriteAllowed(metadataScope, metadataEntity);
        MetadataChange change = execute(mds -> {
            return mds.setProperty(metadataEntity, str, str2);
        }, metadataScope, DATASET.V2);
        MetadataRecordV2 previous = new MetadataRecordV2(
            metadataEntity, metadataScope, change.getExisting().getProperties(), change.getExisting().getTags());
        MetadataRecordV2 added =
            new MetadataRecordV2(metadataEntity, metadataScope, ImmutableMap.of(str, str2), EMPTY_TAGS);
        MetadataRecordV2 deleted = new MetadataRecordV2(metadataEntity, metadataScope);
        publishAudit(previous, added, deleted);
    }

    /**
     * Adds tags to an entity in the V2 dataset of the scope and publishes an audit event listing the requested
     * tags as additions.
     *
     * @param metadataScope scope whose dataset is written
     * @param metadataEntity entity to tag
     * @param set tags to add
     * @throws ServiceUnavailableException if the entity still has unmigrated V1 metadata
     */
    @Override
    public void addTags(MetadataScope metadataScope, MetadataEntity metadataEntity, Set<String> set) {
        checkWriteAllowed(metadataScope, metadataEntity);
        MetadataChange change = execute(mds -> {
            return mds.addTags(metadataEntity, set);
        }, metadataScope, DATASET.V2);
        MetadataRecordV2 previous = new MetadataRecordV2(
            metadataEntity, metadataScope, change.getExisting().getProperties(), change.getExisting().getTags());
        MetadataRecordV2 added =
            new MetadataRecordV2(metadataEntity, metadataScope, EMPTY_PROPERTIES, Sets.newHashSet(set));
        MetadataRecordV2 deleted = new MetadataRecordV2(metadataEntity, metadataScope);
        publishAudit(previous, added, deleted);
    }

    /**
     * Guards every mutation: while the V1 dataset of the scope still exists and still contains data for the
     * entity, the write is rejected so that it cannot race with the migration of that entity.
     *
     * @param metadataScope scope about to be written
     * @param metadataEntity entity about to be written
     * @throws ServiceUnavailableException if the entity still has metadata in the V1 dataset of the scope
     */
    private void checkWriteAllowed(MetadataScope metadataScope, MetadataEntity metadataEntity) {
        if (isMigrationCompleted()) {
            return;
        }
        boolean v1DatasetPresent =
            metadataScope.equals(MetadataScope.SYSTEM) ? this.hasV1SystemDs : this.hasV1BusinessDs;
        if (v1DatasetPresent && hasEntityInV1(metadataScope, metadataEntity)) {
            throw new ServiceUnavailableException("metadata-service", "Metadata migration is in progress. Please retry the same operation once metadata is migrated.");
        }
    }

    /**
     * Reads the metadata of an entity in both scopes.
     *
     * @param metadataEntity entity to look up
     * @return a set holding the USER record and the SYSTEM record of the entity
     */
    @Override
    public Set<MetadataRecordV2> getMetadata(MetadataEntity metadataEntity) {
        MetadataRecordV2 userRecord = getMetadata(MetadataScope.USER, metadataEntity);
        MetadataRecordV2 systemRecord = getMetadata(MetadataScope.SYSTEM, metadataEntity);
        return ImmutableSet.of(userRecord, systemRecord);
    }

    /**
     * Reads the properties and tags of an entity from the V2 dataset of one scope.
     *
     * @param metadataScope scope to read
     * @param metadataEntity entity to look up
     * @return a record combining the entity, the scope and the stored properties and tags
     */
    @Override
    public MetadataRecordV2 getMetadata(MetadataScope metadataScope, MetadataEntity metadataEntity) {
        return execute(mds -> {
            Metadata stored = mds.getMetadata(metadataEntity);
            return new MetadataRecordV2(metadataEntity, metadataScope, stored.getProperties(), stored.getTags());
        }, metadataScope, DATASET.V2);
    }

    /**
     * Reads the metadata of several entities from the V2 dataset of one scope within a single transaction.
     *
     * @param metadataScope scope to read
     * @param set entities to look up
     * @return one record for every {@code Metadata} the dataset returns for the given entities
     */
    @Override
    public Set<MetadataRecordV2> getMetadata(MetadataScope metadataScope, Set<MetadataEntity> set) {
        return execute(mds -> {
            Set<MetadataRecordV2> records = new HashSet<>(set.size());
            for (Metadata stored : mds.getMetadata(set)) {
                records.add(new MetadataRecordV2(
                    stored.getMetadataEntity(), metadataScope, stored.getProperties(), stored.getTags()));
            }
            return records;
        }, metadataScope, DATASET.V2);
    }

    /**
     * Reads the properties of an entity from both scopes and merges them into one immutable map.
     * NOTE(review): Guava's {@code ImmutableMap.Builder.build()} rejects duplicate keys, so a key present in
     * both scopes would presumably make this call fail — confirm whether that can happen.
     *
     * @param metadataEntity entity to look up
     * @return USER properties followed by SYSTEM properties
     */
    @Override
    public Map<String, String> getProperties(MetadataEntity metadataEntity) {
        ImmutableMap.Builder<String, String> merged = ImmutableMap.builder();
        merged.putAll(getProperties(MetadataScope.USER, metadataEntity));
        merged.putAll(getProperties(MetadataScope.SYSTEM, metadataEntity));
        return merged.build();
    }

    /**
     * Reads the properties of an entity from the V2 dataset of one scope.
     *
     * @param metadataScope scope to read
     * @param metadataEntity entity to look up
     * @return the properties the dataset holds for the entity
     */
    @Override
    public Map<String, String> getProperties(MetadataScope metadataScope, MetadataEntity metadataEntity) {
        Map<String, String> properties = execute(mds -> {
            return mds.getProperties(metadataEntity);
        }, metadataScope, DATASET.V2);
        return properties;
    }

    /**
     * Reads the tags of an entity from both scopes and returns their union.
     *
     * @param metadataEntity entity to look up
     * @return immutable set with the USER tags followed by the SYSTEM tags
     */
    @Override
    public Set<String> getTags(MetadataEntity metadataEntity) {
        ImmutableSet.Builder<String> union = ImmutableSet.builder();
        union.addAll(getTags(MetadataScope.USER, metadataEntity));
        union.addAll(getTags(MetadataScope.SYSTEM, metadataEntity));
        return union.build();
    }

    /**
     * Reads the tags of an entity from the V2 dataset of one scope.
     *
     * @param metadataScope scope to read
     * @param metadataEntity entity to look up
     * @return the tags the dataset holds for the entity
     */
    @Override
    public Set<String> getTags(MetadataScope metadataScope, MetadataEntity metadataEntity) {
        Set<String> tags = execute(mds -> {
            return mds.getTags(metadataEntity);
        }, metadataScope, DATASET.V2);
        return tags;
    }

    /**
     * Removes all metadata of an entity, first from the USER scope and then from the SYSTEM scope.
     *
     * @param metadataEntity entity whose metadata is removed
     */
    @Override
    public void removeMetadata(MetadataEntity metadataEntity) {
        MetadataScope[] scopesInOrder = {MetadataScope.USER, MetadataScope.SYSTEM};
        for (MetadataScope scope : scopesInOrder) {
            removeMetadata(scope, metadataEntity);
        }
    }

    /**
     * Removes all properties and tags of an entity from the V2 dataset of one scope and publishes an audit event
     * whose deletions are a copy of the previous record.
     *
     * @param metadataScope scope whose dataset is written
     * @param metadataEntity entity whose metadata is removed
     * @throws ServiceUnavailableException if the entity still has unmigrated V1 metadata
     */
    @Override
    public void removeMetadata(MetadataScope metadataScope, MetadataEntity metadataEntity) {
        checkWriteAllowed(metadataScope, metadataEntity);
        MetadataChange change = execute(mds -> {
            return mds.removeMetadata(metadataEntity);
        }, metadataScope, DATASET.V2);
        MetadataRecordV2 previous = new MetadataRecordV2(
            metadataEntity, metadataScope, change.getExisting().getProperties(), change.getExisting().getTags());
        MetadataRecordV2 added = new MetadataRecordV2(metadataEntity, metadataScope);
        MetadataRecordV2 deleted = new MetadataRecordV2(previous);
        publishAudit(previous, added, deleted);
    }

    /**
     * Removes every property of an entity from the V2 dataset of one scope and publishes an audit event whose
     * deletions hold the previous properties.
     *
     * @param metadataScope scope whose dataset is written
     * @param metadataEntity entity whose properties are removed
     * @throws ServiceUnavailableException if the entity still has unmigrated V1 metadata
     */
    @Override
    public void removeProperties(MetadataScope metadataScope, MetadataEntity metadataEntity) {
        checkWriteAllowed(metadataScope, metadataEntity);
        MetadataChange change = execute(mds -> {
            return mds.removeProperties(metadataEntity);
        }, metadataScope, DATASET.V2);
        MetadataRecordV2 previous = new MetadataRecordV2(
            metadataEntity, metadataScope, change.getExisting().getProperties(), change.getExisting().getTags());
        MetadataRecordV2 added = new MetadataRecordV2(metadataEntity, metadataScope);
        MetadataRecordV2 deleted =
            new MetadataRecordV2(metadataEntity, metadataScope, previous.getProperties(), EMPTY_TAGS);
        publishAudit(previous, added, deleted);
    }

    /**
     * Removes the named properties of an entity from the V2 dataset of one scope and publishes an audit event
     * whose deletions are the properties the dataset reports as deleted.
     *
     * @param metadataScope scope whose dataset is written
     * @param metadataEntity entity whose properties are removed
     * @param set keys of the properties to remove
     * @throws ServiceUnavailableException if the entity still has unmigrated V1 metadata
     */
    @Override
    public void removeProperties(MetadataScope metadataScope, MetadataEntity metadataEntity, Set<String> set) {
        checkWriteAllowed(metadataScope, metadataEntity);
        // Captures the deleted properties from inside the transaction for use in the audit event below.
        AtomicReference<Map<String, String>> deletedProperties = new AtomicReference<>();
        MetadataChange change = execute(mds -> {
            MetadataChange removal = mds.removeProperties(metadataEntity, set);
            deletedProperties.set(removal.getDeletedProperties());
            return removal;
        }, metadataScope, DATASET.V2);
        MetadataRecordV2 previous = new MetadataRecordV2(
            metadataEntity, metadataScope, change.getExisting().getProperties(), change.getExisting().getTags());
        MetadataRecordV2 added = new MetadataRecordV2(metadataEntity, metadataScope);
        MetadataRecordV2 deleted =
            new MetadataRecordV2(metadataEntity, metadataScope, deletedProperties.get(), EMPTY_TAGS);
        publishAudit(previous, added, deleted);
    }

    /**
     * Removes every tag of an entity from the V2 dataset of one scope and publishes an audit event whose
     * deletions hold the previous tags.
     *
     * @param metadataScope scope whose dataset is written
     * @param metadataEntity entity whose tags are removed
     * @throws ServiceUnavailableException if the entity still has unmigrated V1 metadata
     */
    @Override
    public void removeTags(MetadataScope metadataScope, MetadataEntity metadataEntity) {
        checkWriteAllowed(metadataScope, metadataEntity);
        MetadataChange change = execute(mds -> {
            return mds.removeTags(metadataEntity);
        }, metadataScope, DATASET.V2);
        MetadataRecordV2 previous = new MetadataRecordV2(
            metadataEntity, metadataScope, change.getExisting().getProperties(), change.getExisting().getTags());
        MetadataRecordV2 added = new MetadataRecordV2(metadataEntity, metadataScope);
        MetadataRecordV2 deleted =
            new MetadataRecordV2(metadataEntity, metadataScope, EMPTY_PROPERTIES, previous.getTags());
        publishAudit(previous, added, deleted);
    }

    /**
     * Removes the given tags of an entity from the V2 dataset of one scope and publishes an audit event whose
     * deletions list the requested tags.
     *
     * @param metadataScope scope whose dataset is written
     * @param metadataEntity entity whose tags are removed
     * @param set tags to remove
     * @throws ServiceUnavailableException if the entity still has unmigrated V1 metadata
     */
    @Override
    public void removeTags(MetadataScope metadataScope, MetadataEntity metadataEntity, Set<String> set) {
        checkWriteAllowed(metadataScope, metadataEntity);
        MetadataChange change = execute(mds -> {
            return mds.removeTags(metadataEntity, set);
        }, metadataScope, DATASET.V2);
        MetadataRecordV2 previous = new MetadataRecordV2(
            metadataEntity, metadataScope, change.getExisting().getProperties(), change.getExisting().getTags());
        MetadataRecordV2 added = new MetadataRecordV2(metadataEntity, metadataScope);
        MetadataRecordV2 deleted =
            new MetadataRecordV2(metadataEntity, metadataScope, EMPTY_PROPERTIES, Sets.newHashSet(set));
        publishAudit(previous, added, deleted);
    }

    /**
     * Runs a metadata search. All scopes are searched, except for a match-all query ({@code "*"}) with a
     * non-default sort, which is restricted to the SYSTEM scope. A match-all query with the default sort still
     * searches every scope but logs a warning.
     *
     * @param searchRequest the search to run
     * @return the search response for the selected scopes
     */
    @Override
    public MetadataSearchResponseV2 search(SearchRequest searchRequest) {
        Set<MetadataScope> scopes = EnumSet.allOf(MetadataScope.class);
        boolean matchAll = "*".equals(searchRequest.getQuery());
        boolean matchAllWithDefaultSort = matchAll && SortInfo.DEFAULT.equals(searchRequest.getSortInfo());
        if (matchAllWithDefaultSort) {
            LOG.warn("Attempt to search through all indexes. This query can have an adverse effect on performance and is not recommended for production use. It is only meant to be used for administrative purposes such as upgrade. To improve the performance of such queries, please specify sort parameters as well.");
        } else if (matchAll) {
            scopes = EnumSet.of(MetadataScope.SYSTEM);
        }
        return search(scopes, searchRequest);
    }

    /**
     * Searches the V2 dataset of every given scope, orders the combined hits, cuts out the requested page and
     * attaches the SYSTEM and USER metadata of each entity on that page.
     *
     * <p>The page end is computed in {@code long} arithmetic: {@code offset + limit} can exceed
     * {@link Integer#MAX_VALUE} (for example an "unlimited" limit combined with a non-zero offset), and an int
     * sum would wrap to a negative index and make {@code subList} throw.
     *
     * @param set scopes to search
     * @param searchRequest the search to run; supplies query, offset, limit and sort information
     * @return the response describing the requested page; {@code total} is the number of distinct entities found
     */
    private MetadataSearchResponseV2 search(Set<MetadataScope> set, SearchRequest searchRequest) {
        List<MetadataEntry> results = new ArrayList<>();
        List<String> cursors = new ArrayList<>();
        for (MetadataScope scope : set) {
            SearchResults searchResults = execute(mds -> {
                return mds.search(searchRequest);
            }, scope, DATASET.V2);
            results.addAll(searchResults.getResults());
            cursors.addAll(searchResults.getCursors());
        }

        int offset = searchRequest.getOffset();
        int limit = searchRequest.getLimit();
        SortInfo sortInfo = searchRequest.getSortInfo();
        Set<MetadataEntity> sortedEntities = getSortedEntities(results, sortInfo);
        int total = sortedEntities.size();

        // Clamp both page bounds to the number of entities; widen before adding to avoid int overflow.
        int startIndex = Math.min(offset, total);
        int endIndex = (int) Math.min((long) offset + limit, (long) total);
        Set<MetadataEntity> page =
            new LinkedHashSet<>(ImmutableList.copyOf(sortedEntities).subList(startIndex, endIndex));

        Map<MetadataEntity, Metadata> systemMetadata = fetchMetadata(page, MetadataScope.SYSTEM);
        Map<MetadataEntity, Metadata> userMetadata = fetchMetadata(page, MetadataScope.USER);
        return new MetadataSearchResponseV2(sortInfo.getSortBy() + " " + sortInfo.getSortOrder(), offset, limit,
            searchRequest.getNumCursors(), total, addMetadataToEntities(page, systemMetadata, userMetadata), cursors,
            searchRequest.shouldShowHidden(), searchRequest.getEntityScopes());
    }

    /**
     * Reduces raw search hits to an ordered set of distinct entities. For {@code WEIGHTED} sort order an entity's
     * score is the number of hits that refer to it and entities are ordered by descending score; for any other
     * order the entities keep the order in which they first appear in the hit list.
     *
     * @param list raw hits, possibly containing the same entity several times
     * @param sortInfo sort information of the request
     * @return the distinct entities in result order
     */
    private Set<MetadataEntity> getSortedEntities(List<MetadataEntry> list, SortInfo sortInfo) {
        if (SortInfo.SortOrder.WEIGHTED == sortInfo.getSortOrder()) {
            Map<MetadataEntity, Integer> hitCounts = new HashMap<>();
            for (MetadataEntry hit : list) {
                hitCounts.merge(hit.getMetadataEntity(), 1, Integer::sum);
            }
            List<Map.Entry<MetadataEntity, Integer>> ranked = new ArrayList<>(hitCounts.entrySet());
            ranked.sort(SEARCH_RESULT_DESC_SCORE_COMPARATOR);
            Set<MetadataEntity> byScore = new LinkedHashSet<>(ranked.size());
            for (Map.Entry<MetadataEntity, Integer> scored : ranked) {
                byScore.add(scored.getKey());
            }
            return byScore;
        }

        Set<MetadataEntity> inHitOrder = new LinkedHashSet<>(list.size());
        for (MetadataEntry hit : list) {
            inHitOrder.add(hit.getMetadataEntity());
        }
        return inHitOrder;
    }

    /**
     * Loads the metadata of the given entities from the V2 dataset of one scope and indexes it by entity.
     *
     * @param set entities to load
     * @param metadataScope scope to read
     * @return the loaded metadata keyed by the entity it belongs to
     */
    private Map<MetadataEntity, Metadata> fetchMetadata(Set<MetadataEntity> set, MetadataScope metadataScope) {
        Set<Metadata> loaded = execute(mds -> {
            return mds.getMetadata(set);
        }, metadataScope, DATASET.V2);
        Map<MetadataEntity, Metadata> byEntity = new HashMap<>();
        for (Metadata entry : loaded) {
            byEntity.put(entry.getMetadataEntity(), entry);
        }
        return byEntity;
    }

    /**
     * Builds one search result record per entity, attaching whatever SYSTEM and USER metadata was found for it.
     * A scope with no entry in its lookup map is left out of that entity's record.
     *
     * @param set entities of the result page, in result order
     * @param map SYSTEM-scope metadata keyed by entity
     * @param map2 USER-scope metadata keyed by entity
     * @return the result records in the iteration order of {@code set}
     */
    private Set<MetadataSearchResultRecordV2> addMetadataToEntities(Set<MetadataEntity> set, Map<MetadataEntity, Metadata> map, Map<MetadataEntity, Metadata> map2) {
        Set<MetadataSearchResultRecordV2> records = new LinkedHashSet<>();
        for (MetadataEntity entity : set) {
            ImmutableMap.Builder<MetadataScope, co.cask.cdap.api.metadata.Metadata> perScope = ImmutableMap.builder();
            Metadata systemMetadata = map.get(entity);
            if (systemMetadata != null) {
                perScope.put(MetadataScope.SYSTEM, new co.cask.cdap.api.metadata.Metadata(
                    systemMetadata.getProperties(), systemMetadata.getTags()));
            }
            Metadata userMetadata = map2.get(entity);
            if (userMetadata != null) {
                perScope.put(MetadataScope.USER, new co.cask.cdap.api.metadata.Metadata(
                    userMetadata.getProperties(), userMetadata.getTags()));
            }
            records.add(new MetadataSearchResultRecordV2(entity, perScope.build()));
        }
        return records;
    }

    /**
     * Asks the V2 dataset of the scope for the metadata snapshot of the given entities before a point in time
     * and converts each snapshot into a record.
     *
     * @param metadataScope scope to read
     * @param set entities to look up
     * @param j time bound passed through to {@code MetadataDataset.getSnapshotBeforeTime}
     * @return immutable set with one record per snapshot returned by the dataset
     */
    @Override
    public Set<MetadataRecordV2> getSnapshotBeforeTime(MetadataScope metadataScope, Set<MetadataEntity> set, long j) {
        Set<Metadata> snapshots = execute(mds -> {
            return mds.getSnapshotBeforeTime(set, j);
        }, metadataScope, DATASET.V2);
        ImmutableSet.Builder<MetadataRecordV2> records = ImmutableSet.builder();
        for (Metadata snapshot : snapshots) {
            records.add(new MetadataRecordV2(
                snapshot.getEntityId(), metadataScope, snapshot.getProperties(), snapshot.getTags()));
        }
        return records.build();
    }

    /**
     * Rebuilds the indexes of the V2 dataset of a scope batch by batch. Each batch returns the position from
     * which the next batch continues; a {@code null} position ends the loop.
     *
     * @param metadataScope scope whose indexes are rebuilt
     * @param retryStrategy strategy applied to every batch
     */
    @Override
    public void rebuildIndexes(MetadataScope metadataScope, RetryStrategy retryStrategy) {
        byte[] nextStart = rebuildIndexesWithRetries(metadataScope, null, retryStrategy);
        while (nextStart != null) {
            LOG.debug("Completed a batch for rebuilding {} metadata indexes.", metadataScope);
            nextStart = rebuildIndexesWithRetries(metadataScope, nextStart, retryStrategy);
        }
    }

    /**
     * Rebuilds one batch of indexes, retrying according to the given strategy.
     *
     * @param metadataScope scope whose indexes are rebuilt
     * @param bArr position to start the batch from, or {@code null} for the first batch
     * @param retryStrategy strategy handed to {@code Retries.callWithRetries}
     * @return the position for the next batch, or {@code null} when nothing is left
     * @throws RuntimeException wrapping whatever failure remains after the retries
     */
    private byte[] rebuildIndexesWithRetries(MetadataScope metadataScope, byte[] bArr, RetryStrategy retryStrategy) {
        byte[] nextStart;
        try {
            nextStart = Retries.callWithRetries(() -> {
                return rebuildIndex(bArr, metadataScope);
            }, retryStrategy);
        } catch (Exception e) {
            LOG.error("Failed to reIndex while Upgrading Metadata Dataset.", e);
            throw new RuntimeException(e);
        }
        return nextStart;
    }

    /**
     * Publishes a {@code METADATA_CHANGE} audit event for the entity of the previous record.
     *
     * @param metadataRecordV2 metadata as it was before the change
     * @param metadataRecordV22 metadata added by the change
     * @param metadataRecordV23 metadata deleted by the change
     */
    private void publishAudit(MetadataRecordV2 metadataRecordV2, MetadataRecordV2 metadataRecordV22, MetadataRecordV2 metadataRecordV23) {
        MetadataPayloadBuilder payloadBuilder = new MetadataPayloadBuilder();
        payloadBuilder.addPrevious(metadataRecordV2);
        payloadBuilder.addAdditions(metadataRecordV22);
        payloadBuilder.addDeletions(metadataRecordV23);
        AuditPayload payload = payloadBuilder.build();
        AuditPublishers.publishAudit(
            this.auditPublisher, metadataRecordV2.getMetadataEntity(), AuditType.METADATA_CHANGE, payload);
    }

    /**
     * Runs a value-returning operation against a metadata dataset inside a transaction.
     *
     * @param function operation to apply to the dataset
     * @param metadataScope scope whose dataset is used
     * @param dataset dataset generation (V1 or V2) to operate on
     * @param <T> type of the operation's result
     * @return whatever the operation returns
     */
    private <T> T execute(TransactionExecutor.Function<MetadataDataset, T> function, MetadataScope metadataScope, DATASET dataset) {
        return Transactionals.execute(this.transactional, context -> {
            MetadataDataset target = getMetadataDataset(context, this.dsFramework, metadataScope, dataset);
            return function.apply(target);
        });
    }

    /**
     * Runs an operation without a result against a metadata dataset inside a transaction.
     *
     * @param procedure operation to apply to the dataset
     * @param metadataScope scope whose dataset is used
     * @param dataset dataset generation (V1 or V2) to operate on
     */
    private void execute(TransactionExecutor.Procedure<MetadataDataset> procedure, MetadataScope metadataScope, DATASET dataset) {
        Transactionals.execute(this.transactional, context -> {
            MetadataDataset target = getMetadataDataset(context, this.dsFramework, metadataScope, dataset);
            procedure.apply(target);
        });
    }

    /**
     * Rebuilds one batch of at most {@code BATCH_SIZE} entries of the V2 dataset indexes in a single transaction.
     *
     * @param bArr position to start from, or {@code null} for the first batch
     * @param metadataScope scope whose indexes are rebuilt
     * @return the value returned by {@code MetadataDataset.rebuildIndexes}, used as the next start position
     */
    private byte[] rebuildIndex(byte[] bArr, MetadataScope metadataScope) {
        byte[] nextStart = execute(mds -> {
            return mds.rebuildIndexes(bArr, BATCH_SIZE);
        }, metadataScope, DATASET.V2);
        return nextStart;
    }

    /**
     * Asks the V2 dataset of the scope to remove null or empty tags stored for the given dataset's own entity.
     *
     * @param datasetId dataset whose metadata entity is cleaned up
     * @param metadataScope scope whose dataset is written
     */
    private void removeNullOrEmptyTags(DatasetId datasetId, MetadataScope metadataScope) {
        execute(mds -> {
            MetadataEntity datasetEntity = datasetId.toMetadataEntity();
            mds.removeNullOrEmptyTags(datasetEntity);
        }, metadataScope, DATASET.V2);
    }

    /**
     * Ensures the metadata dataset returned by {@code getMetadataDatasetInstance} exists for the scope. A missing
     * dataset is created and immediately marked as upgraded; an existing one is updated and cleaned of null or
     * empty tags, but only if {@link #isUpgradeRequired} says so.
     *
     * @param metadataScope scope whose dataset is created or upgraded
     * @throws DatasetManagementException if the dataset framework fails
     * @throws IOException if creating the dataset fails with an I/O error
     */
    @Override
    public void createOrUpgrade(MetadataScope metadataScope) throws DatasetManagementException, IOException {
        DatasetId instance = getMetadataDatasetInstance(metadataScope);
        if (this.dsFramework.hasInstance(instance)) {
            if (isUpgradeRequired(metadataScope)) {
                DatasetProperties scopedProperties = DatasetProperties.builder()
                    .add(MetadataDatasetDefinition.SCOPE_KEY, metadataScope.name())
                    .build();
                this.dsFramework.updateInstance(instance, scopedProperties);
                removeNullOrEmptyTags(instance, metadataScope);
            }
            return;
        }
        DatasetProperties scopedProperties = DatasetProperties.builder()
            .add(MetadataDatasetDefinition.SCOPE_KEY, metadataScope.name())
            .build();
        DatasetsUtil.createIfNotExists(this.dsFramework, instance, MetadataDataset.class.getName(), scopedProperties);
        markUpgradeComplete(metadataScope);
    }

    /**
     * Maps a scope to the id of its V1 metadata dataset.
     *
     * @param metadataScope scope to map
     * @return the business dataset id for {@code USER}, the system dataset id for anything else
     */
    private static DatasetId getMetadataDatasetInstance(MetadataScope metadataScope) {
        if (MetadataScope.USER == metadataScope) {
            return BUSINESS_METADATA_INSTANCE_ID;
        }
        return SYSTEM_METADATA_INSTANCE_ID;
    }

    /**
     * Returns the V2 metadata dataset of a scope; delegates to the four-argument overload with
     * {@code DATASET.V2}, which gets or creates the dataset.
     *
     * @param datasetContext context used to obtain the dataset
     * @param datasetFramework framework used to create the dataset if needed
     * @param metadataScope scope whose dataset is requested
     * @return the V2 metadata dataset of the scope
     */
    public static MetadataDataset getMetadataDataset(DatasetContext datasetContext, DatasetFramework datasetFramework, MetadataScope metadataScope) {
        return getMetadataDataset(datasetContext, datasetFramework, metadataScope, DATASET.V2);
    }

    /**
     * Returns the metadata dataset of a scope for the requested generation. The V1 dataset is only looked up
     * through the context; the V2 dataset is created with the scope as a property when it does not exist yet.
     *
     * @param datasetContext context used to obtain the dataset
     * @param datasetFramework framework used to create the V2 dataset if needed
     * @param metadataScope scope whose dataset is requested
     * @param dataset dataset generation to return
     * @return the requested metadata dataset
     */
    private static MetadataDataset getMetadataDataset(DatasetContext datasetContext, DatasetFramework datasetFramework, MetadataScope metadataScope, DATASET dataset) {
        try {
            if (dataset.equals(DATASET.V1)) {
                DatasetId v1Instance = getMetadataDatasetInstance(metadataScope);
                return datasetContext.getDataset(v1Instance.getNamespace(), v1Instance.getDataset());
            }
            DatasetId v2Instance = getV2MetadataDatasetInstance(metadataScope);
            DatasetProperties scopedProperties = DatasetProperties.builder()
                .add(MetadataDatasetDefinition.SCOPE_KEY, metadataScope.name())
                .build();
            return DatasetsUtil.getOrCreateDataset(
                datasetContext, datasetFramework, v2Instance, MetadataDataset.class.getName(), scopedProperties);
        } catch (DatasetManagementException | IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Maps a scope to the id of its V2 metadata dataset.
     *
     * @param metadataScope scope to map
     * @return the V2 business dataset id for {@code USER}, the V2 system dataset id for anything else
     */
    private static DatasetId getV2MetadataDatasetInstance(MetadataScope metadataScope) {
        if (MetadataScope.USER == metadataScope) {
            return V2_BUSINESS_METADATA_INSTANCE_ID;
        }
        return V2_SYSTEM_METADATA_INSTANCE_ID;
    }

    /**
     * Adds the two V1 metadata dataset instances (business first, then system) with empty properties.
     *
     * @param datasetFramework framework to add the instances to
     * @throws IOException if adding an instance fails with an I/O error
     * @throws DatasetManagementException if the framework rejects an instance
     */
    public static void setupDatasets(DatasetFramework datasetFramework) throws IOException, DatasetManagementException {
        String datasetType = MetadataDataset.class.getName();
        datasetFramework.addInstance(datasetType, BUSINESS_METADATA_INSTANCE_ID, DatasetProperties.EMPTY);
        datasetFramework.addInstance(datasetType, SYSTEM_METADATA_INSTANCE_ID, DatasetProperties.EMPTY);
    }

    /**
     * Adds the two V2 metadata dataset instances (business first, then system) with empty properties.
     *
     * @param datasetFramework framework to add the instances to
     * @throws IOException if adding an instance fails with an I/O error
     * @throws DatasetManagementException if the framework rejects an instance
     */
    public static void setupV2Datasets(DatasetFramework datasetFramework) throws IOException, DatasetManagementException {
        String datasetType = MetadataDataset.class.getName();
        datasetFramework.addInstance(datasetType, V2_BUSINESS_METADATA_INSTANCE_ID, DatasetProperties.EMPTY);
        datasetFramework.addInstance(datasetType, V2_SYSTEM_METADATA_INSTANCE_ID, DatasetProperties.EMPTY);
    }

    /**
     * Records that the metadata dataset of a scope is upgraded: tags the dataset's own entity with the current
     * CDAP version and removes the needs-upgrade tag.
     *
     * @param metadataScope scope whose dataset is marked as upgraded
     */
    @Override
    public void markUpgradeComplete(MetadataScope metadataScope) {
        DatasetId instance = getMetadataDatasetInstance(metadataScope);
        String currentVersion = ProjectInfo.getVersion().toString();
        LOG.info("Add Upgrade tag with version {} to {}", currentVersion, instance);
        addTags(metadataScope, instance.toMetadataEntity(), getTagWithVersion(currentVersion));
        removeTags(metadataScope, instance.toMetadataEntity(), NEEDS_UPGRADE_TAG_SET);
    }

    /**
     * Decides from the tags on the metadata dataset's own entity whether the dataset must be upgraded.
     * An upgrade is required when the needs-upgrade tag is present, when a version tag names a version other
     * than the current one, or when there is no version tag at all. In the last two cases the needs-upgrade tag
     * is added as a side effect (and the outdated version tag removed), so later calls take the first branch.
     *
     * @param metadataScope scope whose dataset is checked
     * @return {@code true} if the dataset has to be upgraded
     */
    @Override
    public boolean isUpgradeRequired(MetadataScope metadataScope) {
        DatasetId instance = getMetadataDatasetInstance(metadataScope);
        Set<String> datasetTags = getTags(metadataScope, instance.toMetadataEntity());
        if (datasetTags.contains(NEEDS_UPGRADE_TAG)) {
            LOG.debug("NEEDS_UPGRADE_TAG found on Metadata Dataset. Upgrade is required.");
            return true;
        }

        boolean versionTagSeen = false;
        for (String tag : datasetTags) {
            if (!tag.startsWith(VERSION_TAG_PREFIX)) {
                continue;
            }
            versionTagSeen = true;
            boolean upToDate = getVersionFromVersionTag(tag).equals(ProjectInfo.getVersion().toString());
            if (upToDate) {
                continue;
            }
            LOG.debug("Metadata Dataset version mismatch. Needs Upgrade");
            removeTags(metadataScope, instance.toMetadataEntity(), Collections.singleton(tag));
            addTags(metadataScope, instance.toMetadataEntity(), NEEDS_UPGRADE_TAG_SET);
            return true;
        }

        if (!versionTagSeen) {
            // No version recorded at all: treat the dataset as never upgraded.
            addTags(metadataScope, instance.toMetadataEntity(), NEEDS_UPGRADE_TAG_SET);
            return true;
        }
        return false;
    }

    /**
     * Extracts the version from a version tag by dropping the leading {@code VERSION_TAG_PREFIX}.
     *
     * @param str a tag that starts with the version prefix
     * @return the part of the tag after the prefix
     */
    private String getVersionFromVersionTag(String str) {
        int prefixLength = VERSION_TAG_PREFIX.length();
        return str.substring(prefixLength);
    }

    /**
     * Builds the single-element tag set that records the given version.
     *
     * @param str version to record
     * @return a singleton set holding {@code VERSION_TAG_PREFIX} followed by the version
     */
    private Set<String> getTagWithVersion(String str) {
        String versionTag = VERSION_TAG_PREFIX + str;
        return Collections.singleton(versionTag);
    }
}
