package org.apache.tez.mapreduce.examples;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.input.ShuffledMergedInput;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;

/* loaded from: input_file:org/apache/tez/mapreduce/examples/UnionExample.class */
public class UnionExample {
    private Credentials credentials = new Credentials();

    /* loaded from: input_file:org/apache/tez/mapreduce/examples/UnionExample$TokenProcessor.class */
    public static class TokenProcessor implements LogicalIOProcessor {
        TezProcessorContext context;
        IntWritable one = new IntWritable(1);
        Text word = new Text();

        public void initialize(TezProcessorContext tezProcessorContext) throws Exception {
            this.context = tezProcessorContext;
        }

        public void handleEvents(List<Event> list) {
        }

        public void close() throws Exception {
        }

        public void run(Map<String, LogicalInput> map, Map<String, LogicalOutput> map2) throws Exception {
            Preconditions.checkArgument(map.size() == 1);
            Iterator<LogicalInput> it = map.values().iterator();
            while (it.hasNext()) {
                it.next().start();
            }
            Iterator<LogicalOutput> it2 = map2.values().iterator();
            while (it2.hasNext()) {
                it2.next().start();
            }
            boolean z = true;
            if (this.context.getTaskVertexName().equals("map3")) {
                z = false;
            }
            Preconditions.checkArgument(map2.size() == (z ? 2 : 1));
            Preconditions.checkArgument(map2.containsKey("checker"));
            KeyValueReader reader = map.values().iterator().next().getReader();
            KeyValueWriter writer = map2.get("checker").getWriter();
            MROutput mROutput = null;
            KeyValueWriter keyValueWriter = null;
            if (z) {
                mROutput = (MROutput) map2.get("parts");
                keyValueWriter = mROutput.getWriter();
            }
            while (reader.next()) {
                StringTokenizer stringTokenizer = new StringTokenizer(reader.getCurrentValue().toString());
                while (stringTokenizer.hasMoreTokens()) {
                    this.word.set(stringTokenizer.nextToken());
                    writer.write(this.word, this.one);
                    if (z) {
                        keyValueWriter.write(this.word, this.one);
                    }
                }
            }
            if (z && mROutput.isCommitRequired()) {
                while (!this.context.canCommit()) {
                    Thread.sleep(100L);
                }
                mROutput.commit();
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/mapreduce/examples/UnionExample$UnionProcessor.class */
    public static class UnionProcessor implements LogicalIOProcessor {
        TezProcessorContext context;
        IntWritable one = new IntWritable(1);

        public void initialize(TezProcessorContext tezProcessorContext) throws Exception {
            this.context = tezProcessorContext;
        }

        public void handleEvents(List<Event> list) {
        }

        public void close() throws Exception {
        }

        public void run(Map<String, LogicalInput> map, Map<String, LogicalOutput> map2) throws Exception {
            Preconditions.checkArgument(map.size() == 2);
            Preconditions.checkArgument(map2.size() == 2);
            Iterator<LogicalInput> it = map.values().iterator();
            while (it.hasNext()) {
                it.next().start();
            }
            Iterator<LogicalOutput> it2 = map2.values().iterator();
            while (it2.hasNext()) {
                it2.next().start();
            }
            MROutput mROutput = map2.get("union");
            MROutput mROutput2 = map2.get("all-parts");
            KeyValueWriter writer = mROutput.getWriter();
            KeyValueWriter writer2 = mROutput2.getWriter();
            HashMap newHashMap = Maps.newHashMap();
            KeyValuesReader reader = map.get("union").getReader();
            while (reader.next()) {
                String text = ((Text) reader.getCurrentKey()).toString();
                IntWritable intWritable = (IntWritable) reader.getCurrentValues().iterator().next();
                for (int i = 0; i < intWritable.get(); i++) {
                    writer2.write(text, this.one);
                }
                AtomicInteger atomicInteger = (AtomicInteger) newHashMap.get(text);
                if (atomicInteger == null) {
                    newHashMap.put(text, new AtomicInteger(intWritable.get()));
                } else {
                    atomicInteger.addAndGet(intWritable.get());
                }
            }
            KeyValuesReader reader2 = map.get("map3").getReader();
            while (reader2.next()) {
                String text2 = ((Text) reader2.getCurrentKey()).toString();
                IntWritable intWritable2 = (IntWritable) reader2.getCurrentValues().iterator().next();
                AtomicInteger atomicInteger2 = (AtomicInteger) newHashMap.get(text2);
                if (atomicInteger2 == null) {
                    throw new TezUncheckedException("Expected to exist: " + text2);
                }
                atomicInteger2.getAndAdd(intWritable2.get() * (-2));
            }
            Iterator it3 = newHashMap.values().iterator();
            while (it3.hasNext()) {
                if (((AtomicInteger) it3.next()).get() != 0) {
                    throw new TezUncheckedException("Unexpected non-zero value");
                }
            }
            writer.write("Union", new IntWritable(newHashMap.size()));
            if (mROutput.isCommitRequired()) {
                while (!this.context.canCommit()) {
                    Thread.sleep(100L);
                }
                mROutput.commit();
            }
            if (mROutput2.isCommitRequired()) {
                while (!this.context.canCommit()) {
                    Thread.sleep(100L);
                }
                mROutput2.commit();
            }
        }
    }

    private DAG createDAG(FileSystem fileSystem, TezConfiguration tezConfiguration, Map<String, LocalResource> map, Path path, String str, String str2) throws IOException {
        JobConf jobConf = new JobConf(tezConfiguration);
        jobConf.set("mapreduce.map.output.key.class", Text.class.getName());
        jobConf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        jobConf.set("mapreduce.job.inputformat.class", TezGroupedSplitsInputFormat.class.getName());
        jobConf.set("mapreduce.input.fileinputformat.inputdir", str);
        jobConf.setBoolean("mapred.mapper.new-api", true);
        MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf, (Configuration) null);
        JobConf jobConf2 = new JobConf(tezConfiguration);
        jobConf2.set("mapreduce.map.output.key.class", Text.class.getName());
        jobConf2.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        jobConf2.set("mapreduce.job.outputformat.class", TextOutputFormat.class.getName());
        jobConf2.set("mapreduce.output.fileoutputformat.outputdir", str2);
        jobConf2.setBoolean("mapred.mapper.new-api", true);
        MultiStageMRConfToTezTranslator.translateVertexConfToTez(jobConf2, jobConf);
        MRHelpers.doJobClientMagic(jobConf);
        MRHelpers.doJobClientMagic(jobConf2);
        byte[] createUserPayloadFromConf = MRHelpers.createUserPayloadFromConf(jobConf);
        byte[] createMRInputPayloadWithGrouping = MRHelpers.createMRInputPayloadWithGrouping(createUserPayloadFromConf, TextInputFormat.class.getName());
        Vertex vertex = new Vertex("map1", new ProcessorDescriptor(TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(jobConf));
        vertex.setJavaOpts(MRHelpers.getMapJavaOpts(jobConf));
        HashMap hashMap = new HashMap();
        MRHelpers.updateEnvironmentForMRTasks(jobConf, hashMap, true);
        vertex.setTaskEnvironment(hashMap);
        InputDescriptor userPayload = new InputDescriptor(MRInput.class.getName()).setUserPayload(createMRInputPayloadWithGrouping);
        vertex.addInput("MRInput", userPayload, MRInputAMSplitGenerator.class);
        Vertex vertex2 = new Vertex("map2", new ProcessorDescriptor(TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(jobConf));
        vertex2.setJavaOpts(MRHelpers.getMapJavaOpts(jobConf));
        MRHelpers.updateEnvironmentForMRTasks(jobConf, hashMap, true);
        vertex2.setTaskEnvironment(hashMap);
        vertex2.addInput("MRInput", userPayload, MRInputAMSplitGenerator.class);
        Vertex vertex3 = new Vertex("map3", new ProcessorDescriptor(TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(jobConf));
        vertex3.setJavaOpts(MRHelpers.getMapJavaOpts(jobConf));
        MRHelpers.updateEnvironmentForMRTasks(jobConf, hashMap, true);
        vertex3.setTaskEnvironment(hashMap);
        vertex3.addInput("MRInput", userPayload, MRInputAMSplitGenerator.class);
        byte[] createUserPayloadFromConf2 = MRHelpers.createUserPayloadFromConf(jobConf2);
        Vertex vertex4 = new Vertex("checker", new ProcessorDescriptor(UnionProcessor.class.getName()).setUserPayload(createUserPayloadFromConf2), 1, MRHelpers.getReduceResource(jobConf2));
        vertex4.setJavaOpts(MRHelpers.getReduceJavaOpts(jobConf2));
        HashMap hashMap2 = new HashMap();
        MRHelpers.updateEnvironmentForMRTasks(jobConf2, hashMap2, false);
        vertex4.setTaskEnvironment(hashMap2);
        vertex4.addOutput("union", new OutputDescriptor(MROutput.class.getName()).setUserPayload(createUserPayloadFromConf2), MROutputCommitter.class);
        Configuration configuration = new Configuration(jobConf2);
        configuration.set("mapreduce.output.fileoutputformat.outputdir", str2 + "-parts");
        byte[] createUserPayloadFromConf3 = MRHelpers.createUserPayloadFromConf(configuration);
        DAG dag = new DAG("UnionExample");
        VertexGroup createVertexGroup = dag.createVertexGroup("union", new Vertex[]{vertex, vertex2});
        OutputDescriptor userPayload2 = new OutputDescriptor(MROutput.class.getName()).setUserPayload(createUserPayloadFromConf3);
        Configuration configuration2 = new Configuration(jobConf2);
        configuration2.set("mapreduce.output.fileoutputformat.outputdir", str2 + "-all-parts");
        OutputDescriptor userPayload3 = new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers.createUserPayloadFromConf(configuration2));
        createVertexGroup.addOutput("parts", userPayload2, MROutputCommitter.class);
        vertex4.addOutput("all-parts", userPayload3, MROutputCommitter.class);
        dag.addVertex(vertex).addVertex(vertex2).addVertex(vertex3).addVertex(vertex4).addEdge(new Edge(vertex3, vertex4, new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class.getName()).setUserPayload(createUserPayloadFromConf), new InputDescriptor(ShuffledMergedInput.class.getName()).setUserPayload(createUserPayloadFromConf2)))).addEdge(new GroupInputEdge(createVertexGroup, vertex4, new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class.getName()).setUserPayload(createUserPayloadFromConf), new InputDescriptor(ShuffledMergedInput.class.getName()).setUserPayload(createUserPayloadFromConf2)), new InputDescriptor(ConcatenatedMergedKeyValuesInput.class.getName())));
        return dag;
    }

