package io.dstream.tez;

import io.dstream.SerializableStreamAssets;
import io.dstream.support.PartitionIdHelper;
import io.dstream.tez.io.KeyWritable;
import io.dstream.tez.io.TezDelegatingPartitioner;
import io.dstream.tez.io.ValueWritable;
import io.dstream.tez.utils.HdfsSerializerUtils;
import io.dstream.tez.utils.StreamUtils;
import io.dstream.utils.ReflectionUtils;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dstream/tez/TezTaskProcessor.class */
public class TezTaskProcessor extends SimpleMRProcessor {
    private final Logger logger;
    private final String dagName;
    private final int taskIndex;
    private final String vertexName;
    private final Configuration configuration;
    private final ThreadLocal<Integer> partitionIdHolder;

    /* loaded from: input_file:io/dstream/tez/TezTaskProcessor$WritingConsumer.class */
    private static class WritingConsumer implements Consumer<Object> {
        private final KeyWritable kw = new KeyWritable();
        private final ValueWritable<Object> vw = new ValueWritable<>();
        private final KeyValueWriter kvWriter;

        public WritingConsumer(KeyValueWriter keyValueWriter) {
            this.kvWriter = keyValueWriter;
        }

        @Override // java.util.function.Consumer
        public void accept(Object obj) {
            try {
                if (obj instanceof Map.Entry) {
                    Map.Entry entry = (Map.Entry) obj;
                    if (entry.getKey() == null) {
                        Iterator it = (Iterator) entry.getValue();
                        while (it.hasNext()) {
                            this.vw.setValue(it.next());
                            this.kvWriter.write(this.kw, this.vw);
                        }
                    } else {
                        this.kw.setValue(((Map.Entry) obj).getKey());
                        this.vw.setValue(((Map.Entry) obj).getValue());
                        this.kvWriter.write(this.kw, this.vw);
                    }
                } else {
                    this.vw.setValue(obj);
                    this.kvWriter.write(this.kw, this.vw);
                }
            } catch (Exception e) {
                e.printStackTrace();
                throw new IllegalStateException("Failed to write " + obj + " to KV Writer", e);
            }
        }
    }

    public TezTaskProcessor(ProcessorContext processorContext) {
        super(processorContext);
        this.logger = LoggerFactory.getLogger(TezTaskProcessor.class);
        this.dagName = getContext().getDAGName();
        this.taskIndex = getContext().getTaskIndex();
        this.vertexName = getContext().getTaskVertexName();
        this.configuration = new Configuration();
        try {
            Field findField = ReflectionUtils.findField(PartitionIdHelper.class, "partitionIdHolder", ThreadLocal.class);
            findField.setAccessible(true);
            this.partitionIdHolder = (ThreadLocal) findField.get(null);
            this.partitionIdHolder.set(Integer.valueOf(this.taskIndex));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public void run() throws Exception {
        Stream stream;
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Executing processor for task: " + this.taskIndex + "; DAG " + this.dagName + "; Vertex " + this.vertexName);
        }
        List list = (List) getOrderedInputs().stream().map(logicalInput -> {
            try {
                return logicalInput.getReader();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to get reader", e);
            }
        }).map(reader -> {
            return reader instanceof KeyValueReader ? StreamUtils.toStream((KeyValueReader) reader) : StreamUtils.toStream((KeyValuesReader) reader);
        }).collect(Collectors.toList());
        Task task = getTask();
        if (list.size() > 0) {
            stream = (Stream) list.get(0);
            if (list.size() > 1) {
                stream = list.stream();
            }
        } else {
            if (task.getStreamProducingSourceSupplier() == null) {
                throw new IllegalStateException("Unexpected condition. Can't determine input Stream for task");
            }
            getContext().getTaskIndex();
            stream = task.getStreamProducingSourceSupplier().get();
        }
        SerializableStreamAssets.SerFunction extractTaskFunction = extractTaskFunction();
        WritingConsumer writingConsumer = new WritingConsumer(((LogicalOutput) getOutputs().values().iterator().next()).getWriter());
        try {
            if (extractTaskFunction == null) {
                stream.forEach(writingConsumer);
            } else {
                ((Stream) extractTaskFunction.apply(stream)).forEach(writingConsumer);
            }
            this.logger.info("Finished processing task-[" + this.dagName + ":" + this.vertexName + ":" + this.taskIndex + "]");
        } catch (Exception e) {
            e.printStackTrace();
            throw new IllegalStateException("Failed to process Tez task", e);
        }
    }

    private List<LogicalInput> getOrderedInputs() {
        TreeMap treeMap = new TreeMap(new Comparator<String>() { // from class: io.dstream.tez.TezTaskProcessor.1
            @Override // java.util.Comparator
            public int compare(String str, String str2) {
                int parseInt = Integer.parseInt(str.split(":")[0]);
                int parseInt2 = Integer.parseInt(str2.split(":")[0]);
                if (parseInt == parseInt2) {
                    return 0;
                }
                return parseInt > parseInt2 ? 1 : -1;
            }
        });
        treeMap.putAll(this.inputs);
        return (List) treeMap.entrySet().stream().map(entry -> {
            return (LogicalInput) entry.getValue();
        }).collect(Collectors.toList());
    }

    private SerializableStreamAssets.SerFunction extractTaskFunction() throws Exception {
        return getTask().getFunction();
    }

    private Task getTask() throws Exception {
        ObjectRegistry objectRegistry = getContext().getObjectRegistry();
        Task task = (Task) objectRegistry.get(this.vertexName);
        if (task == null) {
            FileSystem fileSystem = FileSystem.get(this.configuration);
            ByteBuffer payload = getContext().getUserPayload().getPayload();
            byte[] bArr = new byte[payload.capacity()];
            payload.get(bArr);
            task = (Task) HdfsSerializerUtils.deserialize(new Path(new String(bArr)), fileSystem, Task.class);
            objectRegistry.cacheForDAG(this.vertexName, task);
            TezDelegatingPartitioner.setDelegator(task.getClassifier());
        }
        return task;
    }
}
