package co.cask.cdap.internal.app.runtime.distributed.remote;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.dataset.DatasetManagementException;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.datafabric.dataset.service.DatasetService;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DynamicDatasetCache;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.batch.AppWithMapReduceUsingRuntimeDatasets;
import co.cask.cdap.internal.app.store.AppMetadataStore;
import co.cask.cdap.internal.guice.AppFabricTestModule;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Service;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/remote/RemoteRuntimeDatasetTest.class */
/**
 * Unit tests for {@link RemoteRuntimeDataset}, exercising its {@code write}, {@code scan} and
 * {@code delete} operations inside transactions against services obtained from a Guice injector
 * built with {@link AppFabricTestModule}.
 */
public class RemoteRuntimeDatasetTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    private static TransactionManager txManager;
    private static DatasetService datasetService;
    private static MessagingService messagingService;
    private static DatasetFramework datasetFramework;
    private static Transactional transactional;
    private static DynamicDatasetCache datasetCache;

    /**
     * Starts the transaction manager, the dataset service and (when it is a {@link Service}) the
     * messaging service, then builds the dataset cache and the retrying {@link Transactional}
     * shared by all tests.
     *
     * @throws Exception if the temporary folder cannot be created or a service fails to start
     */
    @BeforeClass
    public static void init() throws Exception {
        CConfiguration cConf = CConfiguration.create();
        cConf.set("local.data.dir", TEMP_FOLDER.newFolder().getAbsolutePath());
        cConf.setBoolean("explore.enabled", false);

        Injector injector = Guice.createInjector(new AppFabricTestModule(cConf));

        txManager = injector.getInstance(TransactionManager.class);
        txManager.startAndWait();

        datasetService = injector.getInstance(DatasetService.class);
        datasetService.startAndWait();

        messagingService = injector.getInstance(MessagingService.class);
        if (messagingService instanceof Service) {
            // The lifecycle methods live on Service, not on MessagingService, so a cast is required.
            ((Service) messagingService).startAndWait();
        }

        datasetFramework = injector.getInstance(DatasetFramework.class);

        // The typed nulls and the explicit empty array are kept exactly as they were so that the
        // same MultiThreadDatasetCache constructor overload is selected.
        datasetCache = new MultiThreadDatasetCache(
            new SystemDatasetInstantiator(datasetFramework),
            new TransactionSystemClientAdapter(injector.getInstance(TransactionSystemClient.class)),
            NamespaceId.SYSTEM,
            ImmutableMap.of(),
            (MetricsContext) null,
            (Map) null,
            new MultiThreadTransactionAware[0]);

        transactional = Transactions.createTransactionalWithRetry(
            Transactions.createTransactional(datasetCache),
            RetryStrategies.retryOnConflict(20, 100L));
    }

    /**
     * Stops the services started by {@link #init()}, in reverse start order.
     */
    @AfterClass
    public static void finish() {
        if (messagingService instanceof Service) {
            ((Service) messagingService).stopAndWait();
        }
        datasetService.stopAndWait();
        txManager.stopAndWait();
    }

    /**
     * Invalidates the shared dataset cache and deletes the dataset instance identified by
     * {@link AppMetadataStore#APP_META_INSTANCE_ID} after every test.
     *
     * <p>NOTE(review): presumably this is the instance backing {@link RemoteRuntimeDataset}, so
     * that each test starts from an empty dataset - confirm.
     *
     * @throws IOException if the dataset framework reports an I/O failure
     * @throws DatasetManagementException if the dataset instance cannot be deleted
     */
    @After
    public void resetTest() throws IOException, DatasetManagementException {
        datasetCache.invalidate();
        datasetFramework.deleteInstance(AppMetadataStore.APP_META_INSTANCE_ID);
    }

    /**
     * Writes a single entry and verifies that a scan with limit 1 returns that same run id
     * together with equivalent program options.
     */
    @Test
    public void testReadWrite() {
        ProgramRunId runId = newSparkRunId();
        ProgramOptions options = newProgramOptions(runId, "b", "y");
        writeOptions(runId, options);

        // Block-bodied lambda with an explicit return keeps this bound to the value-returning
        // overload of Transactionals.execute.
        Map.Entry<ProgramRunId, ProgramOptions> first = Transactionals.execute(transactional, context -> {
            return RemoteRuntimeDataset.create(context, datasetFramework)
                .scan(1, (ProgramRunId) null)
                .stream()
                .findFirst();
        }).orElseThrow(IllegalStateException::new);

        Assert.assertEquals(runId, first.getKey());
        Assert.assertTrue(programOptionsEquals(options, first.getValue()));
    }

    /**
     * Writes five entries, then reads them back two at a time, feeding the last key seen into
     * the next scan, and verifies that exactly the written entries were returned.
     */
    @Test
    public void testScan() {
        Map<ProgramRunId, ProgramOptions> expected = new HashMap<>();
        for (int i = 0; i < 5; i++) {
            ProgramRunId runId = newSparkRunId();
            ProgramOptions options = newProgramOptions(runId, "b" + i, "y" + i);
            expected.put(runId, options);
            writeOptions(runId, options);
        }

        // Holds the key of the last entry returned so far; null for the first scan.
        AtomicReference<ProgramRunId> lastRunId = new AtomicReference<>();
        Map<ProgramRunId, ProgramOptions> scanned = new HashMap<>();

        boolean done = false;
        while (!done) {
            // Each iteration reads one page in its own transaction; an empty page ends the loop.
            done = Transactionals.execute(transactional, context -> {
                List<Map.Entry<ProgramRunId, ProgramOptions>> page =
                    RemoteRuntimeDataset.create(context, datasetFramework).scan(2, lastRunId.get());
                for (Map.Entry<ProgramRunId, ProgramOptions> entry : page) {
                    scanned.put(entry.getKey(), entry.getValue());
                    lastRunId.set(entry.getKey());
                }
                return page.isEmpty();
            });
        }

        Assert.assertEquals(expected.keySet(), scanned.keySet());
        for (Map.Entry<ProgramRunId, ProgramOptions> entry : expected.entrySet()) {
            ProgramOptions actual = scanned.get(entry.getKey());
            Assert.assertNotNull("Missing program options for run " + entry.getKey(), actual);
            Assert.assertTrue("Program options mismatch for run " + entry.getKey(),
                              programOptionsEquals(entry.getValue(), actual));
        }
    }

    /**
     * Writes a single entry, deletes it, and verifies that a subsequent scan returns nothing.
     */
    @Test
    public void testDelete() {
        ProgramRunId runId = newSparkRunId();
        writeOptions(runId, newProgramOptions(runId, "b", "y"));

        Transactionals.execute(transactional, context -> {
            RemoteRuntimeDataset.create(context, datasetFramework).delete(runId);
        });

        boolean empty = Transactionals.execute(transactional, context -> {
            return RemoteRuntimeDataset.create(context, datasetFramework)
                .scan(1, (ProgramRunId) null)
                .isEmpty();
        });
        Assert.assertTrue(empty);
    }

    /** Creates a run id with a freshly generated run for the Spark program "spark" of app "app". */
    private static ProgramRunId newSparkRunId() {
        return NamespaceId.DEFAULT.app("app").program(ProgramType.SPARK, "spark").run(RunIds.generate().getId());
    }

    /**
     * Builds program options for the program owning {@code runId}, with one single-entry argument
     * map keyed by {@code "a"} and another keyed by
     * {@link AppWithMapReduceUsingRuntimeDatasets.FileMapper#ONLY_KEY}, passed to the
     * {@link SimpleProgramOptions} constructor in that order.
     *
     * @param runId the run whose parent program the options are created for
     * @param firstValue value stored under key {@code "a"} in the first argument map
     * @param secondValue value stored under {@code ONLY_KEY} in the second argument map
     * @return the newly created program options
     */
    private static ProgramOptions newProgramOptions(ProgramRunId runId, String firstValue, String secondValue) {
        return new SimpleProgramOptions(
            runId.getParent(),
            new BasicArguments(Collections.singletonMap("a", firstValue)),
            new BasicArguments(Collections.singletonMap(
                AppWithMapReduceUsingRuntimeDatasets.FileMapper.ONLY_KEY, secondValue)));
    }

    /** Writes {@code options} for {@code runId} to the dataset in its own transaction. */
    private static void writeOptions(ProgramRunId runId, ProgramOptions options) {
        Transactionals.execute(transactional, context -> {
            RemoteRuntimeDataset.create(context, datasetFramework).write(runId, options);
        });
    }

    /**
     * Compares two {@link ProgramOptions} by program id, debug flag, and the map views of both
     * their arguments and their user arguments.
     *
     * @param programOptions the first options to compare; must not be null
     * @param programOptions2 the second options to compare; must not be null
     * @return {@code true} if all compared properties are equal
     */
    private static boolean programOptionsEquals(ProgramOptions programOptions, ProgramOptions programOptions2) {
        return Objects.equals(programOptions.getProgramId(), programOptions2.getProgramId())
            && programOptions.isDebug() == programOptions2.isDebug()
            && Objects.equals(programOptions.getArguments().asMap(), programOptions2.getArguments().asMap())
            && Objects.equals(programOptions.getUserArguments().asMap(),
                              programOptions2.getUserArguments().asMap());
    }
}
