package co.cask.cdap.internal.app.runtime.worker;

import co.cask.cdap.AppWithWorker;
import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DynamicDatasetCache;
import co.cask.cdap.data2.dataset2.SingleThreadDatasetCache;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.internal.AppFabricTestHelper;
import co.cask.cdap.internal.DefaultId;
import co.cask.cdap.internal.TempFolder;
import co.cask.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.proto.DatasetSpecificationSummary;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.test.SlowTests;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.common.Threads;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({SlowTests.class})
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/worker/WorkerProgramRunnerTest.class */
public class WorkerProgramRunnerTest {
    /** Injector created in {@link #beforeClass()}; source of all services used by this test. */
    private static Injector injector;

    /** Factory used to build transaction executors around {@link #datasetCache}. */
    private static TransactionExecutorFactory txExecutorFactory;

    /** Transaction manager; started in {@link #beforeClass()} and stopped in {@link #afterClass()}. */
    private static TransactionManager txService;

    /** Dataset framework; also used in {@link #after()} to delete dataset instances between tests. */
    private static DatasetFramework dsFramework;

    /** Dataset cache through which the test reads the datasets written by the worker. */
    private static DynamicDatasetCache datasetCache;

    /** Metric store queried by the test to verify emitted dataset metrics. */
    private static MetricStore metricStore;

    /** Folder from which the "data" directory used as {@code local.data.dir} is created. */
    private static final TempFolder TEMP_FOLDER = new TempFolder();

    /**
     * Controllers of programs started through {@code startProgram} that have not yet been passed
     * through {@code stopProgram}; {@link #after()} stops whatever is left in here.
     */
    private static Collection<ProgramController> runningPrograms = new HashSet<ProgramController>();

