/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
import org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class KafkaWriteSchemaTransformProviderTest {
    private static final TupleTag<KV<byte[], byte[]>> OUTPUT_TAG = KafkaWriteSchemaTransformProvider.OUTPUT_TAG;
    private static final TupleTag<Row> ERROR_TAG = KafkaWriteSchemaTransformProvider.ERROR_TAG;
    private static final Schema BEAMSCHEMA = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"name", (Schema.FieldType)Schema.FieldType.STRING)});
    private static final Schema BEAM_RAW_SCHEMA = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"payload", (Schema.FieldType)Schema.FieldType.BYTES)});
    private static final Schema BEAM_PROTO_SCHEMA = Schema.builder().addField("id", Schema.FieldType.INT32).addField("name", Schema.FieldType.STRING).addField("active", Schema.FieldType.BOOLEAN).addField("address", Schema.FieldType.row((Schema)Schema.builder().addField("city", Schema.FieldType.STRING).addField("street", Schema.FieldType.STRING).addField("state", Schema.FieldType.STRING).addField("zip_code", Schema.FieldType.STRING).build())).build();
    private static final List<Row> PROTO_ROWS = Collections.singletonList(Row.withSchema((Schema)BEAM_PROTO_SCHEMA).withFieldValue("id", (Object)1234).withFieldValue("name", (Object)"Doe").withFieldValue("active", (Object)false).withFieldValue("address.city", (Object)"seattle").withFieldValue("address.street", (Object)"fake street").withFieldValue("address.zip_code", (Object)"TO-1234").withFieldValue("address.state", (Object)"wa").build());
    private static final List<Row> ROWS = Arrays.asList(Row.withSchema((Schema)BEAMSCHEMA).withFieldValue("name", (Object)"a").build(), Row.withSchema((Schema)BEAMSCHEMA).withFieldValue("name", (Object)"b").build(), Row.withSchema((Schema)BEAMSCHEMA).withFieldValue("name", (Object)"c").build());
    private static final List<Row> RAW_ROWS;
    final SerializableFunction<Row, byte[]> valueMapper = JsonUtils.getRowToJsonBytesFunction((Schema)BEAMSCHEMA);
    final SerializableFunction<Row, byte[]> valueRawMapper = KafkaWriteSchemaTransformProvider.getRowToRawBytesFunction((String)"payload");
    final SerializableFunction<Row, byte[]> protoValueRawMapper = ProtoByteUtils.getRowToProtoBytes((String)Objects.requireNonNull(this.getClass().getResource("/proto_byte/file_descriptor/proto_byte_utils.pb")).getPath(), (String)"MyMessage");
    @Rule
    public transient TestPipeline p = TestPipeline.create();

    @Test
    public void testKafkaErrorFnSuccess() throws Exception {
        List<KV> msg = Arrays.asList(KV.of((Object)new byte[1], (Object)"{\"name\":\"a\"}".getBytes("UTF8")), KV.of((Object)new byte[1], (Object)"{\"name\":\"b\"}".getBytes("UTF8")), KV.of((Object)new byte[1], (Object)"{\"name\":\"c\"}".getBytes("UTF8")));
        PCollection input = (PCollection)this.p.apply((PTransform)Create.of(ROWS));
        Schema errorSchema = ErrorHandling.errorSchema((Schema)BEAMSCHEMA);
        PCollectionTuple output = (PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)new KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform.ErrorCounterFn("Kafka-write-error-counter", this.valueMapper, errorSchema, true)).withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(ERROR_TAG).setRowSchema(errorSchema);
        PAssert.that((PCollection)output.get(OUTPUT_TAG)).containsInAnyOrder(msg);
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testKafkaErrorFnRawSuccess() throws Exception {
        List<KV> msg = Arrays.asList(KV.of((Object)new byte[1], (Object)"a".getBytes("UTF8")), KV.of((Object)new byte[1], (Object)"b".getBytes("UTF8")), KV.of((Object)new byte[1], (Object)"c".getBytes("UTF8")));
        PCollection input = (PCollection)this.p.apply((PTransform)Create.of(RAW_ROWS));
        Schema errorSchema = ErrorHandling.errorSchema((Schema)BEAM_RAW_SCHEMA);
        PCollectionTuple output = (PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)new KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform.ErrorCounterFn("Kafka-write-error-counter", this.valueRawMapper, errorSchema, true)).withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(ERROR_TAG).setRowSchema(errorSchema);
        PAssert.that((PCollection)output.get(OUTPUT_TAG)).containsInAnyOrder(msg);
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testKafkaErrorFnProtoSuccess() {
        PCollection input = (PCollection)this.p.apply((PTransform)Create.of(PROTO_ROWS));
        Schema errorSchema = ErrorHandling.errorSchema((Schema)BEAM_PROTO_SCHEMA);
        PCollectionTuple output = (PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)new KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform.ErrorCounterFn("Kafka-write-error-counter", this.protoValueRawMapper, errorSchema, true)).withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(ERROR_TAG).setRowSchema(errorSchema);
        this.p.run().waitUntilFinish();
    }

    static {
        try {
            RAW_ROWS = Arrays.asList(Row.withSchema((Schema)BEAM_RAW_SCHEMA).withFieldValue("payload", (Object)"a".getBytes("UTF8")).build(), Row.withSchema((Schema)BEAM_RAW_SCHEMA).withFieldValue("payload", (Object)"b".getBytes("UTF8")).build(), Row.withSchema((Schema)BEAM_RAW_SCHEMA).withFieldValue("payload", (Object)"c".getBytes("UTF8")).build());
        }
        catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}

