package org.apache.beam.sdk.io.hadoop.format;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

/**
 * Unit tests for the write path of {@link HadoopFormatIO}.
 *
 * <p>Each test builds a {@code HadoopFormatIO.write()} transform with partitioning enabled and an
 * {@link HDFSSynchronization} rooted in a per-test temporary folder, then checks either the
 * failure reported for an incomplete/mismatching configuration or the records and committer calls
 * captured by {@link EmployeeOutputFormat}.
 */
@RunWith(MockitoJUnitRunner.class)
public class HadoopFormatIOWriteTest {
    private static final int REDUCERS_COUNT = 2;
    private static final String LOCKS_FOLDER_NAME = "locks";

    // Configuration keys used by the tests; the literal values are part of the tested contract.
    private static final String OUTPUT_FORMAT_CLASS_KEY = "mapreduce.job.outputformat.class";
    private static final String OUTPUT_KEY_CLASS_KEY = "mapreduce.job.output.key.class";
    private static final String OUTPUT_VALUE_CLASS_KEY = "mapreduce.job.output.value.class";
    private static final String PARTITIONER_CLASS_KEY = "mapreduce.job.partitioner.class";
    private static final String REDUCERS_COUNT_KEY = "mapreduce.job.reduces";
    private static final String JOB_ID_KEY = "mapreduce.job.id";
    private static final String OUTPUT_DIR_KEY = "mapreduce.output.fileoutputformat.outputdir";

    /** Complete, valid configuration; rebuilt before every test by {@link #setUp()}. */
    private static Configuration conf;

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /** Recreates the shared configuration and resets the output captured by the test format. */
    @Before
    public void setUp() {
        conf = loadTestConfiguration(EmployeeOutputFormat.class, Text.class, Employee.class);
        EmployeeOutputFormat.initWrittenOutput(Mockito.mock(OutputCommitter.class));
    }

    /**
     * Builds a configuration carrying every entry the tests treat as mandatory.
     *
     * @param outputFormatClass class stored under the output format key
     * @param keyClass class stored under the output key class key
     * @param valueClass class stored under the output value class key
     * @return a new configuration with the three classes, the reducer count and a job id set
     */
    private static Configuration loadTestConfiguration(
            Class<?> outputFormatClass, Class<?> keyClass, Class<?> valueClass) {
        Configuration configuration = new Configuration();
        configuration.setClass(OUTPUT_FORMAT_CLASS_KEY, outputFormatClass, OutputFormat.class);
        configuration.setClass(OUTPUT_KEY_CLASS_KEY, keyClass, Object.class);
        configuration.setClass(OUTPUT_VALUE_CLASS_KEY, valueClass, Object.class);
        configuration.setInt(REDUCERS_COUNT_KEY, REDUCERS_COUNT);
        configuration.set(JOB_ID_KEY, String.valueOf(1));
        return configuration;
    }

