/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.fileschematransform;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformConfiguration;
import org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformProvider;
import org.apache.beam.sdk.io.fileschematransform.LineReadSchemaTransformFormatProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

public class LineReadSchemaTransformFormatProviderTest {
    @Rule
    public TestPipeline writePipeline = TestPipeline.create();
    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    @Rule
    public TestName testName = new TestName();

    protected String getFormat() {
        return new LineReadSchemaTransformFormatProvider().identifier();
    }

    protected String getFilePath() {
        return this.getFolder() + "/test";
    }

    protected String getFolder() {
        try {
            return this.tmpFolder.newFolder(new String[]{this.getFormat(), this.testName.getMethodName()}).getAbsolutePath();
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @Test
    public void testReadStrings() {
        ArrayList<String> values = new ArrayList<String>();
        String filePath = this.getFilePath();
        for (int i = 0; i < 10; ++i) {
            values.add("line #" + i);
        }
        ((PCollection)this.writePipeline.apply((PTransform)Create.of(values))).apply((PTransform)TextIO.write().to(filePath));
        this.writePipeline.run().waitUntilFinish();
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).setFilepattern(filePath + "*").build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.empty((Pipeline)this.readPipeline).apply((PTransform)readTransform);
        PCollection outputStrings = (PCollection)output.get("output").apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via((SerializableFunction & Serializable)row -> row.getString("line")));
        PAssert.that((PCollection)outputStrings).containsInAnyOrder(values);
        this.readPipeline.run().waitUntilFinish();
    }

    @Test
    public void testStreamingRead() {
        String folder = this.getFolder();
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).setFilepattern(folder + "/test_*").setPollIntervalMillis(Long.valueOf(100L)).setTerminateAfterSecondsSinceNewOutput(Long.valueOf(3L)).build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.empty((Pipeline)this.readPipeline).apply((PTransform)readTransform);
        ((PCollection)((PCollection)((PCollection)this.readPipeline.apply((PTransform)GenerateSequence.from((long)0L).to(3L).withRate(1L, Duration.millis((long)300L)))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))).withAllowedLateness(Duration.ZERO).triggering((Trigger)Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)1))).discardingFiredPanes())).apply("Create Key-String pairs", (PTransform)MapElements.via((SimpleFunction)new CreateKeyStringPair()))).setCoder((Coder)KvCoder.of((Coder)VarIntCoder.of(), (Coder)StringUtf8Coder.of())).apply((PTransform)FileIO.writeDynamic().by(KV::getKey).via(Contextful.fn(KV::getValue), (FileIO.Sink)TextIO.sink()).to(folder).withNaming((SerializableFunction & Serializable)integer -> FileIO.Write.defaultNaming((String)("test_" + integer), (String)".txt")).withDestinationCoder((Coder)VarIntCoder.of()).withNumShards(1));
        PCollection outputStrings = (PCollection)output.get("output").apply("Get strings", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via((SerializableFunction & Serializable)row -> row.getString("line")));
        List expectedStrings = Arrays.asList(0, 1, 2).stream().map(num -> "dynamic destination line #" + num).collect(Collectors.toList());
        PAssert.that((PCollection)outputStrings).containsInAnyOrder(expectedStrings);
        this.readPipeline.run();
    }

    @Test
    public void testReadWithPCollectionOfFilepatterns() {
        String folder = this.getFolder();
        ((PCollection)((PCollection)this.writePipeline.apply((PTransform)Create.of(Arrays.asList(0L, 1L, 2L)))).apply((PTransform)MapElements.via((SimpleFunction)new CreateKeyStringPair()))).setCoder((Coder)KvCoder.of((Coder)VarIntCoder.of(), (Coder)StringUtf8Coder.of())).apply((PTransform)FileIO.writeDynamic().by(KV::getKey).via(Contextful.fn(KV::getValue), (FileIO.Sink)TextIO.sink()).to(folder).withNaming((SerializableFunction & Serializable)integer -> FileIO.Write.defaultNaming((String)("test_" + integer), (String)".txt")).withDestinationCoder((Coder)VarIntCoder.of()).withNumShards(1));
        this.writePipeline.run().waitUntilFinish();
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        Schema patternSchema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"filepattern", (Schema.FieldType)Schema.FieldType.STRING)});
        PCollection filepatterns = ((PCollection)((PCollection)this.readPipeline.apply((PTransform)Create.of(Arrays.asList(folder + "/test_1-*", folder + "/test_2-*", folder + "/test_3-*")))).apply("Create Rows of filepatterns", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.rows()).via((SerializableFunction & Serializable)pattern -> Row.withSchema((Schema)patternSchema).withFieldValue("filepattern", pattern).build()))).setRowSchema(patternSchema);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.of((String)"input", (PCollection)filepatterns).apply((PTransform)readTransform);
        PCollection outputStrings = (PCollection)output.get("output").apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via((SerializableFunction & Serializable)row -> row.getString("line")));
        List expectedStrings = Arrays.asList(0, 1, 2).stream().map(num -> "dynamic destination line #" + num).collect(Collectors.toList());
        PAssert.that((PCollection)outputStrings).containsInAnyOrder(expectedStrings);
        this.readPipeline.run();
    }

    private static class CreateKeyStringPair
    extends SimpleFunction<Long, KV<Integer, String>> {
        private CreateKeyStringPair() {
        }

        public KV<Integer, String> apply(Long l) {
            String line = "dynamic destination line #" + l.intValue();
            return KV.of((Object)(l.intValue() + 1), (Object)line);
        }
    }
}

