package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.ByteString;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteReadSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.pubsublite.UuidDeduplicationOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link PubsubLiteReadSchemaTransformProvider.ErrorFn}.
 *
 * <p>Each test feeds a fixed list of {@link SequencedMessage}s through the {@code ErrorFn} with two
 * output tags ({@link #OUTPUT_TAG} and {@link #ERROR_TAG}) and asserts either the rows found on the
 * main output or the number of elements found on the error output.
 */
@RunWith(JUnit4.class)
public class PubsubLiteDlqTest {
    private static final TupleTag<Row> OUTPUT_TAG = PubsubLiteReadSchemaTransformProvider.OUTPUT_TAG;
    private static final TupleTag<Row> ERROR_TAG = PubsubLiteReadSchemaTransformProvider.ERROR_TAG;

    /** Counter name handed to every {@code ErrorFn} built by these tests. */
    private static final String ERROR_COUNTER_NAME = "Read-Error-Counter";

    /** Fully qualified name of the message declared in {@link #PROTO_STRING_SCHEMA}. */
    private static final String PROTO_MESSAGE_NAME = "com.test.proto.MyMessage";

    private static final Schema BEAM_RAW_SCHEMA =
            Schema.builder().addField("payload", Schema.FieldType.BYTES).build();

    private static final Schema BEAM_SCHEMA =
            Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));

    private static final Schema BEAM_SCHEMA_ATTRIBUTES =
            Schema.of(
                    Schema.Field.of("name", Schema.FieldType.STRING),
                    Schema.Field.of("key1", Schema.FieldType.STRING),
                    Schema.Field.of("key2", Schema.FieldType.STRING));

    private static final Schema BEAM_SCHEMA_ATTRIBUTES_AND_MAP =
            Schema.of(
                    Schema.Field.of("name", Schema.FieldType.STRING),
                    Schema.Field.of("key1", Schema.FieldType.STRING),
                    Schema.Field.of("key2", Schema.FieldType.STRING),
                    Schema.Field.of(
                            "attrs",
                            Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)));

    private static final Schema BEAM_SCHEMA_ATTRIBUTES_MAP =
            Schema.of(
                    Schema.Field.of("name", Schema.FieldType.STRING),
                    Schema.Field.of(
                            "attrs",
                            Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)));

    /** Expected content of the {@code attrs} map field in the attribute-map tests. */
    private static final Map<String, String> STATIC_MAP = buildStaticMap();

    private static final List<Row> RAW_ROWS = Arrays.asList(rawRow("a"), rawRow("b"), rawRow("c"));

    private static final List<Row> ROWS_WITH_ATTRIBUTES =
            Arrays.asList(rowWithAttributes("a"), rowWithAttributes("b"), rowWithAttributes("c"));

    private static final List<Row> ROWS_WITH_ATTRIBUTES_MAP =
            Arrays.asList(
                    rowWithAttributesMap("a"), rowWithAttributesMap("b"), rowWithAttributesMap("c"));

    private static final List<Row> ROWS_WITH_ATTRIBUTES_AND_MAP =
            Arrays.asList(
                    rowWithAttributesAndMap("a"),
                    rowWithAttributesAndMap("b"),
                    rowWithAttributesAndMap("c"));

    private static final List<Row> ROWS = Arrays.asList(nameRow("a"), nameRow("b"), nameRow("c"));

    /** Attributes ({@code key1}, {@code key2}) attached to every message except the error ones. */
    private static final Map<String, AttributeValues> ATTRIBUTE_VALUES_MAP =
            buildAttributeValuesMap();

    /** JSON payloads matching {@link #BEAM_SCHEMA}, each carrying {@link #ATTRIBUTE_VALUES_MAP}. */
    private static final List<SequencedMessage> MESSAGES =
            Arrays.asList(
                    messageWithAttributes(ByteString.copyFromUtf8("{\"name\":\"a\"}")),
                    messageWithAttributes(ByteString.copyFromUtf8("{\"name\":\"b\"}")),
                    messageWithAttributes(ByteString.copyFromUtf8("{\"name\":\"c\"}")));

    /** Plain (non-JSON) payloads, each carrying {@link #ATTRIBUTE_VALUES_MAP}. */
    private static final List<SequencedMessage> RAW_MESSAGES =
            Arrays.asList(
                    messageWithAttributes(ByteString.copyFromUtf8("a")),
                    messageWithAttributes(ByteString.copyFromUtf8("b")),
                    messageWithAttributes(ByteString.copyFromUtf8("c")));

    /** JSON payloads whose only key is {@code error} rather than {@code name}; no attributes. */
    private static final List<SequencedMessage> MESSAGES_WITH_ERROR =
            Arrays.asList(
                    messageWithoutAttributes(ByteString.copyFromUtf8("{\"error\":\"a\"}")),
                    messageWithoutAttributes(ByteString.copyFromUtf8("{\"error\":\"b\"}")),
                    messageWithoutAttributes(ByteString.copyFromUtf8("{\"error\":\"c\"}")));

    private static final String PROTO_STRING_SCHEMA =
            "syntax = \"proto3\";\n"
                    + "package com.test.proto;\n"
                    + "message MyMessage {\n"
                    + "  int32 id = 1;\n"
                    + "  string name = 2;\n"
                    + "  bool active = 3;\n"
                    + "\n"
                    + "  // Nested field\n"
                    + "  message Address {\n"
                    + "    string street = 1;\n"
                    + "    string city = 2;\n"
                    + "    string state = 3;\n"
                    + "    string zip_code = 4;\n"
                    + "  }\n"
                    + "\n"
                    + "  Address address = 4;\n"
                    + "}";

    private static final Schema BEAM_PROTO_SCHEMA =
            Schema.builder()
                    .addField("id", Schema.FieldType.INT32)
                    .addField("name", Schema.FieldType.STRING)
                    .addField("active", Schema.FieldType.BOOLEAN)
                    .addField(
                            "address",
                            Schema.FieldType.row(
                                    Schema.builder()
                                            .addField("city", Schema.FieldType.STRING)
                                            .addField("street", Schema.FieldType.STRING)
                                            .addField("state", Schema.FieldType.STRING)
                                            .addField("zip_code", Schema.FieldType.STRING)
                                            .build()))
                    .build();

    private static final Row INPUT_ROW =
            Row.withSchema(BEAM_PROTO_SCHEMA)
                    .withFieldValue("id", 1234)
                    .withFieldValue("name", "Doe")
                    .withFieldValue("active", false)
                    .withFieldValue("address.city", "seattle")
                    .withFieldValue("address.street", "fake street")
                    .withFieldValue("address.zip_code", "TO-1234")
                    .withFieldValue("address.state", "wa")
                    .build();

    private static final SerializableFunction<Row, byte[]> INPUT_MAPPER =
            ProtoByteUtils.getRowToProtoBytesFromSchema(PROTO_STRING_SCHEMA, PROTO_MESSAGE_NAME);

    /** {@link #INPUT_ROW} serialized through {@link #INPUT_MAPPER}. */
    private static final byte[] INPUT_SOURCE = INPUT_MAPPER.apply(INPUT_ROW);

    private static final List<SequencedMessage> INPUT_MESSAGES =
            Collections.singletonList(messageWithAttributes(ByteString.copyFrom(INPUT_SOURCE)));

    /** JSON-bytes-to-{@link Row} mapper for {@link #BEAM_SCHEMA}. */
    final SerializableFunction<byte[], Row> valueMapper =
            JsonUtils.getJsonBytesToRowFunction(BEAM_SCHEMA);

    @Rule
    public transient TestPipeline p = TestPipeline.create();

    /** Builds the read-only String attribute map expected in the {@code attrs} field. */
    private static Map<String, String> buildStaticMap() {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("key1", "first_key");
        attributes.put("key2", "second_key");
        return Collections.unmodifiableMap(attributes);
    }

    /** Builds the read-only Pub/Sub Lite attribute map attached to the test messages. */
    private static Map<String, AttributeValues> buildAttributeValuesMap() {
        Map<String, AttributeValues> attributes = new HashMap<>();
        attributes.put("key1", singleValue("first_key"));
        attributes.put("key2", singleValue("second_key"));
        return Collections.unmodifiableMap(attributes);
    }

    /** Wraps one UTF-8 string as an {@link AttributeValues} holding a single value. */
    private static AttributeValues singleValue(String value) {
        return AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8(value)).build();
    }

    /** Wraps a {@link PubSubMessage} in a {@link SequencedMessage} with no other fields set. */
    private static SequencedMessage sequenced(PubSubMessage message) {
        return SequencedMessage.newBuilder().setMessage(message).build();
    }

    /** Creates a message with the given payload and all of {@link #ATTRIBUTE_VALUES_MAP}. */
    private static SequencedMessage messageWithAttributes(ByteString data) {
        return sequenced(
                PubSubMessage.newBuilder()
                        .setData(data)
                        .putAllAttributes(ATTRIBUTE_VALUES_MAP)
                        .build());
    }

    /** Creates a message with the given payload and no attributes. */
    private static SequencedMessage messageWithoutAttributes(ByteString data) {
        return sequenced(PubSubMessage.newBuilder().setData(data).build());
    }

    /** Row of {@link #BEAM_RAW_SCHEMA} whose payload is the UTF-8 bytes of {@code payload}. */
    private static Row rawRow(String payload) {
        // StandardCharsets avoids the checked UnsupportedEncodingException of getBytes(String).
        return Row.withSchema(BEAM_RAW_SCHEMA)
                .withFieldValue("payload", payload.getBytes(StandardCharsets.UTF_8))
                .build();
    }

    /** Row of {@link #BEAM_SCHEMA} with the given name. */
    private static Row nameRow(String name) {
        return Row.withSchema(BEAM_SCHEMA).withFieldValue("name", name).build();
    }

    /** Row of {@link #BEAM_SCHEMA_ATTRIBUTES} with the given name and the fixed key values. */
    private static Row rowWithAttributes(String name) {
        return Row.withSchema(BEAM_SCHEMA_ATTRIBUTES)
                .withFieldValue("name", name)
                .withFieldValue("key1", "first_key")
                .withFieldValue("key2", "second_key")
                .build();
    }

    /** Row of {@link #BEAM_SCHEMA_ATTRIBUTES_MAP} with the given name and {@link #STATIC_MAP}. */
    private static Row rowWithAttributesMap(String name) {
        return Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_MAP)
                .withFieldValue("name", name)
                .withFieldValue("attrs", STATIC_MAP)
                .build();
    }

    /**
     * Row of {@link #BEAM_SCHEMA_ATTRIBUTES_AND_MAP} with the given name, the fixed key values and
     * {@link #STATIC_MAP}.
     */
    private static Row rowWithAttributesAndMap(String name) {
        return Row.withSchema(BEAM_SCHEMA_ATTRIBUTES_AND_MAP)
                .withFieldValue("name", name)
                .withFieldValue("key1", "first_key")
                .withFieldValue("key2", "second_key")
                .withFieldValue("attrs", STATIC_MAP)
                .build();
    }

    /** JSON messages matching the schema are expected on the main output as {@link #ROWS}. */
    @Test
    public void testPubsubLiteErrorFnSuccess() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        PCollectionTuple output =
                p.apply(Create.of(MESSAGES))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME, valueMapper, errorSchema, true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS);
        p.run().waitUntilFinish();
    }

    /** All three {@link #MESSAGES_WITH_ERROR} elements are expected on the error output. */
    @Test
    public void testPubsubLiteErrorFnFailure() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        PCollectionTuple output =
                p.apply(Create.of(MESSAGES_WITH_ERROR))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME, valueMapper, errorSchema, true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        PAssert.that(output.get(ERROR_TAG).apply("error_count", Count.globally()))
                .containsInAnyOrder(Collections.singletonList(3L));
        p.run().waitUntilFinish();
    }

    /** Raw payloads mapped with the raw-bytes function are expected as {@link #RAW_ROWS}. */
    @Test
    public void testPubsubLiteErrorFnRawSuccess() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        List<String> noAttributes = new ArrayList<>();
        Schema outputSchema =
                PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
                        BEAM_RAW_SCHEMA, noAttributes, "");
        PCollectionTuple output =
                p.apply(Create.of(RAW_MESSAGES))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME,
                                                        PubsubLiteReadSchemaTransformProvider
                                                                .getRawBytesToRowFunction(BEAM_RAW_SCHEMA),
                                                        errorSchema,
                                                        true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(outputSchema);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(RAW_ROWS);
        p.run().waitUntilFinish();
    }

    /** Requesting {@code key1}/{@code key2} is expected to yield {@link #ROWS_WITH_ATTRIBUTES}. */
    @Test
    public void testPubsubLiteErrorFnWithAttributesSuccess() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        List<String> attributes = new ArrayList<>(Arrays.asList("key1", "key2"));
        Schema outputSchema =
                PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
                        BEAM_SCHEMA, attributes, "");
        PCollectionTuple output =
                p.apply(Create.of(MESSAGES))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME,
                                                        valueMapper,
                                                        errorSchema,
                                                        attributes,
                                                        "",
                                                        outputSchema,
                                                        true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(outputSchema);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES);
        p.run().waitUntilFinish();
    }

    /** Requesting an {@code attrs} map is expected to yield {@link #ROWS_WITH_ATTRIBUTES_MAP}. */
    @Test
    public void testPubsubLiteErrorFnWithAttributeMapSuccess() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        List<String> attributes = new ArrayList<>();
        Schema outputSchema =
                PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
                        BEAM_SCHEMA, attributes, "attrs");
        PCollectionTuple output =
                p.apply(Create.of(MESSAGES))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME,
                                                        valueMapper,
                                                        errorSchema,
                                                        attributes,
                                                        "attrs",
                                                        outputSchema,
                                                        true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(outputSchema);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES_MAP);
        p.run().waitUntilFinish();
    }

    /**
     * Requesting both the individual keys and the {@code attrs} map is expected to yield {@link
     * #ROWS_WITH_ATTRIBUTES_AND_MAP}.
     */
    @Test
    public void testPubsubLiteErrorFnWithAttributesAndAttributeMapSuccess() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        List<String> attributes = new ArrayList<>(Arrays.asList("key1", "key2"));
        Schema outputSchema =
                PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
                        BEAM_SCHEMA, attributes, "attrs");
        PCollectionTuple output =
                p.apply(Create.of(MESSAGES))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME,
                                                        valueMapper,
                                                        errorSchema,
                                                        attributes,
                                                        "attrs",
                                                        outputSchema,
                                                        true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(outputSchema);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(ROWS_WITH_ATTRIBUTES_AND_MAP);
        p.run().waitUntilFinish();
    }

    /**
     * Requests attribute keys that the test messages do not carry and expects all three messages
     * on the error output.
     */
    @Test
    public void testPubsubLiteErrorFnWithAttributesFailure() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        List<String> attributes = new ArrayList<>(Arrays.asList("randomKey1", "randomKey2"));
        Schema outputSchema =
                PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
                        BEAM_SCHEMA, attributes, "");
        PCollectionTuple output =
                p.apply(Create.of(MESSAGES))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME,
                                                        valueMapper,
                                                        errorSchema,
                                                        attributes,
                                                        "",
                                                        outputSchema,
                                                        true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(outputSchema);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        PAssert.that(output.get(ERROR_TAG).apply("error_count", Count.globally()))
                .containsInAnyOrder(Collections.singletonList(3L));
        p.run().waitUntilFinish();
    }

    /**
     * Deduplicates on the {@code key1} attribute, which has the same value on every test message,
     * and expects exactly one row on the main output.
     */
    @Test
    public void testPubsubLiteErrorFnWithDedupingSuccess() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        PCollectionTuple output =
                p.apply(Create.of(MESSAGES))
                        .apply(
                                PubsubLiteIO.deduplicate(
                                        UuidDeduplicationOptions.newBuilder()
                                                .setUuidExtractor(
                                                        PubsubLiteReadSchemaTransformProvider
                                                                .getUuidFromMessage("key1"))
                                                .build()))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME, valueMapper, errorSchema, true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(BEAM_SCHEMA);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        // The transform label says "error_count" but it counts the main output.
        PAssert.that(output.get(OUTPUT_TAG).apply("error_count", Count.globally()))
                .containsInAnyOrder(Collections.singletonList(1L));
        p.run().waitUntilFinish();
    }

    /** A proto-encoded {@link #INPUT_ROW} is expected to be decoded back to the same row. */
    @Test
    public void testPubSubLiteErrorFnReadProto() {
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        List<String> noAttributes = new ArrayList<>();
        Schema outputSchema =
                PubsubLiteReadSchemaTransformProvider.buildSchemaWithAttributes(
                        BEAM_PROTO_SCHEMA, noAttributes, "");
        PCollectionTuple output =
                p.apply(Create.of(INPUT_MESSAGES))
                        .apply(
                                ParDo.of(
                                                new PubsubLiteReadSchemaTransformProvider.ErrorFn(
                                                        ERROR_COUNTER_NAME,
                                                        ProtoByteUtils.getProtoBytesToRowFromSchemaFunction(
                                                                PROTO_STRING_SCHEMA, PROTO_MESSAGE_NAME),
                                                        errorSchema,
                                                        true))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
        output.get(OUTPUT_TAG).setRowSchema(outputSchema);
        output.get(ERROR_TAG).setRowSchema(errorSchema);

        PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(INPUT_ROW);
        p.run().waitUntilFinish();
    }
}
