package co.cask.cdap.app.mapreduce;

import co.cask.cdap.AppWithWorker;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricType;
import co.cask.cdap.api.metrics.MetricValues;
import co.cask.cdap.app.metrics.MapReduceMetrics;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.datafabric.dataset.service.DatasetService;
import co.cask.cdap.data2.datafabric.dataset.service.executor.DatasetOpExecutor;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.MRJobInfo;
import co.cask.cdap.proto.MRTaskInfo;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.test.internal.guice.AppFabricTestModule;
import co.cask.tephra.TransactionManager;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:co/cask/cdap/app/mapreduce/LocalMRJobInfoFetcherTest.class */
/**
 * Verifies that {@link LocalMRJobInfoFetcher} assembles an {@link MRJobInfo} from gauge
 * metrics previously written to the {@link MetricStore}.
 *
 * <p>The test emits job-level and task-level gauges for one run, fetches the job info for that
 * run and checks the reported counters, task lists and progress values.
 */
public class LocalMRJobInfoFetcherTest {
    private static Injector injector;
    private static MetricStore metricStore;

    /**
     * Builds the injector with output/temp directories pointing at the JVM temp directory and
     * looks up the {@link MetricStore} used to emit test metrics.
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        String tmpDir = System.getProperty("java.io.tmpdir");
        CConfiguration cConf = CConfiguration.create();
        cConf.set("app.output.dir", tmpDir);
        cConf.set("app.temp.dir", tmpDir);
        injector = startMetricsService(cConf);
        metricStore = injector.getInstance(MetricStore.class);
    }

    /**
     * Creates an injector from {@link AppFabricTestModule} and starts the transaction manager,
     * the dataset op executor and the dataset service, in that order.
     *
     * @param cConfiguration configuration handed to the test module
     * @return the injector whose services have been started
     */
    public static Injector startMetricsService(CConfiguration cConfiguration) {
        Module testModule = new AppFabricTestModule(cConfiguration);
        Injector started = Guice.createInjector(testModule);
        started.getInstance(TransactionManager.class).startAndWait();
        started.getInstance(DatasetOpExecutor.class).startAndWait();
        started.getInstance(DatasetService.class).startAndWait();
        return started;
    }

