package io.dstream.tez;

import io.dstream.support.SourceSupplier;
import io.dstream.support.UriSourceSupplier;
import io.dstream.tez.io.KeyWritable;
import io.dstream.tez.io.TezDelegatingPartitioner;
import io.dstream.tez.io.ValueWritable;
import io.dstream.tez.utils.HdfsSerializerUtils;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/dstream/tez/TezDAGBuilder.class */
public class TezDAGBuilder {
    private final DAG dag;
    private final ExecutionContextAwareTezClient tezClient;
    private final TezDagExecutor dagExecutor;
    private Vertex lastVertex;
    private int inputOrderCounter;
    private final Logger logger = LoggerFactory.getLogger(TezDAGBuilder.class);
    private final OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig.newBuilder("io.dstream.tez.io.KeyWritable", "io.dstream.tez.io.ValueWritable", TezDelegatingPartitioner.class.getName(), (Map) null).build();

    public TezDAGBuilder(String str, ExecutionContextAwareTezClient executionContextAwareTezClient, Properties properties) {
        this.dag = DAG.create(str + "_" + System.currentTimeMillis());
        this.tezClient = executionContextAwareTezClient;
        this.dagExecutor = new TezDagExecutor(this.tezClient, this.dag);
    }

    public void addTask(TaskDescriptor taskDescriptor) {
        if (taskDescriptor.getId() == 0) {
            determineInputFormatClass(taskDescriptor);
        }
        ProcessorDescriptor processorDescriptor = (ProcessorDescriptor) ProcessorDescriptor.create(TezTaskProcessor.class.getName()).setUserPayload(createPayloadFromTaskSerPath(Task.build(taskDescriptor), this.dag.getName()));
        UriSourceSupplier sourceSupplier = taskDescriptor.getSourceSupplier();
        Vertex createVertex = createVertex(taskDescriptor, processorDescriptor);
        this.dag.addVertex(createVertex);
        if (taskDescriptor.getId() != 0) {
            addEdge(createVertex);
        } else if (sourceSupplier instanceof UriSourceSupplier) {
            Stream<URI> stream = sourceSupplier.get();
            DataSourceDescriptor buildDataSourceDescriptorFromUris = buildDataSourceDescriptorFromUris(taskDescriptor.getInputFormatClass(), stream);
            StringBuilder sb = new StringBuilder();
            int i = this.inputOrderCounter;
            this.inputOrderCounter = i + 1;
            createVertex.addDataSource(sb.append(i).append(":").append(createVertex.getName()).append("_INPUT_").append(Arrays.asList(stream)).toString(), buildDataSourceDescriptorFromUris);
        }
        if (taskDescriptor.getDependentTasksChains() != null) {
            taskDescriptor.getDependentTasksChains().forEach(list -> {
                list.forEach(this::addTask);
                addEdge(createVertex);
            });
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Created Vertex: " + createVertex);
        }
        this.lastVertex = createVertex;
    }

    private Vertex createVertex(TaskDescriptor taskDescriptor, ProcessorDescriptor processorDescriptor) {
        Vertex create;
        String str = taskDescriptor.getName() + "_" + taskDescriptor.getOperationName();
        if (taskDescriptor.getId() == 0 && (taskDescriptor.getSourceSupplier() instanceof UriSourceSupplier)) {
            StringBuilder sb = new StringBuilder();
            int i = this.inputOrderCounter;
            this.inputOrderCounter = i + 1;
            create = Vertex.create(sb.append(i).append(":").append(str).toString(), processorDescriptor);
        } else {
            StringBuilder sb2 = new StringBuilder();
            int i2 = this.inputOrderCounter;
            this.inputOrderCounter = i2 + 1;
            create = Vertex.create(sb2.append(i2).append(":").append(str).toString(), processorDescriptor, taskDescriptor.getParallelism());
        }
        Vertex vertex = create;
        vertex.addTaskLocalFiles(this.tezClient.getLocalResources());
        return vertex;
    }

    private void addEdge(Vertex vertex) {
        this.dag.addEdge(Edge.create(this.lastVertex, vertex, this.edgeConf.createDefaultEdgeProperty()));
    }

    public void addDataSink(String str) {
        createDataSink(this.lastVertex, this.tezClient.getClientName() + "_OUTPUT", KeyWritable.class, ValueWritable.class, SequenceFileOutputFormat.class, str);
        this.lastVertex = null;
    }

    public Runnable build() {
        return this.dagExecutor;
    }

    private DataSourceDescriptor buildDataSourceDescriptorFromUris(Class<?> cls, Stream<URI> stream) {
        return MRInput.createConfigBuilder(this.tezClient.getTezConfiguration(), cls, (String) stream.map(uri -> {
            return uri.getPath();
        }).reduce((str, str2) -> {
            return str + "," + str2;
        }).get()).groupSplits(false).build();
    }

    private UserPayload createPayloadFromTaskSerPath(Task task, String str) {
        return UserPayload.create(ByteBuffer.wrap(HdfsSerializerUtils.serialize(task, this.tezClient.getFileSystem(), new Path(str + "/tasks/" + task.getId() + "_" + task.getName() + ".ser")).toString().getBytes()));
    }

    private void createDataSink(Vertex vertex, String str, Class<? extends Writable> cls, Class<? extends Writable> cls2, Class<?> cls3, String str2) {
        vertex.addDataSink(str, MROutput.createConfigBuilder(buildJobConf(cls, cls2), cls3, str2).build());
    }

    private JobConf buildJobConf(Class<? extends Writable> cls, Class<? extends Writable> cls2) {
        JobConf jobConf = new JobConf(this.tezClient.getTezConfiguration());
        jobConf.setOutputKeyClass(cls);
        jobConf.setOutputValueClass(cls2);
        return jobConf;
    }

    private void determineInputFormatClass(TaskDescriptor taskDescriptor) {
        SourceSupplier sourceSupplier = taskDescriptor.getSourceSupplier();
        Class<?> sourceElementType = taskDescriptor.getSourceElementType();
        if (sourceSupplier instanceof UriSourceSupplier) {
            if (!sourceElementType.isAssignableFrom(String.class)) {
                throw new IllegalArgumentException("Failed to determine Input Format class for source item type " + sourceElementType);
            }
            taskDescriptor.setInputFormatClass(TextInputFormat.class);
        }
    }
}
