package org.apache.parquet.hadoop.example;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.parquet.Strings;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.api.DelegatingReadSupport;
import org.apache.parquet.hadoop.api.DelegatingWriteSupport;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Round-trip tests for {@link ParquetOutputFormat} and {@link ParquetInputFormat}.
 *
 * <p>Every test goes through {@link #runMapReduceJob}, which runs two map-only jobs:
 * <ol>
 *   <li>a "write" job that reads {@link #inputPath} (this test's own source file) with
 *       {@link TextInputFormat} and writes it to {@link #parquetPath} as Parquet, and</li>
 *   <li>a "read" job that reads that Parquet data back and writes it with
 *       {@link TextOutputFormat} under {@link #outputPath}.</li>
 * </ol>
 */
public class TestInputOutputFormat {
    private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class);

    /** Configuration key under which the serialized filter predicate is passed to the read job. */
    private static final String FILTER_PREDICATE_KEY = "parquet.private.read.filter.predicate";
    /** Configuration key set to "true" by the task-side metadata tests. */
    private static final String TASK_SIDE_METADATA_KEY = "parquet.task.side.metadata";
    /** Output file of the map-only read job that the assertions inspect. */
    private static final String OUTPUT_PART_FILE = "part-m-00000";

    final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
    final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java");
    final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
    Job writeJob;
    Job readJob;
    private String writeSchema;
    private String readSchema;
    private String partialSchema;
    private Configuration conf;
    // Mapper used by the *write* job: consumes text lines and emits Groups.
    private Class<? extends Mapper<?, ?, ?, ?>> readMapperClass;
    // Mapper used by the *read* job: consumes Groups and emits text lines.
    private Class<? extends Mapper<?, ?, ?, ?>> writeMapperClass;

    /**
     * Read support that asserts the "my.count" key/value metadata written by
     * {@link MyWriteSupport} is visible at init time, then delegates to {@link GroupReadSupport}.
     */
    public static final class MyReadSupport extends DelegatingReadSupport<Group> {
        public MyReadSupport() {
            super(new GroupReadSupport());
        }

        @Override
        public ReadSupport.ReadContext init(InitContext initContext) {
            Set<String> counts = initContext.getKeyValueMetadata().get("my.count");
            // Fail with a clear message rather than an NPE when the metadata is absent.
            Assert.assertNotNull("my.count metadata missing", counts);
            Assert.assertTrue("counts: " + counts, counts.size() > 0);
            return super.init(initContext);
        }
    }

    /**
     * Write support that delegates to {@link GroupWriteSupport} and, on finalize, records the
     * number of written records as extra metadata under "my.count".
     */
    public static final class MyWriteSupport extends DelegatingWriteSupport<Group> {
        private long count = 0L;

        public MyWriteSupport() {
            super(new GroupWriteSupport());
        }

        @Override
        public void write(Group group) {
            super.write(group);
            count++;
        }

        @Override
        public WriteSupport.FinalizedWriteContext finalizeWrite() {
            Map<String, String> extraMetaData = new HashMap<>();
            extraMetaData.put("my.count", String.valueOf(count));
            return new WriteSupport.FinalizedWriteContext(extraMetaData);
        }
    }

    /** Read-job mapper for the projection test: emits only the "line" field plus a placeholder. */
    public static class PartialWriteMapper extends Mapper<Void, Group, LongWritable, Text> {
        @Override
        protected void map(Void key, Group group, Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(group.getInteger("line", 0)), new Text("dummy"));
        }
    }

    /** Write-job mapper: turns each text line into a Group with "line" (the input key) and "content". */
    public static class ReadMapper extends Mapper<LongWritable, Text, Void, Group> {
        private SimpleGroupFactory factory;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(ContextUtil.getConfiguration(context)));
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(null, factory.newGroup().append("line", (int) key.get()).append("content", value.toString()));
        }
    }

    /** Read-job mapper: emits ("line", "content") from each Group as a text record. */
    public static class WriteMapper extends Mapper<Void, Group, LongWritable, Text> {
        @Override
        protected void map(Void key, Group group, Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(group.getInteger("line", 0)), new Text(group.getString("content", 0)));
        }
    }

    @Before
    public void setUp() {
        conf = new Configuration();
        writeSchema = "message example {\nrequired int32 line;\nrequired binary content;\n}";
        readSchema = "message example {\nrequired int32 line;\nrequired binary content;\n}";
        partialSchema = "message example {\nrequired int32 line;\n}";
        readMapperClass = ReadMapper.class;
        writeMapperClass = WriteMapper.class;
    }

    private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
        runMapReduceJob(codec, Collections.<String, String>emptyMap());
    }

    /**
     * Runs the write job (text -> Parquet) followed by the read job (Parquet -> text).
     *
     * @param codec     compression codec for the Parquet output
     * @param extraConf additional configuration entries applied on top of {@link #conf}
     * @throws RuntimeException if either job fails
     */
    private void runMapReduceJob(CompressionCodecName codec, Map<String, String> extraConf)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration(conf);
        for (Map.Entry<String, String> entry : extraConf.entrySet()) {
            configuration.set(entry.getKey(), entry.getValue());
        }
        // Start from a clean slate so results from a previous run cannot leak in.
        FileSystem fileSystem = parquetPath.getFileSystem(configuration);
        fileSystem.delete(parquetPath, true);
        fileSystem.delete(outputPath, true);

        writeJob = new Job(configuration, "write");
        TextInputFormat.addInputPath(writeJob, inputPath);
        writeJob.setInputFormatClass(TextInputFormat.class);
        writeJob.setNumReduceTasks(0);
        ParquetOutputFormat.setCompression(writeJob, codec);
        ParquetOutputFormat.setOutputPath(writeJob, parquetPath);
        writeJob.setOutputFormatClass(ParquetOutputFormat.class);
        writeJob.setMapperClass(readMapperClass);
        ParquetOutputFormat.setWriteSupportClass(writeJob, MyWriteSupport.class);
        GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), writeJob.getConfiguration());
        writeJob.submit();
        waitForJob(writeJob);

        configuration.set("parquet.read.schema", readSchema);
        readJob = new Job(configuration, "read");
        readJob.setInputFormatClass(ParquetInputFormat.class);
        ParquetInputFormat.setReadSupportClass(readJob, MyReadSupport.class);
        ParquetInputFormat.setInputPaths(readJob, parquetPath);
        readJob.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(readJob, outputPath);
        readJob.setMapperClass(writeMapperClass);
        readJob.setNumReduceTasks(0);
        readJob.submit();
        waitForJob(readJob);
    }

    private void testReadWrite(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
        testReadWrite(codec, Collections.<String, String>emptyMap());
    }

    /**
     * Runs the round trip and asserts the text output matches the input file line by line,
     * with the same number of lines.
     */
    private void testReadWrite(CompressionCodecName codec, Map<String, String> extraConf)
            throws IOException, ClassNotFoundException, InterruptedException {
        runMapReduceJob(codec, extraConf);
        // try-with-resources: close both readers even when an assertion fails mid-loop.
        try (BufferedReader in = Files.newBufferedReader(new File(inputPath.toString()).toPath(), StandardCharsets.UTF_8);
             BufferedReader out = Files.newBufferedReader(
                     new File(outputPath.toString(), OUTPUT_PART_FILE).toPath(), StandardCharsets.UTF_8)) {
            int lineNumber = 0;
            String expected;
            String actual;
            while ((expected = in.readLine()) != null && (actual = out.readLine()) != null) {
                lineNumber++;
                // TextOutputFormat writes "key\tvalue"; only the value is compared.
                Assert.assertEquals("line " + lineNumber, expected, actual.substring(actual.indexOf('\t') + 1));
            }
            // Both sides must be exhausted at the same time.
            Assert.assertNull("line " + lineNumber, out.readLine());
            Assert.assertNull("line " + lineNumber, expected);
        }
    }

    @Test
    public void testReadWrite() throws IOException, ClassNotFoundException, InterruptedException {
        testReadWrite(CompressionCodecName.GZIP);
        testReadWrite(CompressionCodecName.UNCOMPRESSED);
        testReadWrite(CompressionCodecName.SNAPPY);
    }

    @Test
    public void testReadWriteTaskSideMD() throws IOException, ClassNotFoundException, InterruptedException {
        Map<String, String> extraConf = new HashMap<>();
        extraConf.put(TASK_SIDE_METADATA_KEY, "true");
        testReadWrite(CompressionCodecName.UNCOMPRESSED, extraConf);
    }

    /** A filter that matches no record ("line" == -1000) must yield an empty output file. */
    @Test
    public void testReadWriteTaskSideMDAggressiveFilter() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration filterConf = new Configuration();
        ParquetInputFormat.setFilterPredicate(filterConf, FilterApi.eq(FilterApi.intColumn("line"), -1000));
        Map<String, String> extraConf = new HashMap<>();
        extraConf.put(TASK_SIDE_METADATA_KEY, "true");
        extraConf.put(FILTER_PREDICATE_KEY, filterConf.get(FILTER_PREDICATE_KEY));
        runMapReduceJob(CompressionCodecName.UNCOMPRESSED, extraConf);
        Assert.assertTrue(Files.readAllLines(
                new File(outputPath.toString(), OUTPUT_PART_FILE).toPath(), StandardCharsets.UTF_8).isEmpty());
    }

    /**
     * Filters on "line" < 500, where "line" is the TextInputFormat key, i.e. the line's byte
     * offset, and checks the output against the leading lines of the input.
     */
    @Test
    public void testReadWriteFilter() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration filterConf = new Configuration();
        ParquetInputFormat.setFilterPredicate(filterConf, FilterApi.lt(FilterApi.intColumn("line"), 500));
        Map<String, String> extraConf = new HashMap<>();
        extraConf.put(TASK_SIDE_METADATA_KEY, "true");
        extraConf.put(FILTER_PREDICATE_KEY, filterConf.get(FILTER_PREDICATE_KEY));
        runMapReduceJob(CompressionCodecName.UNCOMPRESSED, extraConf);

        // Expected: input lines kept while the running character count is still below 500.
        // NOTE(review): this counts characters without line terminators, so it only approximates
        // the byte offsets the filter sees; it agrees as long as no line straddles that gap.
        List<String> expected = Files.readAllLines(new File(inputPath.toString()).toPath(), StandardCharsets.UTF_8);
        int size = 0;
        Iterator<String> it = expected.iterator();
        while (it.hasNext()) {
            String line = it.next();
            if (size < 500) {
                size += line.length();
            } else {
                it.remove();
            }
        }

        // Strip the "key\t" prefix added by TextOutputFormat and rejoin the values with '\n'.
        // Joining with a leading-separator check keeps an empty output from throwing.
        List<String> found = Files.readAllLines(
                new File(outputPath.toString(), OUTPUT_PART_FILE).toPath(), StandardCharsets.UTF_8);
        StringBuilder actual = new StringBuilder();
        for (int i = 0; i < found.size(); i++) {
            if (i > 0) {
                actual.append('\n');
            }
            actual.append(found.get(i).split("\t", -1)[1]);
        }
        Assert.assertEquals(Strings.join(expected, "\n"), actual.toString());
    }

    /** Reads back with a schema that only projects "line". The jobs must still succeed. */
    @Test
    public void testProjection() throws Exception {
        readSchema = partialSchema;
        writeMapperClass = PartialWriteMapper.class;
        runMapReduceJob(CompressionCodecName.GZIP);
    }

    /**
     * Returns the value of counter {@code counterName} in group {@code groupName} of {@code job}.
     *
     * <p>NOTE(review): the lookups go through reflection, presumably to stay compatible with
     * Hadoop versions where Counters/CounterGroup/Counter differ (class vs. interface) — confirm.
     */
    private static long value(Job job, String groupName, String counterName) throws Exception {
        Method getGroup = Counters.class.getMethod("getGroup", String.class);
        Method findCounter = CounterGroup.class.getMethod("findCounter", String.class);
        Method getValue = Counter.class.getMethod("getValue");
        CounterGroup group = (CounterGroup) getGroup.invoke(job.getCounters(), groupName);
        Counter counter = (Counter) findCounter.invoke(group, counterName);
        return (Long) getValue.invoke(counter);
    }

    @Test
    public void testReadWriteWithCounter() throws Exception {
        runMapReduceJob(CompressionCodecName.GZIP);
        long bytesRead = value(readJob, "parquet", "bytesread");
        long bytesTotal = value(readJob, "parquet", "bytestotal");
        Assert.assertTrue(bytesRead > 0);
        Assert.assertTrue(bytesTotal > 0);
        Assert.assertEquals(bytesTotal, bytesRead);
    }

    @Test
    public void testReadWriteWithoutCounter() throws Exception {
        conf.set("parquet.benchmark.time.read", "false");
        conf.set("parquet.benchmark.bytes.total", "false");
        conf.set("parquet.benchmark.bytes.read", "false");
        runMapReduceJob(CompressionCodecName.GZIP);
        Assert.assertEquals(0L, value(readJob, "parquet", "bytesread"));
        Assert.assertEquals(0L, value(readJob, "parquet", "bytestotal"));
        Assert.assertEquals(0L, value(readJob, "parquet", "timeread"));
    }

    /**
     * Polls {@code job} every 100 ms until it completes, then logs its final status.
     *
     * @throws RuntimeException if the job did not succeed
     */
    private void waitForJob(Job job) throws InterruptedException, IOException {
        while (!job.isComplete()) {
            LOG.debug("waiting for job {}", job.getJobName());
            Thread.sleep(100L);
        }
        LOG.info("status for job {}: {}", job.getJobName(), job.isSuccessful() ? "SUCCESS" : "FAILURE");
        if (!job.isSuccessful()) {
            throw new RuntimeException("job failed " + job.getJobName());
        }
    }
}
