/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class KafkaDlqTest {
    private static final TupleTag<Row> OUTPUTTAG = KafkaReadSchemaTransformProvider.OUTPUT_TAG;
    private static final TupleTag<Row> ERRORTAG = KafkaReadSchemaTransformProvider.ERROR_TAG;
    private static final Schema BEAMSCHEMA = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"name", (Schema.FieldType)Schema.FieldType.STRING)});
    private static final List<Row> ROWS = Arrays.asList(Row.withSchema((Schema)BEAMSCHEMA).withFieldValue("name", (Object)"a").build(), Row.withSchema((Schema)BEAMSCHEMA).withFieldValue("name", (Object)"b").build(), Row.withSchema((Schema)BEAMSCHEMA).withFieldValue("name", (Object)"c").build());
    private static List<byte[]> messages;
    private static List<byte[]> messagesWithError;
    final SerializableFunction<byte[], Row> valueMapper = JsonUtils.getJsonBytesToRowFunction((Schema)BEAMSCHEMA);
    @Rule
    public transient TestPipeline p = TestPipeline.create();

    @Test
    public void testKafkaErrorFnSuccess() throws Exception {
        try {
            messages = Arrays.asList("{\"name\":\"a\"}".getBytes("UTF8"), "{\"name\":\"b\"}".getBytes("UTF8"), "{\"name\":\"c\"}".getBytes("UTF8"));
        }
        catch (Exception exception) {
            // empty catch block
        }
        PCollection input = (PCollection)this.p.apply((PTransform)Create.of(messages));
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        PCollectionTuple output = (PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)new KafkaReadSchemaTransformProvider.ErrorFn("Kafka-read-error-counter", this.valueMapper, errorSchema, true)).withOutputTags(OUTPUTTAG, TupleTagList.of(ERRORTAG)));
        output.get(OUTPUTTAG).setRowSchema(BEAMSCHEMA);
        output.get(ERRORTAG).setRowSchema(errorSchema);
        PAssert.that((PCollection)output.get(OUTPUTTAG)).containsInAnyOrder(ROWS);
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testKafkaErrorFnFailure() throws Exception {
        try {
            messagesWithError = Arrays.asList("{\"error\":\"a\"}".getBytes("UTF8"), "{\"error\":\"b\"}".getBytes("UTF8"), "{\"error\":\"c\"}".getBytes("UTF8"));
        }
        catch (Exception exception) {
            // empty catch block
        }
        PCollection input = (PCollection)this.p.apply((PTransform)Create.of(messagesWithError));
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        PCollectionTuple output = (PCollectionTuple)input.apply((PTransform)ParDo.of((DoFn)new KafkaReadSchemaTransformProvider.ErrorFn("Read-Error-Counter", this.valueMapper, errorSchema, true)).withOutputTags(OUTPUTTAG, TupleTagList.of(ERRORTAG)));
        output.get(OUTPUTTAG).setRowSchema(BEAMSCHEMA);
        output.get(ERRORTAG).setRowSchema(errorSchema);
        PCollection count = (PCollection)output.get(ERRORTAG).apply("error_count", Count.globally());
        PAssert.that((PCollection)count).containsInAnyOrder(Collections.singletonList(3L));
        this.p.run().waitUntilFinish();
    }
}