    /** A {@code null} configuration must be rejected while the transform is being built. */
    @Test
    public void testWriteObjectCreationFailsIfConfigurationIsNull() {
        thrown.expect(NullPointerException.class);
        thrown.expectMessage("Hadoop configuration cannot be null");
        HadoopFormatIO.<Text, Employee>write()
                .withConfiguration((Configuration) null)
                .withPartitioning()
                .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath()));
    }

    /** Running the pipeline must fail when the output format class is absent. */
    @Test
    public void testWriteValidationFailsMissingOutputFormatInConf() {
        Configuration configuration = new Configuration();
        configuration.setClass(OUTPUT_KEY_CLASS_KEY, Text.class, Object.class);
        configuration.setClass(OUTPUT_VALUE_CLASS_KEY, Employee.class, Object.class);

        applyWriteToEmployeeData(configuration);
        expectMissingConfigurationEntry(OUTPUT_FORMAT_CLASS_KEY);
        p.run().waitUntilFinish();
    }

    /** Running the pipeline must fail when the output key class is absent. */
    @Test
    public void testWriteValidationFailsMissingKeyClassInConf() {
        Configuration configuration = new Configuration();
        configuration.setClass(OUTPUT_FORMAT_CLASS_KEY, TextOutputFormat.class, OutputFormat.class);
        configuration.setClass(OUTPUT_VALUE_CLASS_KEY, Employee.class, Object.class);

        applyWriteToEmployeeData(configuration);
        expectMissingConfigurationEntry(OUTPUT_KEY_CLASS_KEY);
        p.run().waitUntilFinish();
    }

    /** Running the pipeline must fail when the output value class is absent. */
    @Test
    public void testWriteValidationFailsMissingValueClassInConf() {
        Configuration configuration = new Configuration();
        configuration.setClass(OUTPUT_FORMAT_CLASS_KEY, TextOutputFormat.class, OutputFormat.class);
        configuration.setClass(OUTPUT_KEY_CLASS_KEY, Text.class, Object.class);

        applyWriteToEmployeeData(configuration);
        expectMissingConfigurationEntry(OUTPUT_VALUE_CLASS_KEY);
        p.run().waitUntilFinish();
    }

    /** Running the pipeline must fail when no job id is configured. */
    @Test
    public void testWriteValidationFailsMissingJobIDInConf() {
        Configuration configuration = new Configuration();
        configuration.setClass(OUTPUT_FORMAT_CLASS_KEY, TextOutputFormat.class, OutputFormat.class);
        configuration.setClass(OUTPUT_KEY_CLASS_KEY, Text.class, Object.class);
        configuration.setClass(OUTPUT_VALUE_CLASS_KEY, Employee.class, Object.class);
        configuration.set(OUTPUT_DIR_KEY, tmpFolder.getRoot().getAbsolutePath());

        applyWriteToEmployeeData(configuration);
        expectMissingConfigurationEntry(JOB_ID_KEY);
        p.run().waitUntilFinish();
    }

    /**
     * Writes the employee data set with the valid configuration and checks that exactly those
     * records were captured, the job was committed once and a task was committed per reducer.
     */
    @Test
    public void testWritingData() throws IOException {
        conf.set(OUTPUT_DIR_KEY, tmpFolder.getRoot().getAbsolutePath());
        List<KV<Text, Employee>> employeeData = TestEmployeeDataSet.getEmployeeData();

        applyWrite(createEmployeeInput(employeeData), conf);
        // Wait for completion, like every other test here, before inspecting the captured output.
        p.run().waitUntilFinish();

        List<KV<Text, Employee>> writtenOutput = EmployeeOutputFormat.getWrittenOutput();
        Assert.assertEquals(employeeData.size(), writtenOutput.size());
        Assert.assertTrue(employeeData.containsAll(writtenOutput));
        Assert.assertTrue(writtenOutput.containsAll(employeeData));

        OutputCommitter committer = EmployeeOutputFormat.getOutputCommitter();
        Mockito.verify(committer).commitJob(Mockito.any(JobContext.class));
        Mockito.verify(committer, Mockito.times(REDUCERS_COUNT))
                .commitTask(Mockito.any(TaskAttemptContext.class));
    }

    /** A {@code String} key must be rejected because the configuration declares {@link Text} keys. */
    @Test
    public void testWritingDataFailInvalidKeyType() {
        conf.set(OUTPUT_DIR_KEY, tmpFolder.getRoot().getAbsolutePath());
        List<KV<String, Employee>> data = new ArrayList<>();
        data.add(KV.of("key", new Employee("name", "address")));
        PCollection<KV<String, Employee>> input =
                p.apply("CreateData", Create.of(data))
                        .setTypeDescriptor(
                                TypeDescriptors.kvs(
                                        new TypeDescriptor<String>() {},
                                        new TypeDescriptor<Employee>() {}));

        thrown.expect(Pipeline.PipelineExecutionException.class);
        thrown.expectMessage(String.class.getName());

        applyWrite(input, conf);
        p.run().waitUntilFinish();
    }

    /** A {@link Text} value must be rejected because the configuration declares Employee values. */
    @Test
    public void testWritingDataFailInvalidValueType() {
        conf.set(OUTPUT_DIR_KEY, tmpFolder.getRoot().getAbsolutePath());
        List<KV<Text, Text>> data = new ArrayList<>();
        data.add(KV.of(new Text("key"), new Text("value")));
        TypeDescriptor<Text> textDescriptor = new TypeDescriptor<Text>() {};
        PCollection<KV<Text, Text>> input =
                p.apply(Create.of(data))
                        .setTypeDescriptor(TypeDescriptors.kvs(textDescriptor, textDescriptor));

        thrown.expect(Pipeline.PipelineExecutionException.class);
        thrown.expectMessage(Text.class.getName());

        applyWrite(input, conf);
        p.run().waitUntilFinish();
    }

    /** The transform's display data must expose the configured classes and the default partitioner. */
    @Test
    public void testWriteDisplayData() {
        DisplayData displayData =
                DisplayData.from(
                        HadoopFormatIO.<Text, Employee>write()
                                .withConfiguration(conf)
                                .withPartitioning()
                                .withExternalSynchronization(
                                        new HDFSSynchronization(getLocksDirPath())));

        MatcherAssert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem(
                        OUTPUT_FORMAT_CLASS_KEY, conf.get(OUTPUT_FORMAT_CLASS_KEY)));
        MatcherAssert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem(
                        OUTPUT_KEY_CLASS_KEY, conf.get(OUTPUT_KEY_CLASS_KEY)));
        MatcherAssert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem(
                        OUTPUT_VALUE_CLASS_KEY, conf.get(OUTPUT_VALUE_CLASS_KEY)));
        MatcherAssert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem(
                        PARTITIONER_CLASS_KEY, HadoopFormats.DEFAULT_PARTITIONER_CLASS_ATTR.getName()));
    }

    /**
     * Adds the standard employee data set plus a write using {@code configuration} to the test
     * pipeline. The pipeline is only assembled here; the calling test runs it.
     */
    private void applyWriteToEmployeeData(Configuration configuration) {
        applyWrite(createEmployeeInput(TestEmployeeDataSet.getEmployeeData()), configuration);
    }

    /** Creates a {@code Text -> Employee} collection from {@code data} in the test pipeline. */
    private PCollection<KV<Text, Employee>> createEmployeeInput(List<KV<Text, Employee>> data) {
        return p.apply(Create.of(data))
                .setTypeDescriptor(
                        TypeDescriptors.kvs(
                                new TypeDescriptor<Text>() {}, new TypeDescriptor<Employee>() {}));
    }

    /**
     * Applies a partitioned, externally synchronized write named "Write" to {@code input}.
     *
     * <p>The key/value type arguments are spelled out because they cannot be inferred through the
     * chained builder calls.
     */
    private <KeyT, ValueT> void applyWrite(
            PCollection<KV<KeyT, ValueT>> input, Configuration configuration) {
        input.apply(
                "Write",
                HadoopFormatIO.<KeyT, ValueT>write()
                        .withConfiguration(configuration)
                        .withPartitioning()
                        .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath())));
    }

    /** Registers the failure expected when {@code key} is missing from the configuration. */
    private void expectMissingConfigurationEntry(String key) {
        thrown.expect(Pipeline.PipelineExecutionException.class);
        thrown.expectMessage("Configuration must contain \"" + key + "\"");
    }

    /** Returns the absolute path of the "locks" directory inside the per-test temporary folder. */
    private String getLocksDirPath() {
        return Paths.get(tmpFolder.getRoot().getAbsolutePath(), LOCKS_FOLDER_NAME)
                .toAbsolutePath()
                .toString();
    }
}
