package org.apache.hyracks.hdfs.dataflow;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import org.apache.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import org.apache.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
import org.apache.hyracks.hdfs.lib.TextKeyValueParserFactory;
import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
import org.apache.hyracks.hdfs.utils.HyracksUtils;
import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.util.file.FileUtil;
import org.junit.Assert;

/* loaded from: input_file:org/apache/hyracks/hdfs/dataflow/DataflowTest.class */
public class DataflowTest extends TestCase {
    protected static final String HDFS_INPUT_PATH = "/customer/";
    protected static final String HDFS_OUTPUT_PATH = "/customer_result/";
    private MiniDFSCluster dfsCluster;
    private JobConf conf = new JobConf();
    private int numberOfNC = 2;
    protected static final String ACTUAL_RESULT_DIR = FileUtil.joinPath(new String[]{"target", "actual"});
    private static final String TEST_RESOURCES = FileUtil.joinPath(new String[]{"src", "test", "resources"});
    protected static final String EXPECTED_RESULT_PATH = FileUtil.joinPath(new String[]{TEST_RESOURCES, "expected"});
    private static final String PATH_TO_HADOOP_CONF = FileUtil.joinPath(new String[]{TEST_RESOURCES, "hadoop", "conf"});
    protected static final String BUILD_DIR = FileUtil.joinPath(new String[]{"target", "build"});
    private static final String DATA_PATH = FileUtil.joinPath(new String[]{TEST_RESOURCES, "data", "customer.tbl"});
    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
    private static final String MINIDFS_BASEDIR = FileUtil.joinPath(new String[]{"target", "hdfs"});

    public void setUp() throws Exception {
        cleanupStores();
        HyracksUtils.init();
        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
        startHDFS();
    }

    private void cleanupStores() throws IOException {
        FileUtils.forceMkdir(new File(MINIDFS_BASEDIR));
        FileUtils.cleanDirectory(new File(MINIDFS_BASEDIR));
    }

    protected Configuration getConfiguration() {
        return this.conf;
    }

    protected MiniDFSCluster getMiniDFSCluster(Configuration configuration, int i) throws IOException {
        return new MiniDFSCluster(configuration, i, true, (String[]) null);
    }

    protected void startHDFS() throws IOException {
        getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
        getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
        getConfiguration().addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
        FileSystem.getLocal(new Configuration()).delete(new Path(BUILD_DIR), true);
        System.setProperty("hadoop.log.dir", FileUtil.joinPath(new String[]{"target", "logs"}));
        getConfiguration().set("hdfs.minidfs.basedir", MINIDFS_BASEDIR);
        this.dfsCluster = getMiniDFSCluster(getConfiguration(), this.numberOfNC);
        FileSystem fileSystem = FileSystem.get(getConfiguration());
        Path path = new Path(DATA_PATH);
        Path path2 = new Path(HDFS_INPUT_PATH);
        Path path3 = new Path(HDFS_OUTPUT_PATH);
        fileSystem.mkdirs(path2);
        fileSystem.mkdirs(path3);
        fileSystem.copyFromLocalFile(path, path2);
        DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
        getConfiguration().writeXml(dataOutputStream);
        dataOutputStream.flush();
        dataOutputStream.close();
    }

    public void testHDFSReadWriteOperators() throws Exception {
        FileInputFormat.setInputPaths(this.conf, HDFS_INPUT_PATH);
        FileOutputFormat.setOutputPath(this.conf, new Path(HDFS_OUTPUT_PATH));
        this.conf.setInputFormat(TextInputFormat.class);
        Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
        InputSplit[] splits = this.conf.getInputFormat().getSplits(this.conf, this.numberOfNC * 4);
        String[] locationConstraints = scheduler.getLocationConstraints(splits);
        JobSpecification jobSpecification = new JobSpecification();
        RecordDescriptor recordDescriptor = new RecordDescriptor(new ISerializerDeserializer[]{new UTF8StringSerializerDeserializer()});
        String[] strArr = {HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID};
        HDFSReadOperatorDescriptor hDFSReadOperatorDescriptor = new HDFSReadOperatorDescriptor(jobSpecification, recordDescriptor, this.conf, splits, locationConstraints, new TextKeyValueParserFactory());
        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpecification, hDFSReadOperatorDescriptor, strArr);
        ExternalSortOperatorDescriptor externalSortOperatorDescriptor = new ExternalSortOperatorDescriptor(jobSpecification, 10, new int[]{0}, new IBinaryComparatorFactory[]{RawBinaryComparatorFactory.INSTANCE}, recordDescriptor);
        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpecification, externalSortOperatorDescriptor, strArr);
        HDFSWriteOperatorDescriptor hDFSWriteOperatorDescriptor = new HDFSWriteOperatorDescriptor(jobSpecification, this.conf, new TextTupleWriterFactory());
        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpecification, hDFSWriteOperatorDescriptor, new String[]{HyracksUtils.NC1_ID});
        jobSpecification.connect(new OneToOneConnectorDescriptor(jobSpecification), hDFSReadOperatorDescriptor, 0, externalSortOperatorDescriptor, 0);
        jobSpecification.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpecification, new FieldHashPartitionComputerFactory(new int[]{0}, new IBinaryHashFunctionFactory[]{RawBinaryHashFunctionFactory.INSTANCE}), new int[]{0}, new IBinaryComparatorFactory[]{RawBinaryComparatorFactory.INSTANCE}, (INormalizedKeyComputerFactory) null), externalSortOperatorDescriptor, 0, hDFSWriteOperatorDescriptor, 0);
        jobSpecification.addRoot(hDFSWriteOperatorDescriptor);
        HyracksConnection hyracksConnection = new HyracksConnection(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
        hyracksConnection.waitForCompletion(hyracksConnection.startJob(jobSpecification));
        Assert.assertEquals(true, Boolean.valueOf(checkResults()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean checkResults() throws Exception {
        FileSystem.get(getConfiguration()).copyToLocalFile(new Path(HDFS_OUTPUT_PATH), new Path(ACTUAL_RESULT_DIR));
        TestUtils.compareWithResult(new File(FileUtil.joinPath(new String[]{EXPECTED_RESULT_PATH, "part-0"})), new File(FileUtil.joinPath(new String[]{ACTUAL_RESULT_DIR, "customer_result", "part-0"})));
        return true;
    }

    private void cleanupHDFS() throws Exception {
        this.dfsCluster.shutdown();
    }

    public void tearDown() throws Exception {
        HyracksUtils.deinit();
        cleanupHDFS();
    }
}
