package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteDlqTest.class */
public class PubsubLiteDlqTest {
    private static final TupleTag<Row> OUTPUTTAG = PubsubLiteReadSchemaTransformProvider.OUTPUT_TAG;
    private static final TupleTag<Row> ERRORTAG = PubsubLiteReadSchemaTransformProvider.ERROR_TAG;
    private static final Schema BEAMSCHEMA = Schema.of(new Schema.Field[]{Schema.Field.of("name", Schema.FieldType.STRING)});
    private static final Schema ERRORSCHEMA = PubsubLiteReadSchemaTransformProvider.ERROR_SCHEMA;
    private static final List<Row> ROWS = Arrays.asList(Row.withSchema(BEAMSCHEMA).withFieldValue("name", "a").build(), Row.withSchema(BEAMSCHEMA).withFieldValue("name", "b").build(), Row.withSchema(BEAMSCHEMA).withFieldValue("name", "c").build());
    private static final List<SequencedMessage> MESSAGES = Arrays.asList(SequencedMessage.newBuilder().setMessage(PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"a\"}")).build()).build(), SequencedMessage.newBuilder().setMessage(PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"b\"}")).build()).build(), SequencedMessage.newBuilder().setMessage(PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"c\"}")).build()).build());
    private static final List<SequencedMessage> MESSAGESWITHERROR = Arrays.asList(SequencedMessage.newBuilder().setMessage(PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"error\":\"a\"}")).build()).build(), SequencedMessage.newBuilder().setMessage(PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"error\":\"b\"}")).build()).build(), SequencedMessage.newBuilder().setMessage(PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"error\":\"c\"}")).build()).build());
    final SerializableFunction<byte[], Row> valueMapper = JsonUtils.getJsonBytesToRowFunction(BEAMSCHEMA);

    @Rule
    public transient TestPipeline p = TestPipeline.create();

    @Test
    public void testPubsubLiteErrorFnSuccess() throws Exception {
        PCollectionTuple apply = this.p.apply(Create.of(MESSAGES)).apply(ParDo.of(new PubsubLiteReadSchemaTransformProvider.ErrorFn("Read-Error-Counter", this.valueMapper)).withOutputTags(OUTPUTTAG, TupleTagList.of(ERRORTAG)));
        apply.get(OUTPUTTAG).setRowSchema(BEAMSCHEMA);
        apply.get(ERRORTAG).setRowSchema(ERRORSCHEMA);
        PAssert.that(apply.get(OUTPUTTAG)).containsInAnyOrder(ROWS);
        this.p.run().waitUntilFinish();
    }

    @Test
    public void testPubsubLiteErrorFnFailure() throws Exception {
        PCollectionTuple apply = this.p.apply(Create.of(MESSAGESWITHERROR)).apply(ParDo.of(new PubsubLiteReadSchemaTransformProvider.ErrorFn("Read-Error-Counter", this.valueMapper)).withOutputTags(OUTPUTTAG, TupleTagList.of(ERRORTAG)));
        apply.get(OUTPUTTAG).setRowSchema(BEAMSCHEMA);
        apply.get(ERRORTAG).setRowSchema(ERRORSCHEMA);
        PAssert.that(apply.get(ERRORTAG).apply("error_count", Count.globally())).containsInAnyOrder(Collections.singletonList(3L));
        this.p.run().waitUntilFinish();
    }
}
