/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.hadoop.format;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.Employee;
import org.apache.beam.sdk.io.hadoop.format.EmployeeOutputFormat;
import org.apache.beam.sdk.io.hadoop.format.ExternalSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormats;
import org.apache.beam.sdk.io.hadoop.format.TestEmployeeDataSet;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;

@RunWith(value=MockitoJUnitRunner.class)
public class HadoopFormatIOWriteTest {
    private static final int REDUCERS_COUNT = 2;
    private static final String LOCKS_FOLDER_NAME = "locks";
    private static Configuration conf;
    @Rule
    public final transient TestPipeline p = TestPipeline.create();
    @Rule
    public ExpectedException thrown = ExpectedException.none();
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    @Before
    public void setUp() {
        conf = HadoopFormatIOWriteTest.loadTestConfiguration(EmployeeOutputFormat.class, Text.class, Employee.class);
        OutputCommitter mockedOutputCommitter = (OutputCommitter)Mockito.mock(OutputCommitter.class);
        EmployeeOutputFormat.initWrittenOutput(mockedOutputCommitter);
    }

    private static Configuration loadTestConfiguration(Class<?> outputFormatClassName, Class<?> keyClass, Class<?> valueClass) {
        Configuration conf = new Configuration();
        conf.setClass("mapreduce.job.outputformat.class", outputFormatClassName, OutputFormat.class);
        conf.setClass("mapreduce.job.output.key.class", keyClass, Object.class);
        conf.setClass("mapreduce.job.output.value.class", valueClass, Object.class);
        conf.setInt("mapreduce.job.reduces", 2);
        conf.set("mapreduce.job.id", String.valueOf(1));
        return conf;
    }

    @Test
    public void testWriteObjectCreationFailsIfConfigurationIsNull() {
        this.thrown.expect(NullPointerException.class);
        this.thrown.expectMessage("Hadoop configuration cannot be null");
        HadoopFormatIO.write().withConfiguration(null).withPartitioning().withExternalSynchronization((ExternalSynchronization)new HDFSSynchronization(this.getLocksDirPath()));
    }

