package org.apache.beam.sdk.io.gcp.spanner.changestreams;

import com.google.cloud.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests how {@code SpannerChangestreamsReadSchemaTransformProvider.DataChangeRecordToRow} splits
 * its results between the main output tag and the error tag.
 *
 * <p>Each test feeds a single {@link DataChangeRecord} through the DoFn: one whose new-values JSON
 * only uses the columns declared in the table schema ({@code id}, {@code name}), and one that
 * additionally carries a {@code lastName} entry that the table schema does not declare.
 */
@RunWith(JUnit4.class)
public class SpannerChangestreamsReadDlqTest {

    private static final String TABLE_NAME = "test_table";
    private static final String COMMIT_TIMESTAMP = "2023-06-01T18:14:22.078159000Z";
    private static final String ERROR_COUNTER_NAME = "SpannerChangestreams-read-error-counter";

    private static final TupleTag<Row> OUTPUT_TAG =
        SpannerChangestreamsReadSchemaTransformProvider.OUTPUT_TAG;
    private static final TupleTag<Row> ERROR_TAG =
        SpannerChangestreamsReadSchemaTransformProvider.ERROR_TAG;
    private static final Schema ERROR_SCHEMA =
        SpannerChangestreamsReadSchemaTransformProvider.ERROR_SCHEMA;

    /** Schema of the tracked table's columns. */
    private static final Schema TABLE_SCHEMA =
        Schema.builder()
            .addNullableInt32Field("id")
            .addNullableStringField("name")
            .build();

    /** Schema of the rows expected on the main output; wraps the table row in "rowValues". */
    private static final Schema CHANGE_RECORD_SCHEMA =
        Schema.builder()
            .addStringField("operation")
            .addStringField("commitTimestamp")
            .addInt64Field("recordSequence")
            .addRowField("rowValues", TABLE_SCHEMA)
            .build();

    private static final Row EXPECTED_TABLE_ROW =
        Row.withSchema(TABLE_SCHEMA)
            .withFieldValue("id", 0)
            .withFieldValue("name", "test")
            .build();

    /** The single row the success test expects on the main output. */
    private static final List<Row> EXPECTED_ROWS =
        Arrays.asList(
            Row.withSchema(CHANGE_RECORD_SCHEMA)
                .withFieldValue("operation", "INSERT")
                .withFieldValue("commitTimestamp", COMMIT_TIMESTAMP)
                .withFieldValue("recordSequence", 1L)
                .withFieldValue("rowValues", EXPECTED_TABLE_ROW)
                .build());

    // The JSON payloads keep the exact CRLF line breaks and spacing of the original fixtures.
    private static final String KEYS_JSON =
        "{\r\n"
            + "\"id\": \"0\"\r\n"
            + "}";
    private static final String MATCHING_NEW_VALUES_JSON =
        "{\r\n"
            + "\"id\": \"0\",\r\n"
            + "\"name\": \"test\" \r\n"
            + "}";
    // Same as above plus a "lastName" entry, which TABLE_SCHEMA does not declare.
    private static final String EXTRA_FIELD_NEW_VALUES_JSON =
        "{\r\n"
            + "\"id\": \"0\",\r\n"
            + "\"name\": \"test\" \r\n"
            + ",\r\n"
            + "\"lastName\": \"test\" \r\n"
            + "}";

    private static final Mod MATCHING_MOD =
        new Mod(KEYS_JSON, (String) null, MATCHING_NEW_VALUES_JSON);
    private static final Mod EXTRA_FIELD_MOD =
        new Mod(KEYS_JSON, (String) null, EXTRA_FIELD_NEW_VALUES_JSON);

    private static final List<DataChangeRecord> MATCHING_RECORDS =
        singleInsertRecord(MATCHING_MOD);
    private static final List<DataChangeRecord> EXTRA_FIELD_RECORDS =
        singleInsertRecord(EXTRA_FIELD_MOD);

    @Rule
    public transient TestPipeline p = TestPipeline.create();

    /**
     * Builds a one-element list holding an INSERT change record for {@link #TABLE_NAME} whose only
     * mod is {@code mod}; every other constructor argument is a fixed placeholder.
     */
    private static List<DataChangeRecord> singleInsertRecord(Mod mod) {
        DataChangeRecord changeRecord =
            new DataChangeRecord(
                "token",
                Timestamp.parseTimestamp(COMMIT_TIMESTAMP),
                "id",
                false,
                "1",
                TABLE_NAME,
                Collections.emptyList(),
                Collections.singletonList(mod),
                ModType.INSERT,
                ValueCaptureType.NEW_VALUES,
                1,
                0,
                "tag",
                false,
                (ChangeStreamRecordMetadata) null);
        return Arrays.asList(changeRecord);
    }

    /**
     * Applies {@code DataChangeRecordToRow} to {@code changeRecords} on the test pipeline and
     * attaches the row schemas to both tagged outputs.
     *
     * @param changeRecords the in-memory records to feed into the pipeline
     * @return the tuple holding the main ({@link #OUTPUT_TAG}) and error ({@link #ERROR_TAG}) outputs
     */
    private PCollectionTuple convertToRows(List<DataChangeRecord> changeRecords) {
        SpannerChangestreamsReadSchemaTransformProvider.DataChangeRecordToRow toRowFn =
            new SpannerChangestreamsReadSchemaTransformProvider.DataChangeRecordToRow(
                TABLE_NAME, CHANGE_RECORD_SCHEMA, ERROR_COUNTER_NAME);

        PCollectionTuple converted =
            p.apply(Create.of(changeRecords))
                .apply(ParDo.of(toRowFn).withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));

        converted.get(OUTPUT_TAG).setRowSchema(CHANGE_RECORD_SCHEMA);
        converted.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA);
        return converted;
    }

    /** A record matching the table schema must appear on the main output as the expected row. */
    @Test
    public void testSpannerChangestreamsReadDataChangeRecordToRowSuccess() throws Exception {
        PCollectionTuple converted = convertToRows(MATCHING_RECORDS);

        PAssert.that(converted.get(OUTPUT_TAG)).containsInAnyOrder(EXPECTED_ROWS);

        p.run().waitUntilFinish();
    }

    /** A record with the extra "lastName" entry must yield exactly one row on the error output. */
    @Test
    public void testSpannerChangestreamsReadDataChangeRecordToRowFailure() throws Exception {
        PCollectionTuple converted = convertToRows(EXTRA_FIELD_RECORDS);

        PAssert.that(converted.get(ERROR_TAG).apply(Count.globally()))
            .containsInAnyOrder(Collections.singletonList(1L));

        p.run().waitUntilFinish();
    }
}
