package org.apache.hadoop.streaming;

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:test-classes/org/apache/hadoop/streaming/TestStreamingStatus.class */
/**
 * Runs streaming jobs on a {@link MiniMRCluster} with a Perl script acting
 * first as the mapper and then as the reducer, for both non-empty and empty
 * input, and checks the reported task status, the user counter, the task's
 * stderr log and the job output.
 */
public class TestStreamingStatus {
    /** Number of integers the script prints to STDOUT; also drives the expected job output. */
    private static final int OUTPUT_COUNT = 1500;

    /** Per-test-class directory (as a URI string) under {@code test.build.data}, defaulting to /tmp. */
    protected static String TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp"), TestStreamingStatus.class.getSimpleName()).toURI().toString().replace(' ', '+');

    protected String INPUT_FILE = TEST_ROOT_DIR + "/input.txt";
    protected String OUTPUT_DIR = TEST_ROOT_DIR + "/out";
    /** Content written to the input file for the non-empty-input runs. */
    protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
    /** Mapper command of the job about to run; set by {@link #testStreamJob(boolean)}. */
    protected String map = null;
    /** Reducer command of the job about to run; set by {@link #testStreamJob(boolean)}. */
    protected String reduce = null;
    protected String scriptFile = TEST_ROOT_DIR + "/perlScript.pl";
    /** Path component of {@link #scriptFile} (scheme stripped), used as the command to execute. */
    protected String scriptFileName = new Path(this.scriptFile).toUri().getPath();

    /** The two plain stderr lines the script emits; compared (trimmed) with the task's stderr log. */
    String expectedStderr = "my error msg before consuming input\nmy error msg after consuming input\n";
    /** Lazily built by {@link #buildExpectedJobOutput()}. */
    String expectedOutput = null;
    /** Status text the script reports through its {@code reporter:status:} line. */
    String expectedStatus = "before consuming input";

    /**
     * Perl script used as mapper or reducer. It writes a status line, a
     * counter line and a plain message to STDERR, prints the numbers
     * OUTPUT_COUNT..1 to STDOUT, drains STDIN, then writes another plain
     * message and a second counter line to STDERR.
     */
    protected String script = "#!/usr/bin/perl\n"
            + "print STDERR \"reporter:status:" + this.expectedStatus + "\\n\";\n"
            + "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n"
            + "print STDERR \"my error msg before consuming input\\n\";\n"
            + "for($count = " + OUTPUT_COUNT + "; $count >= 1; $count--) {print STDOUT \"$count \";}"
            + "while(<STDIN>) {chomp;}\n"
            + "print STDERR \"my error msg after consuming input\\n\";\n"
            + "print STDERR \"reporter:counter:myOwnCounterGroup,myOwnCounter,1\\n\";\n";

    MiniMRCluster mr = null;
    FileSystem fs = null;
    JobConf conf = null;

    /**
     * Starts the mini cluster, resolves the file system that holds the test
     * files, removes leftovers from earlier runs and prepares the expected
     * job output.
     *
     * @throws IOException if the cluster or file system cannot be set up
     */
    @Before
    public void setUp() throws IOException {
        this.conf = new JobConf();
        this.conf.setBoolean("mapreduce.jobtracker.retirejobs", false);
        this.conf.setBoolean("mapreduce.jobtracker.persist.jobstatus.active", false);
        this.mr = new MiniMRCluster(1, "file:///", 3, (String[]) null, (String[]) null, this.conf);
        this.fs = new Path(this.INPUT_FILE).getFileSystem(this.mr.createJobConf());
        clean(this.fs);
        buildExpectedJobOutput();
    }

    /**
     * Removes the test files and shuts the mini cluster down. The shutdown
     * runs in a {@code finally} block so that the cluster is stopped even if
     * the cleanup fails unexpectedly.
     */
    @After
    public void tearDown() {
        try {
            if (this.fs != null) {
                clean(this.fs);
            }
        } finally {
            if (this.mr != null) {
                this.mr.shutdown();
            }
        }
    }

