/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.gcp.pubsublite.AutoValue_PubsubLiteWriteSchemaTransformProvider_PubsubLiteWriteSchemaTransformConfiguration;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(value={SchemaTransformProvider.class})
public class PubsubLiteWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<PubsubLiteWriteSchemaTransformConfiguration> {
    public static final @UnknownKeyFor @NonNull @Initialized String SUPPORTED_FORMATS_STR = "JSON,AVRO";
    public static final @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> SUPPORTED_FORMATS = Sets.newHashSet((Object[])"JSON,AVRO".split(","));
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized PubSubMessage> OUTPUT_TAG = new TupleTag<PubSubMessage>(){};
    public static final @UnknownKeyFor @NonNull @Initialized TupleTag<@UnknownKeyFor @NonNull @Initialized Row> ERROR_TAG = new TupleTag<Row>(){};
    public static final @UnknownKeyFor @NonNull @Initialized Schema ERROR_SCHEMA = Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class);

    protected @UnknownKeyFor @NonNull @Initialized Class<@UnknownKeyFor @NonNull @Initialized PubsubLiteWriteSchemaTransformConfiguration> configurationClass() {
        return PubsubLiteWriteSchemaTransformConfiguration.class;
    }

    public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(final @UnknownKeyFor @NonNull @Initialized PubsubLiteWriteSchemaTransformConfiguration configuration) {
        if (!SUPPORTED_FORMATS.contains(configuration.getFormat())) {
            throw new IllegalArgumentException("Format " + configuration.getFormat() + " is not supported. Supported formats are: " + String.join((CharSequence)", ", SUPPORTED_FORMATS));
        }
        return new SchemaTransform(){

            public @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple expand(@UnknownKeyFor @NonNull @Initialized PCollectionRowTuple input) {
                Schema inputSchema = input.get("input").getSchema();
                SimpleFunction toBytesFn = configuration.getFormat().equals("JSON") ? JsonUtils.getRowToJsonBytesFunction((Schema)inputSchema) : AvroUtils.getRowToAvroBytesFunction((Schema)inputSchema);
                PCollectionTuple outputTuple = (PCollectionTuple)input.get("input").apply("Map Rows to PubSubMessages", (PTransform)ParDo.of((DoFn)new ErrorCounterFn("PubSubLite-write-error-counter", (SerializableFunction<Row, byte[]>)toBytesFn)).withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
                ((PCollection)outputTuple.get(OUTPUT_TAG).apply("Add UUIDs", PubsubLiteIO.addUuids())).apply("Write to PS Lite", PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(((TopicPath.Builder)((TopicPath.Builder)TopicPath.newBuilder().setProject(ProjectId.of((String)configuration.getProject()))).setName(TopicName.of((String)configuration.getTopicName())).setLocation(CloudRegionOrZone.parse((String)configuration.getLocation()))).build()).build()));
                return PCollectionRowTuple.of((String)"errors", (PCollection)outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
            }
        };
    }

    public @UnknownKeyFor @NonNull @Initialized String identifier() {
        return "beam:schematransform:org.apache.beam:pubsublite_write:v1";
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputCollectionNames() {
        return Collections.singletonList("input");
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputCollectionNames() {
        return Collections.singletonList("errors");
    }

    @DefaultSchema(value=AutoValueSchema.class)
    @AutoValue
    public static abstract class PubsubLiteWriteSchemaTransformConfiguration {
        @SchemaFieldDescription(value="The GCP project where the Pubsub Lite reservation resides. This can be a project number of a project ID.")
        public abstract @UnknownKeyFor @NonNull @Initialized String getProject();

        @SchemaFieldDescription(value="The region or zone where the Pubsub Lite reservation resides.")
        public abstract @UnknownKeyFor @NonNull @Initialized String getLocation();

        @SchemaFieldDescription(value="The name of the topic to publish data into. This will be concatenated with the project and location parameters to build a full topic path.")
        public abstract @UnknownKeyFor @NonNull @Initialized String getTopicName();

        @SchemaFieldDescription(value="The encoding format for the data stored in Pubsub Lite. Valid options are: JSON,AVRO")
        public abstract @UnknownKeyFor @NonNull @Initialized String getFormat();

        public static @UnknownKeyFor @NonNull @Initialized Builder builder() {
            return new AutoValue_PubsubLiteWriteSchemaTransformProvider_PubsubLiteWriteSchemaTransformConfiguration.Builder();
        }

        @AutoValue.Builder
        public static abstract class Builder {
            public abstract @UnknownKeyFor @NonNull @Initialized Builder setProject(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setLocation(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setTopicName(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized Builder setFormat(@UnknownKeyFor @NonNull @Initialized String var1);

            public abstract @UnknownKeyFor @NonNull @Initialized PubsubLiteWriteSchemaTransformConfiguration build();
        }
    }

    public static class ErrorCounterFn
    extends DoFn<Row, PubSubMessage> {
        private @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Row, @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> toBytesFn;
        private @UnknownKeyFor @NonNull @Initialized Counter errorCounter;
        private @UnknownKeyFor @NonNull @Initialized long errorsInBundle = 0L;

        public ErrorCounterFn(@UnknownKeyFor @NonNull @Initialized String name, @UnknownKeyFor @NonNull @Initialized SerializableFunction<@UnknownKeyFor @NonNull @Initialized Row, @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized []> toBytesFn) {
            this.toBytesFn = toBytesFn;
            this.errorCounter = Metrics.counter(PubsubLiteWriteSchemaTransformProvider.class, (String)name);
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element @UnknownKeyFor @NonNull @Initialized Row row, // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized DoFn.MultiOutputReceiver receiver) {
            try {
                PubSubMessage message = PubSubMessage.newBuilder().setData(ByteString.copyFrom((byte[])Objects.requireNonNull((byte[])this.toBytesFn.apply((Object)row)))).build();
                receiver.get(OUTPUT_TAG).output((Object)message);
            }
            catch (Exception e) {
                ++this.errorsInBundle;
                LOG.warn("Error while parsing the element", (Throwable)e);
                receiver.get(ERROR_TAG).output((Object)Row.withSchema((Schema)ERROR_SCHEMA).addValues(new Object[]{e.toString(), row.toString().getBytes(StandardCharsets.UTF_8)}).build());
            }
        }

        @DoFn.FinishBundle
        public void finish() {
            this.errorCounter.inc(this.errorsInBundle);
            this.errorsInBundle = 0L;
        }
    }
}

