package org.apache.beam.sdk.io.hadoop.format;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests writing word counts to Hadoop SequenceFiles through {@link HadoopFormatIO} using
 * {@link SequenceFileOutputFormat}. Batch mode is tested with and without partitioning. Streaming
 * mode is tested with a configuration transform. Every test also checks that the external
 * {@link HDFSSynchronization} leaves no files in the locks folder.
 */
@RunWith(JUnit4.class)
public class HadoopFormatIOSequenceFileTest {
    private static final String TEST_FOLDER_NAME = "test";
    private static final String LOCKS_FOLDER_NAME = "locks";
    private static final int REDUCERS_COUNT = 2;
    /** Splits on runs of non-letter characters; used by the pipeline and by the expected-value oracle. */
    private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    private static final Instant START_TIME = new Instant(0);
    private static final List<String> SENTENCES =
        Arrays.asList(
            "Hello world this is first streamed event",
            "Hello again this is sedcond streamed event",
            "Third time Hello event created",
            "And last event will was sent now",
            "Hello from second window",
            "First event from second window");
    /** The first four sentences. */
    private static final List<String> FIRST_WIN_WORDS = SENTENCES.subList(0, 4);
    /** The last two sentences. */
    private static final List<String> SECOND_WIN_WORDS = SENTENCES.subList(4, 6);
    private static final Duration WINDOW_DURATION = Duration.standardMinutes(1);

    /** Converts a Beam (word, count) pair into the Hadoop Writable pair written to the SequenceFile. */
    private static final SerializableFunction<KV<String, Long>, KV<Text, LongWritable>>
        KV_STR_INT_2_TXT_LONGWRITABLE =
            kv -> KV.of(new Text(kv.getKey()), new LongWritable(kv.getValue()));

    /**
     * Produces the write {@link Configuration} as a singleton side input. Each call to
     * {@link #expand} uses the next value of an internal counter (starting at 0) as the job id.
     */
    private static class ConfigTransform<KeyT, ValueT>
            extends PTransform<PCollection<? extends KV<KeyT, ValueT>>, PCollectionView<Configuration>> {
        private final String outputDirPath;
        private final Class<?> keyClass;
        private final Class<?> valueClass;
        private int windowNum = 0;

        private ConfigTransform(String outputDirPath, Class<?> keyClass, Class<?> valueClass) {
            this.outputDirPath = outputDirPath;
            this.keyClass = keyClass;
            this.valueClass = valueClass;
        }

        @Override
        public PCollectionView<Configuration> expand(PCollection<? extends KV<KeyT, ValueT>> input) {
            Configuration conf =
                createWriteConf(
                    SequenceFileOutputFormat.class,
                    keyClass,
                    valueClass,
                    outputDirPath,
                    REDUCERS_COUNT,
                    String.valueOf(windowNum++));
            // Explicit type witnesses: Configuration is itself an Iterable, so a bare Create.of(conf)
            // would resolve to the Iterable overload; View.asSingleton() would otherwise infer Object.
            return input
                .getPipeline()
                .apply(Create.<Configuration>of(conf))
                .apply(View.<Configuration>asSingleton().withDefaultValue(conf));
        }
    }

    /** Applies a serializable conversion function to every input element. */
    public static class ConvertToHadoopFormatFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
        private final SerializableFunction<InputT, OutputT> transformFn;

