package org.apache.beam.sdk.io.kafka;

import com.google.auto.service.AutoService;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService({SchemaTransformProvider.class})
/* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.class */
public class KafkaReadSchemaTransformProvider extends TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
    /** Logger used by this provider and by its nested helper classes. */
    private static final Logger LOG;
    /** Tag under which {@link ErrorFn} emits rows produced by a successful value mapping. */
    public static final TupleTag<Row> OUTPUT_TAG;
    /** Tag under which {@link ErrorFn} emits one error row per element whose mapping threw. */
    public static final TupleTag<Row> ERROR_TAG;
    /** Schema of the error rows: a string field "error" and a nullable byte-array field "row". */
    public static final Schema ERROR_SCHEMA;
    /** When true, the Kafka reads built by {@code from} are bounded with {@code withMaxReadTime}. */
    final Boolean isTest;
    /** Maximum read time, in seconds, applied to the Kafka read when {@link #isTest} is true. */
    final Integer testTimeoutSecs;
    // Decompiler rendering of the compiler-generated assertion flag: true when assertions are
    // disabled for this class (assigned in the static initializer).
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider$ConsumerFactoryWithGcsTrustStores.class */
    /**
     * Kafka consumer factory that rewrites consumer configuration before delegating to
     * {@code KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN}.
     *
     * <p>Every configuration value that is a {@code String} starting with {@code "gs://"} is
     * downloaded to a local temporary file and replaced by that file's absolute path; all other
     * values are passed through unchanged.
     */
    public static class ConsumerFactoryWithGcsTrustStores implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
        /** Size, in bytes, of the buffer used while copying a remote file to local disk. */
        private static final int COPY_BUFFER_BYTES = 1024;

        private ConsumerFactoryWithGcsTrustStores() {
        }

        /**
         * Creates a Kafka consumer from {@code map} after localizing any {@code gs://} values.
         *
         * @param map consumer configuration, keyed by property name
         * @return the consumer built by {@code KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN}
         * @throws IllegalArgumentException if a {@code gs://} value cannot be copied locally
         */
        @Override // org.apache.beam.sdk.transforms.SerializableFunction, org.apache.beam.sdk.transforms.ProcessFunction
        public Consumer<byte[], byte[]> apply(Map<String, Object> map) {
            Map<String, Object> localizedConfig =
                map.entrySet().stream()
                    .collect(
                        Collectors.toMap(
                            Map.Entry::getKey,
                            entry -> identityOrGcsToLocalFile(entry.getValue())));
            return KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN.apply(localizedConfig);
        }

        /**
         * Returns {@code obj} unchanged unless it is a {@code String} starting with
         * {@code "gs://"}, in which case the referenced file is copied to a new temporary file and
         * the absolute path of that copy is returned.
         *
         * @param obj a single consumer configuration value
         * @return {@code obj} itself, or the local path of the downloaded copy
         * @throws IllegalArgumentException if the file cannot be read or written; the underlying
         *     {@link IOException} is attached as the cause
         */
        private static Object identityOrGcsToLocalFile(Object obj) {
            if (!(obj instanceof String)) {
                return obj;
            }
            String remotePath = (String) obj;
            if (!remotePath.startsWith("gs://")) {
                return obj;
            }
            try {
                Path localFile = Files.createTempFile("", "");
                KafkaReadSchemaTransformProvider.LOG.info("Downloading {} into local filesystem ({})", remotePath, localFile.toAbsolutePath());
                // try-with-resources closes all three resources even when the copy fails midway.
                try (ReadableByteChannel source = FileSystems.open(FileSystems.match(remotePath).metadata().get(0).resourceId());
                        FileOutputStream fileOutputStream = new FileOutputStream(localFile.toFile());
                        WritableByteChannel sink = Channels.newChannel(fileOutputStream)) {
                    ByteBuffer buffer = ByteBuffer.allocate(COPY_BUFFER_BYTES);
                    while (source.read(buffer) != -1) {
                        buffer.flip();
                        sink.write(buffer);
                        // Keep whatever a partial write left behind for the next iteration.
                        buffer.compact();
                    }
                    // A channel write may consume fewer bytes than offered, so flush any
                    // bytes still buffered once the source has reached end-of-stream.
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        sink.write(buffer);
                    }
                }
                return localFile.toAbsolutePath().toString();
            } catch (IOException e) {
                throw new IllegalArgumentException(String.format("Unable to fetch file %s to be used locally to create a Kafka Consumer.", remotePath), e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider$ErrorFn.class */
    public static class ErrorFn extends DoFn<byte[], Row> {
        private SerializableFunction<byte[], Row> valueMapper;
        private Counter errorCounter;
        private Long errorsInBundle = 0L;

        public ErrorFn(String str, SerializableFunction<byte[], Row> serializableFunction) {
            this.errorCounter = Metrics.counter((Class<?>) KafkaReadSchemaTransformProvider.class, str);
            this.valueMapper = serializableFunction;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element byte[] bArr, DoFn.MultiOutputReceiver multiOutputReceiver) {
            try {
                multiOutputReceiver.get(KafkaReadSchemaTransformProvider.OUTPUT_TAG).output(this.valueMapper.apply(bArr));
            } catch (Exception e) {
                this.errorsInBundle = Long.valueOf(this.errorsInBundle.longValue() + 1);
                KafkaReadSchemaTransformProvider.LOG.warn("Error while parsing the element", (Throwable) e);
                multiOutputReceiver.get(KafkaReadSchemaTransformProvider.ERROR_TAG).output(Row.withSchema(KafkaReadSchemaTransformProvider.ERROR_SCHEMA).addValues(e.toString(), bArr).build());
            }
        }

        @DoFn.FinishBundle
        public void finish(DoFn<byte[], Row>.FinishBundleContext finishBundleContext) {
            this.errorCounter.inc(this.errorsInBundle.longValue());
            this.errorsInBundle = 0L;
        }
    }

    /** Creates a provider in non-test mode: test flag false and a test timeout of zero seconds. */
    public KafkaReadSchemaTransformProvider() {
        this(Boolean.FALSE, Integer.valueOf(0));
    }

    /**
     * Creates a provider with an explicit test mode.
     *
     * @param bool value stored in {@code isTest}
     * @param num value stored in {@code testTimeoutSecs}
     */
    @VisibleForTesting
    KafkaReadSchemaTransformProvider(Boolean bool, Integer num) {
        isTest = bool;
        testTimeoutSecs = num;
    }

    /** Returns the configuration class this provider is typed on. */
    @Override // org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider
    protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
        final Class<KafkaReadSchemaTransformConfiguration> configClass = KafkaReadSchemaTransformConfiguration.class;
        return configClass;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the Kafka read transform described by the given configuration.
     *
     * <p>The consumer configuration starts from the configuration's consumer config updates (if
     * any) and is then overwritten with a group id derived from the configuration's hash code,
     * auto-commit enabled with a 100 ms interval, and the configured auto offset reset
     * ("latest" when unset).
     *
     * <p>When a non-empty schema string is present, values are read as raw bytes and converted
     * with a JSON mapper (format equal to "JSON") or an Avro mapper (any other format); the
     * result has an "output" and an "errors" collection. Otherwise the values are deserialized
     * through the Confluent Schema Registry and only an "output" collection is produced.
     *
     * @param kafkaReadSchemaTransformConfiguration the read configuration
     * @return the transform performing the read
     * @throws AssertionError only when assertions are enabled: if both a schema and a registry URL
     *     are given, or if neither is given
     */
    @Override // org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider
    public SchemaTransform from(final KafkaReadSchemaTransformConfiguration kafkaReadSchemaTransformConfiguration) {
        final String inputSchema = kafkaReadSchemaTransformConfiguration.getSchema();
        final int groupId = kafkaReadSchemaTransformConfiguration.hashCode() % Integer.MAX_VALUE;
        final String autoOffsetReset = MoreObjects.firstNonNull(kafkaReadSchemaTransformConfiguration.getAutoOffsetResetConfig(), "latest");

        // Copy the user-supplied updates so the caller's map is never mutated; the puts below
        // deliberately override any user value for the same keys.
        final Map<String, Object> consumerConfigs = new HashMap<>();
        final Map<String, ?> userConfigUpdates = kafkaReadSchemaTransformConfiguration.getConsumerConfigUpdates();
        if (userConfigUpdates != null) {
            consumerConfigs.putAll(userConfigUpdates);
        }
        consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId);
        consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        if (inputSchema != null && !inputSchema.isEmpty()) {
            // Assertion-only validation: skipped entirely when assertions are disabled.
            if (!$assertionsDisabled && !Strings.isNullOrEmpty(kafkaReadSchemaTransformConfiguration.getConfluentSchemaRegistryUrl())) {
                throw new AssertionError("To read from Kafka, a schema must be provided directly or though Confluent Schema Registry, but not both.");
            }
            final boolean isJson = Objects.equals(kafkaReadSchemaTransformConfiguration.getFormat(), "JSON");
            // Fully qualified: the Beam and Avro schema types share the simple name "Schema".
            final org.apache.beam.sdk.schemas.Schema beamSchema =
                isJson
                    ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
                    : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
            final SimpleFunction<byte[], Row> valueMapper =
                isJson
                    ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
                    : AvroUtils.getAvroBytesToRowFunction(beamSchema);
            return new SchemaTransform() {
                @Override // org.apache.beam.sdk.transforms.PTransform
                public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
                    KafkaIO.Read<byte[], byte[]> kafkaRead =
                        KafkaIO.readBytes()
                            .withConsumerConfigUpdates(consumerConfigs)
                            .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
                            .withTopic(kafkaReadSchemaTransformConfiguration.getTopic())
                            .withBootstrapServers(kafkaReadSchemaTransformConfiguration.getBootstrapServers());
                    if (KafkaReadSchemaTransformProvider.this.isTest.booleanValue()) {
                        kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(KafkaReadSchemaTransformProvider.this.testTimeoutSecs.intValue()));
                    }
                    PCollection<byte[]> kafkaValues =
                        pCollectionRowTuple.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
                    PCollectionTuple outputTuple =
                        kafkaValues.apply(
                            ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper))
                                .withOutputTags(KafkaReadSchemaTransformProvider.OUTPUT_TAG, TupleTagList.of(KafkaReadSchemaTransformProvider.ERROR_TAG)));
                    return PCollectionRowTuple.of(
                        "output", outputTuple.get(KafkaReadSchemaTransformProvider.OUTPUT_TAG).setRowSchema(beamSchema),
                        "errors", outputTuple.get(KafkaReadSchemaTransformProvider.ERROR_TAG).setRowSchema(KafkaReadSchemaTransformProvider.ERROR_SCHEMA));
                }
            };
        }

        // No inline schema: the schema has to come from the Confluent Schema Registry.
        if (!$assertionsDisabled && Strings.isNullOrEmpty(kafkaReadSchemaTransformConfiguration.getConfluentSchemaRegistryUrl())) {
            throw new AssertionError("To read from Kafka, a schema must be provided directly or though Confluent Schema Registry. Neither seems to have been provided.");
        }
        return new SchemaTransform() {
            @Override // org.apache.beam.sdk.transforms.PTransform
            public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
                final String registryUrl = kafkaReadSchemaTransformConfiguration.getConfluentSchemaRegistryUrl();
                final String registrySubject = kafkaReadSchemaTransformConfiguration.getConfluentSchemaRegistrySubject();
                if (registryUrl == null || registrySubject == null) {
                    throw new IllegalArgumentException("To read from Kafka, a schema must be provided directly or though Confluent Schema Registry. Make sure you are providing one of these parameters.");
                }
                KafkaIO.Read<byte[], org.apache.avro.generic.GenericRecord> kafkaRead =
                    KafkaIO.<byte[], org.apache.avro.generic.GenericRecord>read()
                        .withTopic(kafkaReadSchemaTransformConfiguration.getTopic())
                        .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
                        .withBootstrapServers(kafkaReadSchemaTransformConfiguration.getBootstrapServers())
                        .withConsumerConfigUpdates(consumerConfigs)
                        .withKeyDeserializer(ByteArrayDeserializer.class)
                        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, registrySubject));
                if (KafkaReadSchemaTransformProvider.this.isTest.booleanValue()) {
                    kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(KafkaReadSchemaTransformProvider.this.testTimeoutSecs.intValue()));
                }
                PCollection<org.apache.avro.generic.GenericRecord> kafkaValues =
                    pCollectionRowTuple.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
                // Reuses the enclosing class's assertion flag; an anonymous class cannot
                // portably declare its own static copy.
                if (!KafkaReadSchemaTransformProvider.$assertionsDisabled && kafkaValues.getCoder().getClass() != AvroCoder.class) {
                    throw new AssertionError();
                }
                AvroCoder<org.apache.avro.generic.GenericRecord> avroCoder =
                    (AvroCoder<org.apache.avro.generic.GenericRecord>) kafkaValues.getCoder();
                kafkaValues = kafkaValues.setCoder(AvroUtils.schemaCoder(avroCoder.getSchema()));
                return PCollectionRowTuple.of("output", kafkaValues.apply(Convert.toRows()));
            }
        };
    }

    /** Returns the fixed identifier string of this schema transform. */
    @Override // org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider
    public String identifier() {
        final String transformUrn = "beam:schematransform:org.apache.beam:kafka_read:v1";
        return transformUrn;
    }

    /** Returns a new, empty list: this transform declares no input collections. */
    @Override // org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider
    public List<String> inputCollectionNames() {
        final List<String> noInputs = Lists.newArrayList();
        return noInputs;
    }

    /** Returns the declared output collection names, "output" followed by "errors". */
    @Override // org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider
    public List<String> outputCollectionNames() {
        final List<String> outputNames = Arrays.asList("output", "errors");
        return outputNames;
    }

    // Initializes the class-level constants declared at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; guards the assertion-style checks.
        $assertionsDisabled = !KafkaReadSchemaTransformProvider.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) KafkaReadSchemaTransformProvider.class);
        // Both tags are created as anonymous subclasses of TupleTag<Row>.
        OUTPUT_TAG = new TupleTag<Row>() { // from class: org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider.1
        };
        ERROR_TAG = new TupleTag<Row>() { // from class: org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider.2
        };
        // Error rows carry a string field "error" and a nullable byte-array field "row".
        ERROR_SCHEMA = org.apache.beam.sdk.schemas.Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
    }
}