    @Test
    public void testWriteValidationFailsMissingOutputFormatInConf() {
        Configuration configuration = new Configuration();
        configuration.setClass("mapreduce.job.output.key.class", Text.class, Object.class);
        configuration.setClass("mapreduce.job.output.value.class", Employee.class, Object.class);
        HadoopFormatIO.Write writeWithWrongConfig = HadoopFormatIO.write().withConfiguration(configuration).withPartitioning().withExternalSynchronization((ExternalSynchronization)new HDFSSynchronization(this.getLocksDirPath()));
        ((PCollection)this.p.apply((PTransform)Create.of(TestEmployeeDataSet.getEmployeeData()))).setTypeDescriptor(TypeDescriptors.kvs((TypeDescriptor)new TypeDescriptor<Text>(){}, (TypeDescriptor)new TypeDescriptor<Employee>(){})).apply("Write", (PTransform)writeWithWrongConfig);
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectMessage("Configuration must contain \"mapreduce.job.outputformat.class\"");
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testWriteValidationFailsMissingKeyClassInConf() {
        Configuration configuration = new Configuration();
        configuration.setClass("mapreduce.job.outputformat.class", TextOutputFormat.class, OutputFormat.class);
        configuration.setClass("mapreduce.job.output.value.class", Employee.class, Object.class);
        this.runValidationPipeline(configuration);
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectMessage("Configuration must contain \"mapreduce.job.output.key.class\"");
        this.p.run().waitUntilFinish();
    }

    private void runValidationPipeline(Configuration configuration) {
        ((PCollection)this.p.apply((PTransform)Create.of(TestEmployeeDataSet.getEmployeeData()))).setTypeDescriptor(TypeDescriptors.kvs((TypeDescriptor)new TypeDescriptor<Text>(){}, (TypeDescriptor)new TypeDescriptor<Employee>(){})).apply("Write", (PTransform)HadoopFormatIO.write().withConfiguration(configuration).withPartitioning().withExternalSynchronization((ExternalSynchronization)new HDFSSynchronization(this.getLocksDirPath())));
    }

    @Test
    public void testWriteValidationFailsMissingValueClassInConf() {
        Configuration configuration = new Configuration();
        configuration.setClass("mapreduce.job.outputformat.class", TextOutputFormat.class, OutputFormat.class);
        configuration.setClass("mapreduce.job.output.key.class", Text.class, Object.class);
        this.runValidationPipeline(configuration);
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectMessage("Configuration must contain \"mapreduce.job.output.value.class\"");
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testWriteValidationFailsMissingJobIDInConf() {
        Configuration configuration = new Configuration();
        configuration.setClass("mapreduce.job.outputformat.class", TextOutputFormat.class, OutputFormat.class);
        configuration.setClass("mapreduce.job.output.key.class", Text.class, Object.class);
        configuration.setClass("mapreduce.job.output.value.class", Employee.class, Object.class);
        configuration.set("mapreduce.output.fileoutputformat.outputdir", this.tmpFolder.getRoot().getAbsolutePath());
        this.runValidationPipeline(configuration);
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectMessage("Configuration must contain \"mapreduce.job.id\"");
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testWritingData() throws IOException {
        conf.set("mapreduce.output.fileoutputformat.outputdir", this.tmpFolder.getRoot().getAbsolutePath());
        List<KV<Text, Employee>> data = TestEmployeeDataSet.getEmployeeData();
        PCollection input = ((PCollection)this.p.apply((PTransform)Create.of(data))).setTypeDescriptor(TypeDescriptors.kvs((TypeDescriptor)new TypeDescriptor<Text>(){}, (TypeDescriptor)new TypeDescriptor<Employee>(){}));
        input.apply("Write", (PTransform)HadoopFormatIO.write().withConfiguration(conf).withPartitioning().withExternalSynchronization((ExternalSynchronization)new HDFSSynchronization(this.getLocksDirPath())));
        this.p.run();
        List<KV<Text, Employee>> writtenOutput = EmployeeOutputFormat.getWrittenOutput();
        Assert.assertEquals((long)data.size(), (long)writtenOutput.size());
        Assert.assertTrue((boolean)data.containsAll(writtenOutput));
        Assert.assertTrue((boolean)writtenOutput.containsAll(data));
        ((OutputCommitter)Mockito.verify((Object)EmployeeOutputFormat.getOutputCommitter())).commitJob((JobContext)Mockito.any());
        ((OutputCommitter)Mockito.verify((Object)EmployeeOutputFormat.getOutputCommitter(), (VerificationMode)Mockito.times((int)2))).commitTask((TaskAttemptContext)Mockito.any());
    }

    @Test
    public void testWritingDataFailInvalidKeyType() {
        conf.set("mapreduce.output.fileoutputformat.outputdir", this.tmpFolder.getRoot().getAbsolutePath());
        ArrayList<KV> data = new ArrayList<KV>();
        data.add(KV.of((Object)"key", (Object)new Employee("name", "address")));
        PCollection input = ((PCollection)this.p.apply("CreateData", (PTransform)Create.of(data))).setTypeDescriptor(TypeDescriptors.kvs((TypeDescriptor)new TypeDescriptor<String>(){}, (TypeDescriptor)new TypeDescriptor<Employee>(){}));
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectMessage(String.class.getName());
        input.apply("Write", (PTransform)HadoopFormatIO.write().withConfiguration(conf).withPartitioning().withExternalSynchronization((ExternalSynchronization)new HDFSSynchronization(this.getLocksDirPath())));
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testWritingDataFailInvalidValueType() {
        conf.set("mapreduce.output.fileoutputformat.outputdir", this.tmpFolder.getRoot().getAbsolutePath());
        ArrayList<KV> data = new ArrayList<KV>();
        data.add(KV.of((Object)new Text("key"), (Object)new Text("value")));
        TypeDescriptor<Text> textTypeDescriptor = new TypeDescriptor<Text>(){};
        PCollection input = ((PCollection)this.p.apply((PTransform)Create.of(data))).setTypeDescriptor(TypeDescriptors.kvs((TypeDescriptor)textTypeDescriptor, (TypeDescriptor)textTypeDescriptor));
        this.thrown.expect(Pipeline.PipelineExecutionException.class);
        this.thrown.expectMessage(Text.class.getName());
        input.apply("Write", (PTransform)HadoopFormatIO.write().withConfiguration(conf).withPartitioning().withExternalSynchronization((ExternalSynchronization)new HDFSSynchronization(this.getLocksDirPath())));
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testWriteDisplayData() {
        HadoopFormatIO.Write write = HadoopFormatIO.write().withConfiguration(conf).withPartitioning().withExternalSynchronization((ExternalSynchronization)new HDFSSynchronization(this.getLocksDirPath()));
        DisplayData displayData = DisplayData.from((HasDisplayData)write);
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"mapreduce.job.outputformat.class", (String)conf.get("mapreduce.job.outputformat.class")));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"mapreduce.job.output.key.class", (String)conf.get("mapreduce.job.output.key.class")));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"mapreduce.job.output.value.class", (String)conf.get("mapreduce.job.output.value.class")));
        MatcherAssert.assertThat((Object)displayData, (Matcher)DisplayDataMatchers.hasDisplayItem((String)"mapreduce.job.partitioner.class", (String)HadoopFormats.DEFAULT_PARTITIONER_CLASS_ATTR.getName()));
    }

    private String getLocksDirPath() {
        return Paths.get(this.tmpFolder.getRoot().getAbsolutePath(), LOCKS_FOLDER_NAME).toAbsolutePath().toString();
    }
}