    /**
     * Builds {@link #expectedOutput} once: the numbers OUTPUT_COUNT down to 1
     * separated by single spaces, with no trailing space.
     */
    void buildExpectedJobOutput() {
        if (this.expectedOutput != null) {
            return;
        }
        StringBuilder numbers = new StringBuilder();
        for (int i = OUTPUT_COUNT; i >= 1; i--) {
            numbers.append(i).append(' ');
        }
        this.expectedOutput = numbers.toString().trim();
    }

    /**
     * Writes the job input file and the script file.
     *
     * @param isEmptyInput  if {@code true} the input file is created empty,
     *                      otherwise it receives {@link #input}
     * @param scriptContent text written to {@link #scriptFileName}
     * @throws IOException if either file cannot be written
     */
    protected void createInputAndScript(boolean isEmptyInput, String scriptContent) throws IOException {
        makeInput(this.fs, isEmptyInput ? "" : this.input);
        FSDataOutputStream out = this.fs.create(new Path(this.scriptFileName));
        try {
            out.writeBytes(scriptContent);
        } finally {
            // Close even when the write fails so the stream is not leaked.
            out.close();
        }
    }

    /**
     * Assembles the streaming command line for a single-map, single-reduce job.
     *
     * @param jobtrackerAddress value for {@code mapreduce.jobtracker.address}
     * @param mapper            command passed to {@code -mapper}
     * @param reducer           command passed to {@code -reducer}
     * @return the argument array handed to {@link StreamJob}
     */
    protected String[] genArgs(String jobtrackerAddress, String mapper, String reducer) {
        return new String[]{
            "-input", this.INPUT_FILE,
            "-output", this.OUTPUT_DIR,
            "-mapper", mapper,
            "-reducer", reducer,
            "-jobconf", "mapreduce.job.maps=1",
            "-jobconf", "mapreduce.job.reduces=1",
            "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
            "-jobconf", "stream.tmpdir=" + new Path(TEST_ROOT_DIR).toUri().getPath(),
            "-jobconf", "mapreduce.jobtracker.address=" + jobtrackerAddress,
            "-jobconf", "fs.default.name=file:///",
            "-jobconf", "mapred.jar=" + TestStreaming.STREAMING_JAR,
            "-jobconf", "mapreduce.framework.name=yarn"
        };
    }

    /**
     * Creates (or overwrites) {@link #INPUT_FILE} with the given content.
     *
     * @param fileSystem file system to write to
     * @param content    text to write
     * @throws IOException if the file cannot be written
     */
    public void makeInput(FileSystem fileSystem, String content) throws IOException {
        FSDataOutputStream out = fileSystem.create(new Path(this.INPUT_FILE));
        try {
            out.writeBytes(content);
        } finally {
            // Close even when the write fails so the stream is not leaked.
            out.close();
        }
    }

    /**
     * Recursively deletes {@link #OUTPUT_DIR}. This is best-effort: a failure
     * is reported on stderr but never propagated.
     *
     * @param fileSystem file system holding the output directory
     */
    protected void deleteOutDir(FileSystem fileSystem) {
        try {
            fileSystem.delete(new Path(this.OUTPUT_DIR), true);
        } catch (Exception e) {
            // Deliberately not rethrown; reported the same way clean() reports failures.
            e.printStackTrace();
        }
    }

