package org.apache.ignite.internal.processors.hadoop;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.processors.hadoop.counter.GridHadoopFSCounterWriter;
import org.apache.ignite.internal.processors.hadoop.counter.GridHadoopPerformanceCounter;
import org.apache.ignite.internal.processors.hadoop.examples.GridHadoopWordCount1;
import org.apache.ignite.internal.processors.hadoop.examples.GridHadoopWordCount2;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.testframework.GridTestUtils;

/* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.class */
public class GridHadoopMapReduceTest extends GridHadoopAbstractWordCountTest {
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.internal.processors.hadoop.GridHadoopAbstractWordCountTest, org.apache.ignite.internal.processors.hadoop.GridHadoopAbstractSelfTest
    public int gridCount() {
        return 3;
    }

    public void testWholeMapReduceExecution() throws Exception {
        IgfsPath igfsPath = new IgfsPath("/input");
        this.igfs.mkdirs(igfsPath);
        IgfsPath igfsPath2 = new IgfsPath(igfsPath, GridHadoopWordCount2.class.getSimpleName() + "-input");
        generateTestFile(igfsPath2.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000);
        for (int i = 0; i < 8; i++) {
            this.igfs.delete(new IgfsPath("/output"), true);
            boolean z = (i & 1) == 0;
            boolean z2 = (i & 2) == 0;
            boolean z3 = (i & 4) == 0;
            JobConf jobConf = new JobConf();
            jobConf.set("ignite.counters.writer", GridHadoopFSCounterWriter.class.getName());
            jobConf.setUser("yyy");
            jobConf.set("ignite.counters.fswriter.directory", "/xxx/${USER}/zzz");
            jobConf.setInt("mapreduce.input.fileinputformat.split.maxsize", 65000);
            jobConf.setInt("fs.local.block.size", 65000);
            setupFileSystems(jobConf);
            GridHadoopWordCount1.setTasksClasses(jobConf, !z, !z2, !z3);
            Job job = Job.getInstance(jobConf);
            GridHadoopWordCount2.setTasksClasses(job, z, z2, z3);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.setInputPaths(job, new Path[]{new Path(igfsScheme() + igfsPath2.toString())});
            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + "/output"));
            job.setJarByClass(GridHadoopWordCount2.class);
            GridHadoopJobId gridHadoopJobId = new GridHadoopJobId(UUID.randomUUID(), 1);
            grid(0).hadoop().submit(gridHadoopJobId, GridHadoopUtils.createJobInfo(job.getConfiguration())).get();
            checkJobStatistics(gridHadoopJobId);
            assertEquals("Use new mapper: " + z + ", new combiner: " + z2 + ", new reducer: " + z3, "blue\t200000\ngreen\t150000\nred\t100000\nyellow\t70000\n", readAndSortFile("/output/" + (z3 ? "part-r-" : "part-") + "00000"));
        }
    }

    private void checkJobStatistics(GridHadoopJobId gridHadoopJobId) throws IgniteCheckedException, IOException {
        String str;
        String str2;
        GridHadoopPerformanceCounter counter = GridHadoopPerformanceCounter.getCounter(grid(0).hadoop().counters(gridHadoopJobId), (UUID) null);
        TreeMap treeMap = new TreeMap();
        HashMap hashMap = new HashMap();
        hashMap.put("submit", 0);
        hashMap.put("prepare", 1);
        hashMap.put("start", 2);
        hashMap.put("Cstart", 3);
        hashMap.put("finish", 4);
        String str3 = null;
        long j = 0;
        for (T2 t2 : counter.evts()) {
            String[] split = ((String) t2.get1()).split(" ");
            if ("JOB".equals(split[0])) {
                str = split[0];
                str2 = split[1];
            } else {
                str = ("COMBINE".equals(split[0]) ? "MAP" : split[0].substring(0, 3)) + split[1];
                str2 = ("COMBINE".equals(split[0]) ? "C" : "") + split[2];
            }
            if (!str.equals(str3)) {
                treeMap.put(str, new TreeMap());
            }
            Integer num = (Integer) hashMap.get(str2);
            assertNotNull("Invalid phase " + str2, num);
            ((SortedMap) treeMap.get(str)).put(num, t2.get2());
            str3 = str;
            j++;
        }
        for (Map.Entry entry : treeMap.entrySet()) {
            long j2 = 0;
            for (Map.Entry entry2 : ((Map) entry.getValue()).entrySet()) {
                assertTrue("Phase order of " + ((String) entry.getKey()) + " is invalid", ((Long) entry2.getValue()).longValue() >= j2);
                j2 = ((Long) entry2.getValue()).longValue();
            }
        }
        final IgfsPath igfsPath = new IgfsPath("/xxx/yyy/zzz/" + gridHadoopJobId + "/performance");
        GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.internal.processors.hadoop.GridHadoopMapReduceTest.1
            public boolean apply() {
                return GridHadoopMapReduceTest.this.igfs.exists(igfsPath);
            }
        }, 10000L);
        assertEquals(j, GridHadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new InputStreamReader(this.igfs.open(igfsPath)))));
    }
}
