package co.cask.cdap.metadata;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.metadata.MetadataScope;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.common.service.AbstractRetryableScheduledService;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.metadata.dataset.MetadataDataset;
import co.cask.cdap.data2.metadata.dataset.MetadataEntries;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.proto.id.DatasetId;
import co.cask.cdap.proto.id.NamespaceId;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metadata/MetadataMigrator.class */
class MetadataMigrator extends AbstractRetryableScheduledService {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataMigrator.class);
    private static final Logger OUTAGE_LOG = Loggers.sampling(LOG, LogSamplers.perMessage(() -> {
        return LogSamplers.limitRate(60000L);
    }));
    private final Queue<KeyValue<DatasetId, DatasetId>> datasetIds;
    private final DatasetFramework dsFramework;
    private final Transactional transactional;
    private final int batchSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MetadataMigrator(CConfiguration cConfiguration, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient) {
        super(RetryStrategies.exponentialDelay(1L, 60L, TimeUnit.SECONDS));
        this.datasetIds = new LinkedList();
        this.dsFramework = datasetFramework;
        this.batchSize = cConfiguration.getInt("metadata.upgrade.migration.batch.size");
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), transactionSystemClient, NamespaceId.SYSTEM, Collections.emptyMap(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[0])), org.apache.tephra.RetryStrategies.retryOnConflict(20, 100L));
        this.datasetIds.add(new KeyValue<>(NamespaceId.SYSTEM.dataset("system.metadata"), NamespaceId.SYSTEM.dataset("v2.system")));
        this.datasetIds.add(new KeyValue<>(NamespaceId.SYSTEM.dataset("business.metadata"), NamespaceId.SYSTEM.dataset("v2.business")));
    }

    protected void doStartUp() throws Exception {
        LOG.info("Starting Metadata Migrator Service.");
    }

    public long runTask() throws Exception {
        if (this.datasetIds.isEmpty()) {
            stop();
            return 0L;
        }
        try {
            KeyValue<DatasetId, DatasetId> peek = this.datasetIds.peek();
            if (!this.dsFramework.hasInstance((DatasetId) peek.getKey())) {
                this.datasetIds.poll();
                return 0L;
            }
            if (((Boolean) Transactionals.execute(this.transactional, datasetContext -> {
                MetadataDataset dataset = datasetContext.getDataset(((DatasetId) peek.getKey()).getDataset());
                MetadataDataset metadataDataset = getMetadataDataset(datasetContext, (DatasetId) peek.getValue());
                MetadataEntries scanFromV1Table = dataset.scanFromV1Table(this.batchSize);
                if (scanFromV1Table.getEntries().isEmpty()) {
                    return true;
                }
                metadataDataset.writeUpgradedRows(scanFromV1Table.getEntries());
                dataset.deleteRows(scanFromV1Table.getRows());
                return false;
            })).booleanValue()) {
                this.dsFramework.deleteInstance((DatasetId) peek.getKey());
                LOG.info("Migration for dataset {} is complete. This dataset is dropped.", peek.getKey());
            }
            return 0L;
        } catch (Exception e) {
            OUTAGE_LOG.error("Error occurred while migrating metadata. ", e);
            throw e;
        }
    }

    protected String getServiceName() {
        return "metadata-migrator-service";
    }

    protected void doShutdown() throws Exception {
        LOG.info("Stopping Metadata Migrator Service.");
    }

    private MetadataDataset getMetadataDataset(DatasetContext datasetContext, DatasetId datasetId) throws IOException, DatasetManagementException {
        return DatasetsUtil.getOrCreateDataset(datasetContext, this.dsFramework, datasetId, MetadataDataset.class.getName(), DatasetProperties.builder().add("scope", (datasetId.getDataset().contains("business") ? MetadataScope.USER : MetadataScope.SYSTEM).name()).build());
    }
}