    private static void waitForTezSessionReady(TezSession tezSession) throws IOException, TezException {
        while (true) {
            TezSessionStatus sessionStatus = tezSession.getSessionStatus();
            if (sessionStatus.equals(TezSessionStatus.SHUTDOWN)) {
                throw new RuntimeException("TezSession has already shutdown");
            }
            if (sessionStatus.equals(TezSessionStatus.READY)) {
                return;
            }
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    private static void printUsage() {
        System.err.println("Usage:  unionexample <in1> <out1>");
    }

    public boolean run(String str, String str2, Configuration configuration) throws Exception {
        DAGStatus dAGStatus;
        System.out.println("Running UnionExample");
        TezConfiguration tezConfiguration = configuration != null ? new TezConfiguration(configuration) : new TezConfiguration();
        UserGroupInformation.setConfiguration(tezConfiguration);
        String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
        ApplicationId createApplication = new TezClient(tezConfiguration).createApplication();
        FileSystem fileSystem = FileSystem.get(tezConfiguration);
        String str3 = "/user/" + shortUserName + "/.staging//" + createApplication.toString();
        Path path = new Path(str3);
        tezConfiguration.set("tez.staging-dir", str3);
        Path makeQualified = fileSystem.makeQualified(path);
        TokenCache.obtainTokensForNamenodes(this.credentials, new Path[]{makeQualified}, tezConfiguration);
        TezClientUtils.ensureStagingDirExists(tezConfiguration, makeQualified);
        tezConfiguration.set("tez.am.java.opts", MRHelpers.getMRAMJavaOpts(tezConfiguration));
        TezSession tezSession = new TezSession("UnionExampleSession", createApplication, new TezSessionConfiguration(new AMConfiguration((Map) null, (Map) null, tezConfiguration, this.credentials), tezConfiguration));
        tezSession.start();
        String[] strArr = {"map1", "map2", "map3", "checker"};
        EnumSet of = EnumSet.of(StatusGetOpts.GET_COUNTERS);
        try {
            if (fileSystem.exists(new Path(str2))) {
                throw new FileAlreadyExistsException("Output directory " + str2 + " already exists");
            }
            DAG createDAG = createDAG(fileSystem, tezConfiguration, new TreeMap(), makeQualified, str, str2);
            waitForTezSessionReady(tezSession);
            DAGClient submitDAG = tezSession.submitDAG(createDAG);
            while (true) {
                dAGStatus = submitDAG.getDAGStatus(of);
                if (dAGStatus.getState() == DAGStatus.State.RUNNING || dAGStatus.getState() == DAGStatus.State.SUCCEEDED || dAGStatus.getState() == DAGStatus.State.FAILED || dAGStatus.getState() == DAGStatus.State.KILLED || dAGStatus.getState() == DAGStatus.State.ERROR) {
                    break;
                }
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                }
            }
            while (dAGStatus.getState() == DAGStatus.State.RUNNING) {
                try {
                    ExampleDriver.printDAGStatus(submitDAG, strArr);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e2) {
                    }
                    dAGStatus = submitDAG.getDAGStatus(of);
                } catch (TezException e3) {
                    System.exit(-1);
                }
            }
            ExampleDriver.printDAGStatus(submitDAG, strArr, true, true);
            System.out.println("DAG completed. FinalState=" + dAGStatus.getState());
            if (dAGStatus.getState() == DAGStatus.State.SUCCEEDED) {
                return true;
            }
            System.out.println("DAG diagnostics: " + dAGStatus.getDiagnostics());
            fileSystem.delete(makeQualified, true);
            tezSession.stop();
            return false;
        } finally {
            fileSystem.delete(makeQualified, true);
            tezSession.stop();
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr.length % 2 != 0) {
            printUsage();
            System.exit(2);
        }
        new UnionExample().run(strArr[0], strArr[1], null);
    }
}