    /** JUnit-managed temporary folder backing {@code TEMP_FOLDER_SUPPLIER}. */
    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();
    /**
     * Supplies a new folder created under {@link #tmpFolder} on every call.
     * An {@link IOException} raised while creating the folder is rethrown unchecked via
     * {@link Throwables#propagate(Throwable)}.
     */
    private static final Supplier<File> TEMP_FOLDER_SUPPLIER = new Supplier<File>() {
        // The method must be named get() to implement Supplier; a differently named method
        // would leave the anonymous class without an implementation of the interface method.
        @Override
        public File get() {
            try {
                return tmpFolder.newFolder();
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    };

    /**
     * One-time setup: builds the configuration, creates the injector, looks up the services the
     * test needs, builds the dataset cache and finally starts the transaction manager.
     */
    @BeforeClass
    public static void beforeClass() {
        CConfiguration cConf = CConfiguration.create();
        cConf.set("local.data.dir", TEMP_FOLDER.newFolder("data").getAbsolutePath());
        // NOTE(review): units of these two settings are not visible here — confirm against the config docs.
        cConf.setInt("data.tx.timeout", 1);
        cConf.setInt("data.tx.cleanup.interval", 2);

        injector = AppFabricTestHelper.getInjector(cConf);
        txService = injector.getInstance(TransactionManager.class);
        txExecutorFactory = injector.getInstance(TransactionExecutorFactory.class);
        dsFramework = injector.getInstance(DatasetFramework.class);

        // Build the cache from its parts; arguments are evaluated in the same order as a single
        // nested constructor expression would evaluate them.
        SystemDatasetInstantiator instantiator =
            new SystemDatasetInstantiator(dsFramework, WorkerProgramRunnerTest.class.getClassLoader(), (Iterable) null);
        TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
        datasetCache = new SingleThreadDatasetCache(
            instantiator,
            txClient,
            NamespaceId.DEFAULT,
            DatasetDefinition.NO_ARGUMENTS,
            (MetricsContext) null,
            (Map) null);

        metricStore = injector.getInstance(MetricStore.class);
        txService.startAndWait();
    }

    /**
     * One-time teardown: stops the transaction manager that {@link #beforeClass()} started and
     * waits for it to finish stopping.
     *
     * @throws Exception declared by the signature; propagated from the stop call if it fails
     */
    @AfterClass
    public static void afterClass() throws Exception {
        txService.stopAndWait();
    }

    /**
     * Per-test cleanup: stops every program that is still registered in {@code runningPrograms}
     * and then deletes all dataset instances listed for {@code DefaultId.NAMESPACE}.
     *
     * @throws Throwable if stopping a program reports an error or deleting a dataset fails
     */
    @After
    public void after() throws Throwable {
        // stopProgram() removes the controller from runningPrograms, so iterate over a snapshot;
        // iterating the live HashSet would fail with ConcurrentModificationException as soon as
        // more than one program is still registered.
        ProgramController[] stillRunning = runningPrograms.toArray(new ProgramController[0]);
        for (ProgramController controller : stillRunning) {
            stopProgram(controller);
        }

        // Remove the datasets created by the test so the next test starts clean.
        for (Object summary : dsFramework.getInstances(DefaultId.NAMESPACE)) {
            String datasetName = ((DatasetSpecificationSummary) summary).getName();
            dsFramework.deleteInstance(DefaultId.NAMESPACE.dataset(datasetName));
        }
    }

    /**
     * Deploys {@link AppWithWorker}, runs its {@code TableWriter} worker and verifies both the
     * data it writes and the dataset metrics it emits.
     *
     * <ol>
     *   <li>Start the worker and wait (up to 5 seconds) until the {@code RUN} key of the worker's
     *       dataset reads back as {@code RUN}.</li>
     *   <li>Stop the worker, then assert in a transaction that the {@code RUN},
     *       {@code INITIALIZE} and {@code STOP} keys each hold their own name as value.</li>
     *   <li>Poll the metric store until the summed {@code system.dataset.store.ops} metric for
     *       this worker and dataset equals 3.</li>
     * </ol>
     *
     * @throws Throwable if starting/stopping the program fails, a wait times out or an assertion fails
     */
    @Test
    public void testWorkerDatasetWithMetrics() throws Throwable {
        ApplicationWithPrograms app =
            AppFabricTestHelper.deployApplicationWithManager(AppWithWorker.class, TEMP_FOLDER_SUPPLIER);
        ProgramController controller = startProgram(app, AppWithWorker.TableWriter.class);

        // Wait for the worker's run() to have written its marker.
        final TransactionExecutor executor = txExecutorFactory.createExecutor(datasetCache);
        Tasks.waitFor(AppWithWorker.RUN, new Callable<String>() {
            @Override
            public String call() throws Exception {
                return (String) executor.execute(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        // Bind the dataset to its concrete type before calling read(); chaining
                        // read() directly onto getDataset() leaves the dataset type unresolved.
                        KeyValueTable table = datasetCache.getDataset(AppWithWorker.DATASET);
                        return Bytes.toString(table.read(AppWithWorker.RUN));
                    }
                });
            }
        }, 5L, TimeUnit.SECONDS);

        stopProgram(controller);

        // After the stop, all three markers must be present.
        txExecutorFactory.createExecutor(datasetCache.getTransactionAwares()).execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                KeyValueTable table = datasetCache.getDataset(AppWithWorker.DATASET);
                Assert.assertEquals(AppWithWorker.RUN, Bytes.toString(table.read(AppWithWorker.RUN)));
                Assert.assertEquals(AppWithWorker.INITIALIZE, Bytes.toString(table.read(AppWithWorker.INITIALIZE)));
                Assert.assertEquals(AppWithWorker.STOP, Bytes.toString(table.read(AppWithWorker.STOP)));
            }
        });

        // Metrics may arrive with a delay, so poll until the expected total of 3 is visible.
        // NOTE(review): the trailing (50, MILLISECONDS) pair is presumably the poll interval — confirm in Tasks.waitFor.
        Tasks.waitFor(3L, new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                Collection<?> series = metricStore.query(new MetricDataQuery(
                    0L,
                    System.currentTimeMillis() / 1000,
                    Integer.MAX_VALUE,
                    "system.dataset.store.ops",
                    AggregationFunction.SUM,
                    ImmutableMap.of("ns", DefaultId.NAMESPACE.getEntityName(), "app", AppWithWorker.NAME, "wrk", AppWithWorker.WORKER, "ds", AppWithWorker.DATASET),
                    Collections.emptyList()));
                if (series.isEmpty()) {
                    // Nothing recorded yet; report zero so the wait keeps polling.
                    return 0L;
                }
                Assert.assertEquals(1L, series.size());
                MetricTimeSeries timeSeries = (MetricTimeSeries) series.iterator().next();
                Assert.assertEquals(1L, timeSeries.getTimeValues().size());
                return Long.valueOf(((TimeValue) timeSeries.getTimeValues().get(0)).getValue());
            }
        }, 5L, TimeUnit.SECONDS, 50L, TimeUnit.MILLISECONDS);
    }

    /**
     * Submits the program identified by {@code cls} from the given application, registers its
     * controller in {@code runningPrograms} and waits (up to 30 seconds) for it to report
     * {@link ProgramController.State#ALIVE}.
     *
     * <p>If the controller reports an error, or reports that it was killed, while waiting, the
     * corresponding throwable is rethrown from the polling callable.
     *
     * @param applicationWithPrograms the deployed application containing the program
     * @param cls the program class; its fully qualified name selects the program to submit
     * @return the controller of the started program
     * @throws Throwable if submission fails, the program fails to start or the wait does not succeed
     */
    private ProgramController startProgram(ApplicationWithPrograms applicationWithPrograms, Class<?> cls) throws Throwable {
        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        final ProgramController controller =
            AppFabricTestHelper.submit(applicationWithPrograms, cls.getName(), new BasicArguments(), TEMP_FOLDER_SUPPLIER);
        runningPrograms.add(controller);

        // Capture abnormal terminations so the wait below can surface them.
        controller.addListener(new AbstractListener() {
            public void error(Throwable th) {
                failure.set(th);
            }

            public void killed() {
                failure.set(new RuntimeException("Killed"));
            }
        }, Threads.SAME_THREAD_EXECUTOR);

        Tasks.waitFor(ProgramController.State.ALIVE, new Callable<ProgramController.State>() {
            @Override
            public ProgramController.State call() throws Exception {
                Throwable cause = failure.get();
                if (cause != null) {
                    // Rethrow exceptions as they are; wrap anything else.
                    Throwables.propagateIfInstanceOf(cause, Exception.class);
                    throw Throwables.propagate(cause);
                }
                return controller.getState();
            }
        }, 30L, TimeUnit.SECONDS);
        return controller;
    }

    /**
     * Stops the given program, waits up to 30 seconds for it to report completion, kill or error,
     * and removes it from {@code runningPrograms}.
     *
     * @param programController the controller of the program to stop
     * @throws Throwable the error reported by the controller, if it terminated with one
     */
    private void stopProgram(ProgramController programController) throws Throwable {
        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        final CountDownLatch terminated = new CountDownLatch(1);
        programController.addListener(new AbstractListener() {
            public void error(Throwable th) {
                // Publish the cause BEFORE releasing the latch; otherwise the waiting thread can
                // wake up, read a still-empty reference and silently drop the failure.
                failure.set(th);
                terminated.countDown();
            }

            public void completed() {
                terminated.countDown();
            }

            public void killed() {
                terminated.countDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);

        programController.stop();
        // NOTE(review): a timeout here is not treated as a failure (the await result is ignored);
        // consider whether a program that does not stop within 30 seconds should fail the test.
        terminated.await(30L, TimeUnit.SECONDS);
        runningPrograms.remove(programController);

        Throwable cause = failure.get();
        if (cause != null) {
            throw cause;
        }
    }
}
