package org.apache.beam.sdk.io.gcp.bigquery.providers;

import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigquery.providers.AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration;
import org.apache.beam.sdk.io.gcp.bigquery.providers.AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration_ErrorHandling;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;

@AutoService({SchemaTransformProvider.class})
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.class */
public class BigQueryStorageWriteApiSchemaTransformProvider extends TypedSchemaTransformProvider<BigQueryStorageWriteApiSchemaTransformConfiguration> {
    private static final Integer DEFAULT_TRIGGER_FREQUENCY_SECS = 5;
    private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(DEFAULT_TRIGGER_FREQUENCY_SECS.intValue());
    private static final String INPUT_ROWS_TAG = "input";
    private static final String FAILED_ROWS_TAG = "FailedRows";
    private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
    protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform.class */
    public static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform {
        private BigQueryServices testBigQueryServices = null;
        private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform$ElementCounterFn.class */
        public static class ElementCounterFn<T> extends DoFn<T, T> {
            private Counter bqGenericElementCounter;
            private Long elementsInBundle = 0L;

            ElementCounterFn(String str) {
                this.bqGenericElementCounter = Metrics.counter(BigQueryStorageWriteApiSchemaTransform.class, str);
            }

            @DoFn.ProcessElement
            public void process(DoFn<T, T>.ProcessContext processContext) {
                this.elementsInBundle = Long.valueOf(this.elementsInBundle.longValue() + 1);
                processContext.output(processContext.element());
            }

            @DoFn.FinishBundle
            public void finish(DoFn<T, T>.FinishBundleContext finishBundleContext) {
                this.bqGenericElementCounter.inc(this.elementsInBundle.longValue());
                this.elementsInBundle = 0L;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform$FailOnError.class */
        public static class FailOnError extends DoFn<BigQueryStorageApiInsertError, Void> {
            private FailOnError() {
            }

            @DoFn.ProcessElement
            public void process(DoFn<BigQueryStorageApiInsertError, Void>.ProcessContext processContext) {
                throw new RuntimeException(((BigQueryStorageApiInsertError) processContext.element()).getErrorMessage());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform$NoOutputDoFn.class */
        public static class NoOutputDoFn<T> extends DoFn<T, Row> {
            private NoOutputDoFn() {
            }

            @DoFn.ProcessElement
            public void process(DoFn<T, Row>.ProcessContext processContext) {
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform$RowDynamicDestinations.class */
        public static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
            Schema schema;

            RowDynamicDestinations(Schema schema) {
                this.schema = schema;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
            public String getDestination(ValueInSingleWindow<Row> valueInSingleWindow) {
                return ((Row) valueInSingleWindow.getValue()).getString("destination");
            }

            @Override // org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
            public TableDestination getTable(String str) {
                return new TableDestination(str, (String) null);
            }

            @Override // org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
            public TableSchema getSchema(String str) {
                return BigQueryUtils.toTableSchema(this.schema);
            }
        }

        BigQueryStorageWriteApiSchemaTransform(BigQueryStorageWriteApiSchemaTransformConfiguration bigQueryStorageWriteApiSchemaTransformConfiguration) {
            bigQueryStorageWriteApiSchemaTransformConfiguration.validate();
            this.configuration = bigQueryStorageWriteApiSchemaTransformConfiguration;
        }

        @VisibleForTesting
        public void setBigQueryServices(BigQueryServices bigQueryServices) {
            this.testBigQueryServices = bigQueryServices;
        }

        public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
            Preconditions.checkArgument(pCollectionRowTuple.has(BigQueryStorageWriteApiSchemaTransformProvider.INPUT_ROWS_TAG), "Missing expected input tag: %s", BigQueryStorageWriteApiSchemaTransformProvider.INPUT_ROWS_TAG);
            PCollection pCollection = pCollectionRowTuple.get(BigQueryStorageWriteApiSchemaTransformProvider.INPUT_ROWS_TAG);
            BigQueryIO.Write<Row> createStorageWriteApiTransform = createStorageWriteApiTransform(pCollection.getSchema());
            if (pCollection.isBounded() == PCollection.IsBounded.UNBOUNDED) {
                Long triggeringFrequencySeconds = this.configuration.getTriggeringFrequencySeconds();
                Boolean autoSharding = this.configuration.getAutoSharding();
                int intValue = this.configuration.getNumStreams() == null ? 0 : this.configuration.getNumStreams().intValue();
                if (!(this.configuration.getUseAtLeastOnceSemantics() != null && this.configuration.getUseAtLeastOnceSemantics().booleanValue())) {
                    createStorageWriteApiTransform = createStorageWriteApiTransform.withTriggeringFrequency((triggeringFrequencySeconds == null || triggeringFrequencySeconds.longValue() <= 0) ? BigQueryStorageWriteApiSchemaTransformProvider.DEFAULT_TRIGGERING_FREQUENCY : Duration.standardSeconds(triggeringFrequencySeconds.longValue()));
                }
                if (intValue > 0) {
                    createStorageWriteApiTransform = createStorageWriteApiTransform.withNumStorageWriteApiStreams(intValue);
                } else if (autoSharding == null || autoSharding.booleanValue()) {
                    createStorageWriteApiTransform = createStorageWriteApiTransform.withAutoSharding();
                }
            }
            Schema schema = pCollection.getSchema();
            WriteResult writeResult = (WriteResult) pCollection.apply("element-count", ParDo.of(new ElementCounterFn("BigQuery-write-element-counter"))).setRowSchema(schema).apply(createStorageWriteApiTransform);
            PCollection rowSchema = writeResult.getFailedStorageApiInserts().apply("post-write", ParDo.of(new NoOutputDoFn())).setRowSchema(Schema.of(new Schema.Field[0]));
            if (this.configuration.getErrorHandling() == null) {
                writeResult.getFailedStorageApiInserts().apply("Error on failed inserts", ParDo.of(new FailOnError()));
                return PCollectionRowTuple.of("post_write", rowSchema);
            }
            writeResult.getFailedStorageApiInserts().apply("error-count", ParDo.of(new ElementCounterFn("BigQuery-write-error-counter")));
            Schema of = Schema.of(new Schema.Field[]{Schema.Field.of("failed_row", Schema.FieldType.row(schema)), Schema.Field.of("error_message", Schema.FieldType.STRING)});
            return PCollectionRowTuple.of("post_write", rowSchema).and(this.configuration.getErrorHandling().getOutput(), writeResult.getFailedStorageApiInserts().apply("Construct failed rows and errors", MapElements.into(TypeDescriptors.rows()).via(bigQueryStorageApiInsertError -> {
                return Row.withSchema(of).withFieldValue("error_message", bigQueryStorageApiInsertError.getErrorMessage()).withFieldValue("failed_row", BigQueryUtils.toBeamRow(schema, bigQueryStorageApiInsertError.getRow())).build();
            })).setRowSchema(of));
        }

        BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
            BigQueryIO.Write<Row> useBeamSchema;
            BigQueryIO.Write withWriteDisposition = BigQueryIO.write().withMethod((this.configuration.getUseAtLeastOnceSemantics() == null || !this.configuration.getUseAtLeastOnceSemantics().booleanValue()) ? BigQueryIO.Write.Method.STORAGE_WRITE_API : BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE).withFormatFunction(BigQueryUtils.toTableRow()).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
            if (this.configuration.getTable().equals(BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS)) {
                Preconditions.checkArgument(schema.getFieldNames().equals(Arrays.asList("destination", "record")), "When writing to dynamic destinations, we expect Row Schema with a \"destination\" string field and a \"record\" Row field.");
                useBeamSchema = withWriteDisposition.to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())).withFormatFunction(row -> {
                    return BigQueryUtils.toTableRow(row.getRow("record"));
                });
            } else {
                useBeamSchema = withWriteDisposition.to(this.configuration.getTable()).useBeamSchema();
            }
            if (!Strings.isNullOrEmpty(this.configuration.getCreateDisposition())) {
                useBeamSchema = useBeamSchema.withCreateDisposition(BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(this.configuration.getCreateDisposition().toUpperCase()));
            }
            if (!Strings.isNullOrEmpty(this.configuration.getWriteDisposition())) {
                useBeamSchema = useBeamSchema.withWriteDisposition(BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(this.configuration.getWriteDisposition().toUpperCase()));
            }
            if (this.testBigQueryServices != null) {
                useBeamSchema = useBeamSchema.withTestServices(this.testBigQueryServices);
            }
            return useBeamSchema;
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -2137105231:
                    if (implMethodName.equals("lambda$createStorageWriteApiTransform$8708a7c5$1")) {
                        z = true;
                        break;
                    }
                    break;
                case -1138352456:
                    if (implMethodName.equals("lambda$expand$328833dc$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/schemas/Schema;Lorg/apache/beam/sdk/schemas/Schema;Lorg/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertError;)Lorg/apache/beam/sdk/values/Row;")) {
                        Schema schema = (Schema) serializedLambda.getCapturedArg(0);
                        Schema schema2 = (Schema) serializedLambda.getCapturedArg(1);
                        return bigQueryStorageApiInsertError -> {
                            return Row.withSchema(schema).withFieldValue("error_message", bigQueryStorageApiInsertError.getErrorMessage()).withFieldValue("failed_row", BigQueryUtils.toBeamRow(schema2, bigQueryStorageApiInsertError.getRow())).build();
                        };
                    }
                    break;
                case MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION /* 1 */:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/Row;)Lcom/google/api/services/bigquery/model/TableRow;")) {
                        return row -> {
                            return BigQueryUtils.toTableRow(row.getRow("record"));
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransformConfiguration.class */
    public static abstract class BigQueryStorageWriteApiSchemaTransformConfiguration {
        static final Map<String, BigQueryIO.Write.CreateDisposition> CREATE_DISPOSITIONS = ImmutableMap.builder().put(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED.name(), BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED).put(BigQueryIO.Write.CreateDisposition.CREATE_NEVER.name(), BigQueryIO.Write.CreateDisposition.CREATE_NEVER).build();
        static final Map<String, BigQueryIO.Write.WriteDisposition> WRITE_DISPOSITIONS = ImmutableMap.builder().put(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE.name(), BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE).put(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY.name(), BigQueryIO.Write.WriteDisposition.WRITE_EMPTY).put(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.name(), BigQueryIO.Write.WriteDisposition.WRITE_APPEND).build();

        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransformConfiguration$Builder.class */
        public static abstract class Builder {
            public abstract Builder setTable(String str);

            public abstract Builder setCreateDisposition(String str);

            public abstract Builder setWriteDisposition(String str);

            public abstract Builder setTriggeringFrequencySeconds(Long l);

            public abstract Builder setUseAtLeastOnceSemantics(Boolean bool);

            public abstract Builder setAutoSharding(Boolean bool);

            public abstract Builder setNumStreams(Integer num);

            public abstract Builder setErrorHandling(ErrorHandling errorHandling);

            public abstract BigQueryStorageWriteApiSchemaTransformConfiguration build();
        }

        @AutoValue
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransformConfiguration$ErrorHandling.class */
        public static abstract class ErrorHandling {

            @AutoValue.Builder
            /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider$BigQueryStorageWriteApiSchemaTransformConfiguration$ErrorHandling$Builder.class */
            public static abstract class Builder {
                public abstract Builder setOutput(String str);

                public abstract ErrorHandling build();
            }

            @SchemaFieldDescription("The name of the output PCollection containing failed writes.")
            public abstract String getOutput();

            public static Builder builder() {
                return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration_ErrorHandling.Builder();
            }
        }

        public void validate() {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(getTable()), "Invalid BigQuery Storage Write configuration: Table spec for a BigQuery Write must be specified.");
            if (!getTable().equals(BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS)) {
                Preconditions.checkNotNull(BigQueryHelpers.parseTableSpec(getTable()));
            }
            if (!Strings.isNullOrEmpty(getCreateDisposition())) {
                Preconditions.checkNotNull(CREATE_DISPOSITIONS.get(getCreateDisposition().toUpperCase()), "Invalid BigQuery Storage Write configuration: Invalid create disposition (%s) was specified. Available dispositions are: %s", getCreateDisposition(), CREATE_DISPOSITIONS.keySet());
            }
            if (!Strings.isNullOrEmpty(getWriteDisposition())) {
                Preconditions.checkNotNull(WRITE_DISPOSITIONS.get(getWriteDisposition().toUpperCase()), "Invalid BigQuery Storage Write configuration: Invalid write disposition (%s) was specified. Available dispositions are: %s", getWriteDisposition(), WRITE_DISPOSITIONS.keySet());
            }
            if (getErrorHandling() != null) {
                Preconditions.checkArgument(!Strings.isNullOrEmpty(getErrorHandling().getOutput()), "Invalid BigQuery Storage Write configuration: Output must not be empty if error handling specified.");
            }
            if (getAutoSharding() == null || !getAutoSharding().booleanValue() || getNumStreams() == null) {
                return;
            }
            Preconditions.checkArgument(getNumStreams().intValue() == 0, "Invalid BigQuery Storage Write configuration: Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
        }

        public static Builder builder() {
            return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration.Builder();
        }

        @SchemaFieldDescription("The bigquery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}")
        public abstract String getTable();

        @SchemaFieldDescription("Optional field that specifies whether the job is allowed to create new tables. The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER (the job must fail if the table does not exist already).")
        @Nullable
        public abstract String getCreateDisposition();

        @SchemaFieldDescription("Specifies the action that occurs if the destination table already exists. The following values are supported: WRITE_TRUNCATE (overwrites the table data), WRITE_APPEND (append the data to the table), WRITE_EMPTY (job must fail if the table is not empty).")
        @Nullable
        public abstract String getWriteDisposition();

        @SchemaFieldDescription("Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds.")
        @Nullable
        public abstract Long getTriggeringFrequencySeconds();

        @SchemaFieldDescription("This option enables lower latency for insertions to BigQuery but may ocassionally duplicate data elements.")
        @Nullable
        public abstract Boolean getUseAtLeastOnceSemantics();

        @SchemaFieldDescription("This option enables using a dynamically determined number of Storage Write API streams to write to BigQuery. Only applicable to unbounded data.")
        @Nullable
        public abstract Boolean getAutoSharding();

        @SchemaFieldDescription("Specifies the number of write streams that the Storage API sink will use. This parameter is only applicable when writing unbounded data.")
        @Nullable
        public abstract Integer getNumStreams();

        @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
        @Nullable
        public abstract ErrorHandling getErrorHandling();
    }

    protected Class<BigQueryStorageWriteApiSchemaTransformConfiguration> configurationClass() {
        return BigQueryStorageWriteApiSchemaTransformConfiguration.class;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SchemaTransform from(BigQueryStorageWriteApiSchemaTransformConfiguration bigQueryStorageWriteApiSchemaTransformConfiguration) {
        return new BigQueryStorageWriteApiSchemaTransform(bigQueryStorageWriteApiSchemaTransformConfiguration);
    }

    public String identifier() {
        return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v2", new Object[0]);
    }

    public String description() {
        return String.format("Writes data to BigQuery using the Storage Write API (https://cloud.google.com/bigquery/docs/write-api).\n\nThis expects a single PCollection of Beam Rows and outputs two dead-letter queues (DLQ) that contain failed rows. The first DLQ has tag [%s] and contains the failed rows. The second DLQ has tag [%s] and contains failed rows and along with their respective errors.", FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG);
    }

    public List<String> inputCollectionNames() {
        return Collections.singletonList(INPUT_ROWS_TAG);
    }

    public List<String> outputCollectionNames() {
        return Arrays.asList(FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG, "errors");
    }
}