    /**
     * Emits mapper/reducer gauges (job level and per task) for a single run and asserts that the
     * fetched {@link MRJobInfo} reflects them.
     */
    @Test
    public void testGetMRJobInfo() throws Exception {
        Id.Program programId = Id.Program.from("fooNamespace", "testApp", ProgramType.MAPREDUCE, "fooMapReduce");
        Id.Run runId = new Id.Run(programId, "run10878");

        // Tags shared by every metric of this run.
        // NOTE(review): AppWithWorker.RUN is reused here as the run tag key; its value is not
        // visible in this file - confirm it matches the tag the fetcher queries on.
        Map<String, String> runContext = ImmutableMap.of(
            "ns", programId.getNamespaceId(),
            "app", programId.getApplicationId(),
            "mr", programId.getId(),
            AppWithWorker.RUN, runId.getId());

        // Job-level contexts, one per task type.
        Map<String, String> mapperContext = addToContext(runContext, "mrt", MapReduceMetrics.TaskType.Mapper.getId());
        Map<String, String> reducerContext = addToContext(runContext, "mrt", MapReduceMetrics.TaskType.Reducer.getId());

        // Task-level contexts: two map tasks and one reduce task.
        Map<String, String> mapTask1Context = addToContext(mapperContext, "ins", "task_m_01");
        Map<String, String> mapTask2Context = addToContext(mapperContext, "ins", "task_m_02");
        Map<String, String> reduceTask1Context = addToContext(reducerContext, "ins", "task_r_01");

        // All gauges share one timestamp, in seconds.
        long nowSecs = System.currentTimeMillis() / 1000;

        gauge(mapperContext, "process.completion", nowSecs, 76L);
        gauge(reducerContext, "process.completion", nowSecs, 52L);

        gauge(mapTask1Context, "process.completion.task", nowSecs, 100L);
        gauge(mapTask1Context, "process.entries.task.in", nowSecs, 32L);
        gauge(mapTask1Context, "process.entries.task.out", nowSecs, 320L);

        gauge(mapTask2Context, "process.completion.task", nowSecs, 12L);
        gauge(mapTask2Context, "process.entries.task.in", nowSecs, 6L);
        gauge(mapTask2Context, "process.entries.task.out", nowSecs, 60L);

        gauge(mapperContext, "process.entries.in", nowSecs, 38L);
        gauge(mapperContext, "process.entries.out", nowSecs, 380L);

        gauge(reduceTask1Context, "process.completion.task", nowSecs, 76L);
        gauge(reduceTask1Context, "process.entries.task.in", nowSecs, 320L);
        gauge(reduceTask1Context, "process.entries.task.out", nowSecs, 1L);

        gauge(reducerContext, "process.entries.in", nowSecs, 320L);
        gauge(reducerContext, "process.entries.out", nowSecs, 1L);

        MRJobInfo jobInfo = injector.getInstance(LocalMRJobInfoFetcher.class).getMRJobInfo(runId);
        Assert.assertFalse(jobInfo.isComplete());

        // Job-level counters.
        Map<String, Long> jobCounters = jobInfo.getCounters();
        assertCounter(jobCounters, TaskCounter.MAP_INPUT_RECORDS, 38L);
        assertCounter(jobCounters, TaskCounter.MAP_OUTPUT_RECORDS, 380L);
        assertCounter(jobCounters, TaskCounter.REDUCE_INPUT_RECORDS, 320L);
        assertCounter(jobCounters, TaskCounter.REDUCE_OUTPUT_RECORDS, 1L);

        List<MRTaskInfo> mapTasks = jobInfo.getMapTasks();
        List<MRTaskInfo> reduceTasks = jobInfo.getReduceTasks();
        Assert.assertEquals(2L, mapTasks.size());
        Assert.assertEquals(1L, reduceTasks.size());

        MRTaskInfo mapTask1 = findByTaskId(mapTasks, "task_m_01");
        MRTaskInfo mapTask2 = findByTaskId(mapTasks, "task_m_02");
        MRTaskInfo reduceTask1 = findByTaskId(reduceTasks, "task_r_01");

        // Task-level counters.
        Map<String, Long> mapTask1Counters = mapTask1.getCounters();
        assertCounter(mapTask1Counters, TaskCounter.MAP_INPUT_RECORDS, 32L);
        assertCounter(mapTask1Counters, TaskCounter.MAP_OUTPUT_RECORDS, 320L);

        Map<String, Long> mapTask2Counters = mapTask2.getCounters();
        assertCounter(mapTask2Counters, TaskCounter.MAP_INPUT_RECORDS, 6L);
        assertCounter(mapTask2Counters, TaskCounter.MAP_OUTPUT_RECORDS, 60L);

        Map<String, Long> reduceTask1Counters = reduceTask1.getCounters();
        assertCounter(reduceTask1Counters, TaskCounter.REDUCE_INPUT_RECORDS, 320L);
        assertCounter(reduceTask1Counters, TaskCounter.REDUCE_OUTPUT_RECORDS, 1L);

        // Completion gauges were emitted as 0-100 values; progress is asserted as a 0-1 fraction.
        Assert.assertEquals(0.76f, jobInfo.getMapProgress().floatValue(), 0.01f);
        Assert.assertEquals(0.52f, jobInfo.getReduceProgress().floatValue(), 0.01f);
        Assert.assertEquals(1.0f, mapTask1.getProgress(), 0.01f);
        Assert.assertEquals(0.12f, mapTask2.getProgress(), 0.01f);
        Assert.assertEquals(0.76f, reduceTask1.getProgress(), 0.01f);
    }

    /**
     * Asserts that {@code counters} holds {@code expected} under the name of {@code counter}.
     * Comparing boxed values makes a missing counter fail the assertion instead of throwing a
     * {@link NullPointerException}.
     */
    private static void assertCounter(Map<String, Long> counters, TaskCounter counter, long expected) {
        Assert.assertEquals(Long.valueOf(expected), counters.get(counter.name()));
    }

    /**
     * Writes a single {@link MetricType#GAUGE} value to the metric store.
     *
     * @param context   tags identifying the metric's context
     * @param name      metric name
     * @param timestamp timestamp passed to {@link MetricValues}
     * @param value     gauge value
     */
    private void gauge(Map<String, String> context, String name, long timestamp, long value) throws Exception {
        metricStore.add(new MetricValues(context, name, timestamp, value, MetricType.GAUGE));
    }

    /**
     * Returns a new immutable map containing every entry of {@code map} plus the given pair.
     * The explicit type witness is required: without it the builder is inferred as
     * {@code Builder<Object, Object>} and the result is not a {@code Map<String, String>}.
     */
    private Map<String, String> addToContext(Map<String, String> map, String str, String str2) {
        return ImmutableMap.<String, String>builder().putAll(map).put(str, str2).build();
    }

    /**
     * Finds the task whose id equals {@code str}.
     *
     * @throws IllegalArgumentException if no task in {@code list} has that id
     */
    private MRTaskInfo findByTaskId(List<MRTaskInfo> list, String str) {
        for (MRTaskInfo taskInfo : list) {
            if (taskInfo.getTaskId().equals(str)) {
                return taskInfo;
            }
        }
        throw new IllegalArgumentException(String.format("TaskId: %s not found in list of TaskInfos: %s", str, list));
    }
}
