package cascading.stats.hadoop;

import cascading.CascadingException;
import cascading.PlatformTestCase;
import cascading.TestFailAggregator;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tuple.Fields;
import data.InputData;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:cascading/stats/hadoop/CascadingStatsLocalHadoopErrorPlatformTest.class */
public class CascadingStatsLocalHadoopErrorPlatformTest extends PlatformTestCase {

    /* loaded from: input_file:cascading/stats/hadoop/CascadingStatsLocalHadoopErrorPlatformTest$FailFunction.class */
    public class FailFunction extends BaseOperation implements Function {
        public FailFunction(Fields fields) {
            super(1, fields);
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            throw new CascadingException("testing");
        }
    }

    public CascadingStatsLocalHadoopErrorPlatformTest() {
        super(false);
    }

    @Before
    public void setUp() {
        HadoopFlowStepJob.reportLocalError((Throwable) null);
    }

    @After
    public void tearDown() {
        HadoopFlowStepJob.reportLocalError((Throwable) null);
    }

    @Test
    public void testLocalErrorReportingInMapper() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Cascade connect = new CascadeConnector(getProperties()).connect(new Flow[]{getPlatform().getFlowConnector().connect("mapper fail test", getPlatform().getTextFile(InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("mapperfail"), SinkMode.REPLACE), new Each(new Pipe("failing mapper"), new Fields(new Comparable[]{"line"}), new FailFunction(new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})))});
        assertNull(connect.getCascadeStats().getThrowable());
        try {
            connect.complete();
            fail("An exception should have been thrown");
        } catch (Throwable th) {
            assertEquals(th, connect.getCascadeStats().getThrowable());
        }
    }

    @Test
    public void testLocalErrorReportingInReducer() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);
        Cascade connect = new CascadeConnector(getProperties()).connect(new Flow[]{getPlatform().getFlowConnector().connect("reducer fail test", getPlatform().getTextFile(InputData.inputFileApache), getPlatform().getTextFile(getOutputPath("reducerfail"), SinkMode.REPLACE), new Every(new GroupBy(new Each(new Pipe("failing reducer"), new Fields(new Comparable[]{"line"}), new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"})), new Fields(new Comparable[]{"ip"})), new TestFailAggregator(new Fields(new Comparable[]{"count"}), 1)))});
        assertNull(connect.getCascadeStats().getThrowable());
        try {
            connect.complete();
            fail("An exception should have been thrown");
        } catch (Throwable th) {
            assertEquals(th, connect.getCascadeStats().getThrowable());
        }
    }
}