    /**
     * Deletes the output directory, the input file and the script file if
     * they exist. Failures are reported on stderr and otherwise ignored.
     *
     * @param fileSystem file system holding the test files
     */
    public void clean(FileSystem fileSystem) {
        deleteOutDir(fileSystem);
        try {
            Path inputPath = new Path(this.INPUT_FILE);
            if (fileSystem.exists(inputPath)) {
                fileSystem.delete(inputPath, false);
            }
            Path scriptPath = new Path(this.scriptFile);
            if (fileSystem.exists(scriptPath)) {
                fileSystem.delete(scriptPath, false);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the map-side and reduce-side checks, first with non-empty input
     * and then with empty input.
     */
    @Test
    public void testReporting() throws Exception {
        testStreamJob(false);
        testStreamJob(true);
    }

    /**
     * Runs two jobs: one with the script as mapper and {@code /bin/cat} as
     * reducer, then one with the roles swapped.
     *
     * @param isEmptyInput whether the job input file is empty
     */
    private void testStreamJob(boolean isEmptyInput) throws IOException {
        createInputAndScript(isEmptyInput, this.script);

        // Script as mapper.
        this.map = this.scriptFileName;
        this.reduce = "/bin/cat";
        runStreamJob(TaskType.MAP, isEmptyInput);
        deleteOutDir(this.fs);

        // Script as reducer.
        this.map = "/bin/cat";
        this.reduce = this.scriptFileName;
        runStreamJob(TaskType.REDUCE, isEmptyInput);
        clean(this.fs);
    }

    /**
     * Runs one streaming job with the current {@link #map}/{@link #reduce}
     * commands, requires exit code 0 and validates the results.
     *
     * <p>Task status and job output are validated, and a counter value of 2
     * is expected, in every case except a reduce-side script with empty
     * input; in that case only a counter value of 0 and the stderr log are
     * checked.
     *
     * @param taskType     which side (map or reduce) runs the script
     * @param isEmptyInput whether the job input file is empty
     */
    void runStreamJob(TaskType taskType, boolean isEmptyInput) throws IOException {
        String jobtrackerAddress = this.mr.createJobConf().get("mapreduce.jobtracker.address");
        StreamJob streamJob = new StreamJob(genArgs(jobtrackerAddress, this.map, this.reduce), false);
        Assert.assertEquals(0L, streamJob.go());

        int expectedCounterValue = 0;
        if (taskType == TaskType.MAP || !isEmptyInput) {
            validateTaskStatus(streamJob, taskType);
            validateJobOutput(streamJob.getConf());
            // The script emits two counter lines, each incrementing by 1.
            expectedCounterValue = 2;
        }
        validateUserCounter(streamJob, expectedCounterValue);
        validateTaskStderr(streamJob, taskType);
        deleteOutDir(this.fs);
    }

    /**
     * Checks that the job has exactly one task of the given type and that
     * its state is {@code "<expectedStatus> > <phase>"}, where the phase is
     * {@code sort} for a map task and {@code reduce} for a reduce task.
     */
    void validateTaskStatus(StreamJob streamJob, TaskType taskType) throws IOException {
        TaskReport[] reports;
        String finalPhase;
        if (taskType == TaskType.MAP) {
            reports = streamJob.jc_.getMapTaskReports(streamJob.jobId_);
            finalPhase = "sort";
        } else {
            reports = streamJob.jc_.getReduceTaskReports(streamJob.jobId_);
            finalPhase = "reduce";
        }
        Assert.assertEquals(1L, reports.length);
        Assert.assertEquals(this.expectedStatus + " > " + finalPhase, reports[0].getState());
    }

    /**
     * Checks that the trimmed content of the output directory equals
     * {@link #expectedOutput}.
     */
    void validateJobOutput(Configuration configuration) throws IOException {
        String actualOutput = MapReduceTestUtil.readOutput(new Path(this.OUTPUT_DIR), configuration).trim();
        // assertEquals (rather than assertTrue on equals) shows both values on failure.
        Assert.assertEquals(this.expectedOutput, actualOutput);
    }

    /**
     * Checks that the stderr log of attempt 0 of task 0 of the given type
     * equals the trimmed {@link #expectedStderr}.
     */
    void validateTaskStderr(StreamJob streamJob, TaskType taskType) throws IOException {
        TaskAttemptID attemptId = new TaskAttemptID(new TaskID(streamJob.jobId_, taskType, 0), 0);
        String actualStderr = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR, attemptId, false);
        // assertEquals (rather than assertTrue on equals) shows both values on failure.
        Assert.assertEquals(this.expectedStderr.trim(), actualStderr);
    }

    /**
     * Checks that the counter {@code myOwnCounterGroup/myOwnCounter} of the
     * finished job has the expected value.
     *
     * @param expectedCounterValue value the counter must have
     */
    void validateUserCounter(StreamJob streamJob, int expectedCounterValue) throws IOException {
        Assert.assertEquals(expectedCounterValue, streamJob.running_.getCounters().findCounter("myOwnCounterGroup", "myOwnCounter").getValue());
    }
}