        ConvertToHadoopFormatFn(SerializableFunction<InputT, OutputT> transformFn) {
            this.transformFn = transformFn;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element InputT element, DoFn.OutputReceiver<OutputT> receiver) {
            receiver.output(transformFn.apply(element));
        }
    }

    /** Lower-cases every input line (default locale, same as the expected-value oracle). */
    public static class ConvertToLowerCaseFn extends DoFn<String, String> {
        private ConvertToLowerCaseFn() {}

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String line, DoFn.OutputReceiver<String> receiver) {
            receiver.output(line.toLowerCase());
        }
    }

    /** Splits lines into words and counts occurrences of each word. */
    public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
        private CountWords() {}

        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
            return lines.apply(ParDo.of(new ExtractWordsFn())).apply(Count.perElement());
        }
    }

    /**
     * Emits every non-empty token of a line, split on {@link #TOKENIZER_PATTERN}. It also records
     * line-length and empty-line metrics.
     */
    public static class ExtractWordsFn extends DoFn<String, String> {
        private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
        private final Distribution lineLenDist =
            Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

        private ExtractWordsFn() {}

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element String line, DoFn.OutputReceiver<String> receiver) {
            lineLenDist.update(line.length());
            if (line.trim().isEmpty()) {
                emptyLines.inc();
            }
            for (String word : line.split(TOKENIZER_PATTERN, -1)) {
                if (!word.isEmpty()) {
                    receiver.output(word);
                }
            }
        }
    }

    /**
     * Computes the expected word counts for {@code sentences} in plain Java. The steps mirror the
     * pipeline: lower-case, split on {@link #TOKENIZER_PATTERN}, then drop empty tokens.
     */
    private static Map<String, Long> computeWordCounts(List<String> sentences) {
        return sentences.stream()
            .map(String::toLowerCase)
            .flatMap(sentence -> Arrays.stream(sentence.split(TOKENIZER_PATTERN)))
            .filter(word -> !word.isEmpty())
            .collect(Collectors.toMap(Function.identity(), word -> 1L, Long::sum));
    }

    @Test
    public void batchTest() {
        String outputDirPath = getOutputDirPath("batchTest");
        HadoopFormatIO.Write<Text, LongWritable> write =
            HadoopFormatIO.<Text, LongWritable>write()
                .withConfiguration(createBatchWriteConf(outputDirPath))
                .withPartitioning()
                .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath()));
        executeBatchTest(write, outputDirPath);
        assertLocksFolderIsEmpty();
    }

    @Test
    public void batchTestWithoutPartitioner() {
        String outputDirPath = getOutputDirPath("batchTestWithoutPartitioner");
        HadoopFormatIO.Write<Text, LongWritable> write =
            HadoopFormatIO.<Text, LongWritable>write()
                .withConfiguration(createBatchWriteConf(outputDirPath))
                .withoutPartitioning()
                .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath()));
        executeBatchTest(write, outputDirPath);
        assertLocksFolderIsEmpty();
    }

    /** Creates the SequenceFile write configuration shared by the batch tests (job id "0"). */
    private static Configuration createBatchWriteConf(String outputDirPath) {
        return createWriteConf(
            SequenceFileOutputFormat.class,
            Text.class,
            LongWritable.class,
            outputDirPath,
            REDUCERS_COUNT,
            "0");
    }

    /**
     * Runs the word-count pipeline over all {@link #SENTENCES} with the given write transform.
     * It then asserts that the files written under {@code outputDir} contain the expected counts.
     */
    private void executeBatchTest(HadoopFormatIO.Write<Text, LongWritable> write, String outputDir) {
        pipeline
            .apply(Create.of(SENTENCES))
            .apply(ParDo.of(new ConvertToLowerCaseFn()))
            .apply(new CountWords())
            .apply("ConvertToHadoopFormat", ParDo.of(new ConvertToHadoopFormatFn<>(KV_STR_INT_2_TXT_LONGWRITABLE)))
            .setTypeDescriptor(
                TypeDescriptors.kvs(new TypeDescriptor<Text>() {}, new TypeDescriptor<LongWritable>() {}))
            .apply(write);
        pipeline.run();

        MatcherAssert.assertThat(
            loadWrittenDataAsMap(outputDir).entrySet(),
            Matchers.equalTo(computeWordCounts(SENTENCES).entrySet()));
    }

    /** Asserts that the synchronization locks folder exists and is empty. */
    private void assertLocksFolderIsEmpty() {
        String locksDir = getLocksDirPath();
        String[] lockFiles =
            Objects.requireNonNull(new File(locksDir).list(), "Locks folder does not exist: " + locksDir);
        Assert.assertEquals("In lock folder shouldn't be any file", 0L, lockFiles.length);
    }

    /**
     * Reads every record from the {@code part-r*} files directly inside {@code outputDir}.
     *
     * @throws NullPointerException if {@code outputDir} does not exist or is not a directory
     */
    private List<KV<Text, LongWritable>> loadWrittenData(String outputDir) {
        String[] fileNames =
            Objects.requireNonNull(new File(outputDir).list(), "Output directory does not exist: " + outputDir);
        return Arrays.stream(fileNames)
            .filter(name -> name.startsWith("part-r"))
            .map(name -> outputDir + File.separator + name)
            .flatMap(this::extractResultsFromFile)
            .collect(Collectors.toList());
    }

    /** Returns {@code <tmp>/test/<testName>} as an absolute path string. */
    private String getOutputDirPath(String testName) {
        return Paths.get(tmpFolder.getRoot().getAbsolutePath(), TEST_FOLDER_NAME, testName)
            .toAbsolutePath()
            .toString();
    }

    /** Returns {@code <tmp>/locks} as an absolute path string. */
    private String getLocksDirPath() {
        return Paths.get(tmpFolder.getRoot().getAbsolutePath(), LOCKS_FOLDER_NAME)
            .toAbsolutePath()
            .toString();
    }

    /**
     * Reads all (Text, LongWritable) records from a single SequenceFile.
     *
     * <p>The reader is always closed, including when reading fails part-way.
     *
     * @throws UncheckedIOException if the file cannot be read
     * @throws RuntimeException wrapping {@link InterruptedException}; the interrupt flag is restored
     */
    private Stream<KV<Text, LongWritable>> extractResultsFromFile(String filePath) {
        FileSplit split = new FileSplit(new Path(filePath), 0L, Long.MAX_VALUE, new String[] {"localhost"});
        try (SequenceFileRecordReader<Text, LongWritable> reader = new SequenceFileRecordReader<>()) {
            reader.initialize(
                split, HadoopFormats.createTaskAttemptContext(new Configuration(), new JobID("readJob", 0), 0));
            List<KV<Text, LongWritable>> records = new ArrayList<>();
            while (reader.nextKeyValue()) {
                // Copy key and value: the reader may reuse its Writable instances between records.
                records.add(
                    KV.of(
                        new Text(reader.getCurrentKey().toString()),
                        new LongWritable(reader.getCurrentValue().get())));
            }
            return records.stream();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds a Hadoop job {@link Configuration} for writing with the given output format.
     *
     * @param outputFormatClass the {@link OutputFormat} implementation to use
     * @param keyClass output key class
     * @param valueClass output value class
     * @param path output directory
     * @param reducersCount number of reduce tasks (must not be null)
     * @param jobId value stored under {@code mapreduce.job.id}
     */
    public static Configuration createWriteConf(
            Class<?> outputFormatClass,
            Class<?> keyClass,
            Class<?> valueClass,
            String path,
            Integer reducersCount,
            String jobId) {
        Configuration conf = new Configuration();
        conf.setClass("mapreduce.job.outputformat.class", outputFormatClass, OutputFormat.class);
        conf.setClass("mapreduce.job.output.key.class", keyClass, Object.class);
        conf.setClass("mapreduce.job.output.value.class", valueClass, Object.class);
        conf.setInt("mapreduce.job.reduces", reducersCount);
        conf.set("mapreduce.output.fileoutputformat.outputdir", path);
        conf.set("mapreduce.job.id", jobId);
        return conf;
    }

    @Test
    public void streamTest() {
        // Event offsets passed to event(...) are milliseconds after START_TIME.
        TestStream<String> stringsStream =
            TestStream.create(StringUtf8Coder.of())
                .advanceWatermarkTo(START_TIME)
                .addElements(event(FIRST_WIN_WORDS.get(0), 2L))
                .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(27L)))
                .addElements(
                    event(FIRST_WIN_WORDS.get(1), 25L),
                    event(FIRST_WIN_WORDS.get(2), 18L),
                    event(FIRST_WIN_WORDS.get(3), 28L))
                .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(65L)))
                .addElements(event(SECOND_WIN_WORDS.get(0), 61L), event(SECOND_WIN_WORDS.get(1), 63L))
                .advanceWatermarkToInfinity();

        String outputDirPath = getOutputDirPath("streamTest");
        ConfigTransform<Text, LongWritable> configTransform =
            new ConfigTransform<>(outputDirPath, Text.class, LongWritable.class);

        pipeline
            .apply(stringsStream)
            .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
            .apply(ParDo.of(new ConvertToLowerCaseFn()))
            .apply(new CountWords())
            .apply("ConvertToHadoopFormat", ParDo.of(new ConvertToHadoopFormatFn<>(KV_STR_INT_2_TXT_LONGWRITABLE)))
            .setTypeDescriptor(
                TypeDescriptors.kvs(new TypeDescriptor<Text>() {}, new TypeDescriptor<LongWritable>() {}))
            .apply(
                HadoopFormatIO.<Text, LongWritable>write()
                    .withConfigurationTransform(configTransform)
                    .withExternalSynchronization(new HDFSSynchronization(getLocksDirPath())));
        pipeline.run();

        // NOTE(review): the SECOND_WIN_WORDS events are stamped 61/63 ms, but they are added after the
        // watermark passed 65 s. They are presumably dropped as late data, which would explain why only
        // FIRST_WIN_WORDS counts are expected here. Confirm this is the intended scenario.
        MatcherAssert.assertThat(
            loadWrittenDataAsMap(outputDirPath).entrySet(),
            Matchers.equalTo(computeWordCounts(FIRST_WIN_WORDS).entrySet()));
        assertLocksFolderIsEmpty();
    }

    /** Loads all written records and sums counts per word (several part files may hold the same key). */
    private Map<String, Long> loadWrittenDataAsMap(String outputDir) {
        return loadWrittenData(outputDir).stream()
            .collect(
                Collectors.toMap(kv -> kv.getKey().toString(), kv -> kv.getValue().get(), Long::sum));
    }

    /** Wraps {@code value} with a timestamp {@code offsetMillis} milliseconds after {@link #START_TIME}. */
    private <T> TimestampedValue<T> event(T value, Long offsetMillis) {
        return TimestampedValue.of(value, START_TIME.plus(Duration.millis(offsetMillis)));
    }
}
