/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.fileschematransform;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans;
import org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformConfiguration;
import org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformFormatProviderTest;
import org.apache.beam.sdk.io.fileschematransform.FileReadSchemaTransformProvider;
import org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData;
import org.apache.beam.sdk.io.fileschematransform.JsonReadSchemaTransformFormatProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.junit.Test;

public class JsonReadSchemaTransformFormatProviderTest
extends FileReadSchemaTransformFormatProviderTest {
    @Override
    protected String getFormat() {
        return new JsonReadSchemaTransformFormatProvider().identifier();
    }

    @Override
    public String getStringSchemaFromBeamSchema(Schema beamSchema) {
        return JsonUtils.jsonSchemaStringFromBeamSchema((Schema)beamSchema);
    }

    private static Schema getExpectedSchema(Schema inputRowSchema) {
        Schema.Builder outputSchemaBuilder = Schema.builder();
        for (Schema.Field field : inputRowSchema.getFields()) {
            Schema.FieldType outputType;
            switch (field.getType().getTypeName()) {
                case ROW: {
                    outputType = Schema.FieldType.row((Schema)JsonReadSchemaTransformFormatProviderTest.getExpectedSchema(field.getType().getRowSchema()));
                    break;
                }
                case ARRAY: 
                case ITERABLE: {
                    Schema.FieldType arrayType = JsonReadSchemaTransformFormatProviderTest.getExpectedPrimitiveType(field.getType().getCollectionElementType());
                    outputType = Schema.FieldType.array((Schema.FieldType)arrayType);
                    break;
                }
                default: {
                    outputType = JsonReadSchemaTransformFormatProviderTest.getExpectedPrimitiveType(field.getType());
                }
            }
            outputSchemaBuilder.addField(field.getName(), outputType);
        }
        return outputSchemaBuilder.build();
    }

    private static Schema.FieldType getExpectedPrimitiveType(Schema.FieldType inputType) {
        Schema.FieldType outputType;
        switch (inputType.getTypeName()) {
            case BYTE: 
            case INT32: 
            case INT64: {
                outputType = Schema.FieldType.INT64.withNullable(inputType.getNullable().booleanValue());
                break;
            }
            case DECIMAL: 
            case FLOAT: 
            case DOUBLE: {
                outputType = Schema.FieldType.DOUBLE.withNullable(inputType.getNullable().booleanValue());
                break;
            }
            default: {
                outputType = inputType;
            }
        }
        return outputType;
    }

    public static Row getExpectedRow(Row inputRow) {
        Schema outputSchema = JsonReadSchemaTransformFormatProviderTest.getExpectedSchema(inputRow.getSchema());
        HashMap<String, Object> outputRowFields = new HashMap<String, Object>();
        for (Schema.Field inputField : inputRow.getSchema().getFields()) {
            String fieldName = inputField.getName();
            Object outputValue = JsonReadSchemaTransformFormatProviderTest.getExpectedValue(inputField.getType(), inputRow.getValue(fieldName));
            outputRowFields.put(fieldName, outputValue);
        }
        return Row.withSchema((Schema)outputSchema).withFieldValues(outputRowFields).build();
    }

    private static Object getExpectedValue(Schema.FieldType inputType, Object inputValue) {
        List outputValue;
        Schema.FieldType expectedType = JsonReadSchemaTransformFormatProviderTest.getExpectedPrimitiveType(inputType);
        switch (expectedType.getTypeName()) {
            case INT64: {
                outputValue = inputValue == null ? null : Long.valueOf(Long.parseLong(String.valueOf(inputValue)));
                break;
            }
            case DOUBLE: {
                outputValue = inputValue == null ? null : Double.valueOf(Double.parseDouble(String.valueOf(inputValue)));
                break;
            }
            case ROW: {
                outputValue = JsonReadSchemaTransformFormatProviderTest.getExpectedRow((Row)inputValue);
                break;
            }
            case ARRAY: {
                outputValue = ((List)inputValue).stream().map(val -> JsonReadSchemaTransformFormatProviderTest.getExpectedValue(inputType.getCollectionElementType(), val)).collect(Collectors.toList());
                break;
            }
            default: {
                outputValue = inputValue;
            }
        }
        return outputValue;
    }

    @Override
    public void runWriteAndReadTest(Schema schema, List<Row> rows, String filePath, String schemaFilePath) {
        String jsonStringSchema = Strings.isNullOrEmpty((String)schemaFilePath) ? JsonUtils.jsonSchemaStringFromBeamSchema((Schema)schema) : schemaFilePath;
        PayloadSerializer payloadSerializer = new JsonPayloadSerializerProvider().getSerializer(schema, (Map)ImmutableMap.of());
        ((PCollection)((PCollection)this.writePipeline.apply((PTransform)Create.of(rows).withRowSchema(schema))).apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via((SerializableFunction & Serializable)row -> new String(payloadSerializer.serialize(row), StandardCharsets.UTF_8)))).setCoder((Coder)StringUtf8Coder.of()).apply((PTransform)TextIO.write().to(filePath));
        this.writePipeline.run().waitUntilFinish();
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).setSchema(jsonStringSchema).setFilepattern(filePath + "*").build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.empty((Pipeline)this.readPipeline).apply(readTransform.buildTransform());
        List expectedRows = rows.stream().map(row -> JsonReadSchemaTransformFormatProviderTest.getExpectedRow(row)).collect(Collectors.toList());
        PAssert.that((PCollection)output.get("output")).containsInAnyOrder(expectedRows);
        this.readPipeline.run();
    }

    @Test
    public void testStreamingRead() {
        Schema schema = SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
        List<Row> rows = FileWriteSchemaTransformFormatProviderTestData.DATA.allPrimitiveDataTypesRows;
        String folder = this.getFolder();
        String jsonStringSchema = JsonUtils.jsonSchemaStringFromBeamSchema((Schema)schema);
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).setFilepattern(folder + "/test_*").setSchema(jsonStringSchema).setPollIntervalMillis(Long.valueOf(100L)).setTerminateAfterSecondsSinceNewOutput(Long.valueOf(3L)).build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.empty((Pipeline)this.readPipeline).apply(readTransform.buildTransform());
        PayloadSerializer payloadSerializer = new JsonPayloadSerializerProvider().getSerializer(schema, (Map)ImmutableMap.of());
        ((PCollection)((PCollection)((PCollection)this.readPipeline.apply((PTransform)GenerateSequence.from((long)0L).to(3L).withRate(1L, Duration.millis((long)300L)))).apply((PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.millis((long)100L))).withAllowedLateness(Duration.ZERO).triggering((Trigger)Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)1))).discardingFiredPanes())).apply((PTransform)MapElements.via((SimpleFunction)new CreateKVJsonString(schema, payloadSerializer)))).setCoder((Coder)KvCoder.of((Coder)VarIntCoder.of(), (Coder)StringUtf8Coder.of())).apply((PTransform)FileIO.writeDynamic().by(KV::getKey).via(Contextful.fn(KV::getValue), (FileIO.Sink)TextIO.sink()).to(folder).withNaming((SerializableFunction & Serializable)integer -> FileIO.Write.defaultNaming((String)("test_" + integer), (String)".json")).withDestinationCoder((Coder)VarIntCoder.of()).withNumShards(1));
        List expectedRows = rows.stream().map(row -> JsonReadSchemaTransformFormatProviderTest.getExpectedRow(row)).collect(Collectors.toList());
        PAssert.that((PCollection)output.get("output")).containsInAnyOrder(expectedRows);
        this.readPipeline.run();
    }

    @Test
    public void testReadWithPCollectionOfFilepatterns() {
        Schema schema = SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
        List<Row> rows = FileWriteSchemaTransformFormatProviderTestData.DATA.allPrimitiveDataTypesRows;
        String folder = this.getFolder();
        PayloadSerializer payloadSerializer = new JsonPayloadSerializerProvider().getSerializer(schema, (Map)ImmutableMap.of());
        ((PCollection)((PCollection)this.writePipeline.apply((PTransform)Create.of(Arrays.asList(0L, 1L, 2L)))).apply((PTransform)MapElements.via((SimpleFunction)new CreateKVJsonString(schema, payloadSerializer)))).setCoder((Coder)KvCoder.of((Coder)VarIntCoder.of(), (Coder)StringUtf8Coder.of())).apply((PTransform)FileIO.writeDynamic().by(KV::getKey).via(Contextful.fn(KV::getValue), (FileIO.Sink)TextIO.sink()).to(folder).withNaming((SerializableFunction & Serializable)integer -> FileIO.Write.defaultNaming((String)("test_" + integer), (String)".json")).withDestinationCoder((Coder)VarIntCoder.of()).withNumShards(1));
        this.writePipeline.run().waitUntilFinish();
        String jsonStringSchema = JsonUtils.jsonSchemaStringFromBeamSchema((Schema)schema);
        FileReadSchemaTransformConfiguration config = FileReadSchemaTransformConfiguration.builder().setFormat(this.getFormat()).setSchema(jsonStringSchema).build();
        SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config);
        Schema patternSchema = this.getFilepatternSchema();
        PCollection filepatterns = ((PCollection)((PCollection)this.readPipeline.apply((PTransform)Create.of(Arrays.asList(folder + "/test_1-*", folder + "/test_2-*", folder + "/test_3-*")))).apply("Create Rows of filepatterns", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.rows()).via((SerializableFunction & Serializable)pattern -> Row.withSchema((Schema)patternSchema).withFieldValue("filepattern", pattern).build()))).setRowSchema(patternSchema);
        PCollectionRowTuple output = (PCollectionRowTuple)PCollectionRowTuple.of((String)"input", (PCollection)filepatterns).apply(readTransform.buildTransform());
        List expectedRows = rows.stream().map(row -> JsonReadSchemaTransformFormatProviderTest.getExpectedRow(row)).collect(Collectors.toList());
        PAssert.that((PCollection)output.get("output")).containsInAnyOrder(expectedRows);
        this.readPipeline.run();
    }

    private static class CreateKVJsonString
    extends SimpleFunction<Long, KV<Integer, String>> {
        Schema schema;
        PayloadSerializer payloadSerializer;

        CreateKVJsonString(Schema schema, PayloadSerializer payloadSerializer) {
            this.schema = schema;
            this.payloadSerializer = payloadSerializer;
        }

        public KV<Integer, String> apply(Long l) {
            Row row = FileWriteSchemaTransformFormatProviderTestData.DATA.allPrimitiveDataTypesRows.get(l.intValue());
            String jsonString = new String(this.payloadSerializer.serialize(row), StandardCharsets.UTF_8);
            return KV.of((Object)(l.intValue() + 1), (Object)jsonString);
        }
    }
}

