/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.hadoop.format;

import java.io.File;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.examples.WordCount;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.hadoop.format.ExternalSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormats;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class HadoopFormatIOSequenceFileTest {
    /** Origin of the test timeline; watermarks and event timestamps are offsets from it. */
    private static final Instant START_TIME = new Instant(0L);

    /** Name of the folder, below the temporary root, that holds the tests' output. */
    private static final String TEST_FOLDER_NAME = "test";

    /** Name of the folder, below the temporary root, given to {@link HDFSSynchronization}. */
    private static final String LOCKS_FOLDER_NAME = "locks";

    /** Number of reducers configured for the Hadoop write jobs in these tests. */
    private static final int REDUCERS_COUNT = 2;

    /** Input sentences; the first four feed the first window, the last two the second. */
    private static final List<String> SENTENCES = Arrays.asList(
        "Hello world this is first streamed event",
        "Hello again this is sedcond streamed event",
        "Third time Hello event created",
        "And last event will was sent now",
        "Hello from second window",
        "First event from second window");

    /** Sentences emitted by {@code streamTest} as the first batch of events. */
    private static final List<String> FIRST_WIN_WORDS = SENTENCES.subList(0, 4);

    /** Sentences emitted by {@code streamTest} as the second batch of events. */
    private static final List<String> SECOND_WIN_WORDS = SENTENCES.subList(4, 6);

    /** Size of the fixed windows applied to the streamed input. */
    private static final Duration WINDOW_DURATION = Duration.standardMinutes(1L);

    /**
     * Converts a {@code (word, count)} pair into its Hadoop writable form.
     *
     * <p>The lambda is typed through the declared target type; a raw intersection cast would erase
     * the parameter to {@code Object} and make {@code getKey()}/{@code getValue()} unavailable.
     */
    private static final SerializableFunction<KV<String, Long>, KV<Text, LongWritable>> KV_STR_INT_2_TXT_LONGWRITABLE =
        element -> KV.of(new Text(element.getKey()), new LongWritable(element.getValue()));

    /** Per-test temporary root for the output and lock folders. */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /** Pipeline under test. */
    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    /**
     * Computes the expected word counts for a list of sentences.
     *
     * <p>Each sentence is split on runs of non-word characters, every token is lower-cased, and the
     * occurrences of each distinct token are counted.
     *
     * @param sentences sentences to tokenize
     * @return map from lower-cased token to its number of occurrences
     */
    private static Map<String, Long> computeWordCounts(List<String> sentences) {
        Stream<String> lowerCasedWords = sentences.stream()
            .flatMap(sentence -> Arrays.stream(sentence.split("\\W+")))
            .map(word -> word.toLowerCase());
        return lowerCasedWords.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    /**
     * Runs the batch word count through a write configured with partitioning and checks that the
     * locks folder holds no entries afterwards.
     */
    @Test
    public void batchTest() {
        String outputDir = getOutputDirPath("batchTest");
        Configuration conf = createWriteConf(
            SequenceFileOutputFormat.class, Text.class, LongWritable.class, outputDir, REDUCERS_COUNT, "0");

        // The explicit type witness is required: without it write() infers <Object, Object> and the
        // result cannot be used as a Write<Text, LongWritable>.
        HadoopFormatIO.Write<Text, LongWritable> write = HadoopFormatIO.<Text, LongWritable>write()
            .withConfiguration(conf)
            .withPartitioning()
            .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath()));

        executeBatchTest(write, outputDir);

        Assert.assertEquals(
            "In lock folder shouldn't be any file", 0L, new File(getLocksDirPath()).list().length);
    }

    /**
     * Runs the batch word count through a write configured without partitioning and checks that the
     * locks folder holds no entries afterwards.
     */
    @Test
    public void batchTestWithoutPartitioner() {
        String outputDir = getOutputDirPath("batchTestWithoutPartitioner");
        Configuration conf = createWriteConf(
            SequenceFileOutputFormat.class, Text.class, LongWritable.class, outputDir, REDUCERS_COUNT, "0");

        // The explicit type witness is required: without it write() infers <Object, Object> and the
        // result cannot be used as a Write<Text, LongWritable>.
        HadoopFormatIO.Write<Text, LongWritable> write = HadoopFormatIO.<Text, LongWritable>write()
            .withConfiguration(conf)
            .withoutPartitioning()
            .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath()));

        executeBatchTest(write, outputDir);

        Assert.assertEquals(
            "In lock folder shouldn't be any file", 0L, new File(getLocksDirPath()).list().length);
    }

    /**
     * Counts the words of {@link #SENTENCES}, writes the counts with the given transform, and
     * asserts that the data read back from {@code outputDir} equals the locally computed counts.
     *
     * @param write fully configured write transform under test
     * @param outputDir directory the write is configured to produce its files in
     */
    private void executeBatchTest(HadoopFormatIO.Write<Text, LongWritable> write, String outputDir) {
        pipeline
            .apply(Create.of(SENTENCES))
            .apply(ParDo.of(new ConvertToLowerCaseFn()))
            .apply(new WordCount.CountWords())
            .apply("ConvertToHadoopFormat", ParDo.of(new ConvertToHadoopFormatFn<>(KV_STR_INT_2_TXT_LONGWRITABLE)))
            // The element type is set explicitly on the converted collection.
            .setTypeDescriptor(TypeDescriptors.kvs(new TypeDescriptor<Text>() {}, new TypeDescriptor<LongWritable>() {}))
            .apply(write);
        pipeline.run();

        Map<String, Long> results = loadWrittenDataAsMap(outputDir);
        MatcherAssert.assertThat(results.entrySet(), Matchers.equalTo(computeWordCounts(SENTENCES).entrySet()));
    }

    /**
     * Reads back every record from the {@code part-r*} files found directly in {@code outputDir}.
     *
     * @param outputDir directory to scan; a {@link NullPointerException} is raised if it cannot be
     *     listed
     * @return all records of the matching files, in directory-listing order
     */
    private List<KV<Text, LongWritable>> loadWrittenData(String outputDir) {
        String[] fileNames = Objects.requireNonNull(new File(outputDir).list());
        List<KV<Text, LongWritable>> records = new ArrayList<>();
        for (String fileName : fileNames) {
            // Only files whose name starts with "part-r" are read; everything else is ignored.
            if (!fileName.startsWith("part-r")) {
                continue;
            }
            String filePath = outputDir + File.separator + fileName;
            extractResultsFromFile(filePath).forEach(records::add);
        }
        return records;
    }

    /**
     * Resolves the output directory of a test below the temporary folder.
     *
     * @param testName name of the sub-directory, conventionally the test method name
     * @return absolute path of {@code <tmp root>/test/<testName>}
     */
    private String getOutputDirPath(String testName) {
        String tmpRoot = tmpFolder.getRoot().getAbsolutePath();
        // Built from TEST_FOLDER_NAME, mirroring how getLocksDirPath uses LOCKS_FOLDER_NAME.
        return Paths.get(tmpRoot, TEST_FOLDER_NAME + "/" + testName).toAbsolutePath().toString();
    }

    /**
     * Resolves the locks directory below the temporary folder.
     *
     * @return absolute path of the {@link #LOCKS_FOLDER_NAME} folder under the temporary root
     */
    private String getLocksDirPath() {
        String tmpRoot = tmpFolder.getRoot().getAbsolutePath();
        return Paths.get(tmpRoot, LOCKS_FOLDER_NAME).toAbsolutePath().toString();
    }

    /**
     * Reads all key/value records of one sequence file.
     *
     * @param fileName path of the sequence file to read
     * @return stream over the records, fully materialized before the reader is closed
     * @throws RuntimeException wrapping any failure raised while opening or reading the file
     */
    private Stream<KV<Text, LongWritable>> extractResultsFromFile(String fileName) {
        try (SequenceFileRecordReader<Text, LongWritable> reader = new SequenceFileRecordReader<>()) {
            Path path = new Path(fileName);
            TaskAttemptContext taskContext =
                HadoopFormats.createTaskAttemptContext(new Configuration(), new JobID("readJob", 0), 0);
            // A single split spanning the whole file.
            reader.initialize(new FileSplit(path, 0L, Long.MAX_VALUE, new String[] {"localhost"}), taskContext);

            List<KV<Text, LongWritable>> result = new ArrayList<>();
            while (reader.nextKeyValue()) {
                // Key and value are copied into fresh writables; presumably the reader reuses its
                // current key/value instances between records (confirm against Hadoop docs).
                Text key = new Text(reader.getCurrentKey().toString());
                LongWritable value = new LongWritable(reader.getCurrentValue().get());
                result.add(KV.of(key, value));
            }
            return result.stream();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers even though the exception is wrapped.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Failed to read sequence file " + fileName, e);
        }
    }

    /**
     * Creates the Hadoop configuration for a write job; delegates to {@code getConfiguration} with
     * the arguments unchanged.
     *
     * @param outputFormatClass output format implementation to configure
     * @param keyClass class of the written keys
     * @param valueClass class of the written values
     * @param path output directory
     * @param reducersCount number of reducers
     * @param jobId job identifier
     * @return the populated configuration
     */
    private static Configuration createWriteConf(Class<?> outputFormatClass, Class<?> keyClass, Class<?> valueClass, String path, Integer reducersCount, String jobId) {
        return getConfiguration(
            outputFormatClass,
            keyClass,
            valueClass,
            path,
            reducersCount,
            jobId);
    }

    /**
     * Builds a fresh Hadoop {@link Configuration} holding the output settings of one job.
     *
     * @param outputFormatClass stored under {@code mapreduce.job.outputformat.class}
     * @param keyClass stored under {@code mapreduce.job.output.key.class}
     * @param valueClass stored under {@code mapreduce.job.output.value.class}
     * @param path stored under {@code mapreduce.output.fileoutputformat.outputdir}
     * @param reducersCount stored under {@code mapreduce.job.reduces}; must not be {@code null}
     * @param jobId stored under {@code mapreduce.job.id}
     * @return the populated configuration
     */
    private static Configuration getConfiguration(Class<?> outputFormatClass, Class<?> keyClass, Class<?> valueClass, String path, Integer reducersCount, String jobId) {
        Configuration jobConf = new Configuration();

        // Output format and the key/value classes it writes.
        jobConf.setClass("mapreduce.job.outputformat.class", outputFormatClass, OutputFormat.class);
        jobConf.setClass("mapreduce.job.output.key.class", keyClass, Object.class);
        jobConf.setClass("mapreduce.job.output.value.class", valueClass, Object.class);

        // Job shape: reducer count, destination directory and job id.
        int reducers = reducersCount;
        jobConf.setInt("mapreduce.job.reduces", reducers);
        jobConf.set("mapreduce.output.fileoutputformat.outputdir", path);
        jobConf.set("mapreduce.job.id", jobId);

        return jobConf;
    }

    /**
     * Streams the sentences as timestamped events through fixed windows, writes the word counts
     * with a configuration supplied by {@link ConfigTransform}, and verifies the written data.
     *
     * <p>The assertion expects exactly the word counts of {@link #FIRST_WIN_WORDS}.
     * NOTE(review): the sentences of {@link #SECOND_WIN_WORDS} are streamed as well but are not part
     * of the expectation; confirm this is intended.
     */
    @Test
    public void streamTest() {
        TestStream<String> stringsStream = TestStream.create(StringUtf8Coder.of())
            .advanceWatermarkTo(START_TIME)
            .addElements(event(FIRST_WIN_WORDS.get(0), 2L))
            .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(27L)))
            .addElements(
                event(FIRST_WIN_WORDS.get(1), 25L),
                event(FIRST_WIN_WORDS.get(2), 18L),
                event(FIRST_WIN_WORDS.get(3), 28L))
            .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(65L)))
            .addElements(event(SECOND_WIN_WORDS.get(0), 61L), event(SECOND_WIN_WORDS.get(1), 63L))
            .advanceWatermarkToInfinity();

        String outputDirPath = getOutputDirPath("streamTest");

        PCollection<KV<Text, LongWritable>> dataToWrite = pipeline
            .apply(stringsStream)
            .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
            .apply(ParDo.of(new ConvertToLowerCaseFn()))
            .apply(new WordCount.CountWords())
            .apply("ConvertToHadoopFormat", ParDo.of(new ConvertToHadoopFormatFn<>(KV_STR_INT_2_TXT_LONGWRITABLE)))
            .setTypeDescriptor(TypeDescriptors.kvs(new TypeDescriptor<Text>() {}, new TypeDescriptor<LongWritable>() {}));

        ConfigTransform<Text, LongWritable> configurationTransformation =
            new ConfigTransform<>(outputDirPath, Text.class, LongWritable.class);

        // The type witness keeps the write typed as <Text, LongWritable> instead of <Object, Object>.
        dataToWrite.apply(
            HadoopFormatIO.<Text, LongWritable>write()
                .withConfigurationTransform(configurationTransformation)
                .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath())));

        pipeline.run();

        Map<String, Long> values = loadWrittenDataAsMap(outputDirPath);
        MatcherAssert.assertThat(values.entrySet(), Matchers.equalTo(computeWordCounts(FIRST_WIN_WORDS).entrySet()));
        Assert.assertEquals(
            "In lock folder shouldn't be any file", 0L, new File(getLocksDirPath()).list().length);
    }

    /**
     * Loads the written records and folds them into a word-to-count map.
     *
     * <p>Counts of records sharing the same key text are added together.
     *
     * @param outputDirPath directory containing the written files
     * @return map from key text to the summed value
     */
    private Map<String, Long> loadWrittenDataAsMap(String outputDirPath) {
        List<KV<Text, LongWritable>> records = loadWrittenData(outputDirPath);
        return records.stream()
            .collect(
                Collectors.groupingBy(
                    record -> record.getKey().toString(),
                    Collectors.summingLong(record -> record.getValue().get())));
    }

    /**
     * Wraps a value in a {@link TimestampedValue} stamped relative to {@link #START_TIME}.
     *
     * @param eventValue payload of the event
     * @param timestamp offset from {@link #START_TIME}, handed to Joda's {@code Duration(Object)}
     *     constructor (presumably interpreted as milliseconds; confirm against Joda-Time docs)
     * @param <T> payload type
     * @return the timestamped event
     */
    private <T> TimestampedValue<T> event(T eventValue, Long timestamp) {
        // The boxed Long deliberately goes through the Object-accepting constructor, as before.
        Duration offset = new Duration((Object) timestamp);
        Instant eventTime = START_TIME.plus(offset);
        return TimestampedValue.of(eventValue, eventTime);
    }

    private static class ConfigTransform<KeyT, ValueT>
    extends PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> {
        private final String outputDirPath;
        private final Class<?> keyClass;
        private final Class<?> valueClass;
        private int windowNum = 0;

        private ConfigTransform(String outputDirPath, Class<?> keyClass, Class<?> valueClass) {
            this.outputDirPath = outputDirPath;
            this.keyClass = keyClass;
            this.valueClass = valueClass;
        }

        public PCollectionView<Configuration> expand(PCollection<? extends KV<KeyT, ValueT>> input) {
            Configuration conf = HadoopFormatIOSequenceFileTest.createWriteConf(SequenceFileOutputFormat.class, this.keyClass, this.valueClass, this.outputDirPath, 2, String.valueOf(this.windowNum++));
            return (PCollectionView)((PCollection)input.getPipeline().apply((PTransform)Create.of((Object)conf, (Object[])new Configuration[0]))).apply((PTransform)View.asSingleton().withDefaultValue((Object)conf));
        }
    }

    /** {@link DoFn} that emits every input string converted to lower case. */
    private static class ConvertToLowerCaseFn
    extends DoFn<String, String> {

        /**
         * Outputs the lower-cased form of the element.
         *
         * @param element input string
         * @param receiver receiver of the lower-cased string
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String element, DoFn.OutputReceiver<String> receiver) {
            // Passed as a String: the receiver is typed OutputReceiver<String> and rejects Object.
            receiver.output(element.toLowerCase());
        }
    }

    /**
     * {@link DoFn} that maps each element through a supplied serializable function.
     *
     * @param <InputT> type of the incoming elements
     * @param <OutputT> type of the emitted elements
     */
    private static class ConvertToHadoopFormatFn<InputT, OutputT>
    extends DoFn<InputT, OutputT> {
        private final SerializableFunction<InputT, OutputT> converter;

        /**
         * @param transformFn function applied to every element
         */
        ConvertToHadoopFormatFn(SerializableFunction<InputT, OutputT> transformFn) {
            converter = transformFn;
        }

        /**
         * Applies the function to the element and outputs the result.
         *
         * @param element element to convert
         * @param outReceiver receiver of the converted element
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element InputT element, DoFn.OutputReceiver<OutputT> outReceiver) {
            OutputT converted = converter.apply(element);
            outReceiver.output(converted);
        }
    }
}

