package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;

import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.transforms.Cast;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.joda.time.Instant;
import org.joda.time.ReadableDateTime;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/kafka/NestedPayloadKafkaTable.class */
class NestedPayloadKafkaTable extends BeamKafkaTable {

    @Nullable
    private final PayloadSerializer payloadSerializer;

    public NestedPayloadKafkaTable(Schema schema, String str, List<String> list, Optional<PayloadSerializer> optional) {
        super(schema, str, list);
        Preconditions.checkArgument(Schemas.isNestedSchema(this.schema));
        Schemas.validateNestedSchema(this.schema);
        if (optional.isPresent()) {
            Preconditions.checkArgument(this.schema.getField("payload").getType().getTypeName().equals(Schema.TypeName.ROW));
            this.payloadSerializer = optional.get();
        } else {
            Preconditions.checkArgument(this.schema.getField("payload").getType().equals(Schema.FieldType.BYTES));
            this.payloadSerializer = null;
        }
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable
    protected PTransform<PCollection<KafkaRecord<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
        return new PTransform<PCollection<KafkaRecord<byte[], byte[]>>, PCollection<Row>>() { // from class: org.apache.beam.sdk.extensions.sql.meta.provider.kafka.NestedPayloadKafkaTable.1
            public PCollection<Row> expand(PCollection<KafkaRecord<byte[], byte[]>> pCollection) {
                return pCollection.apply(MapElements.into(new TypeDescriptor<Row>() { // from class: org.apache.beam.sdk.extensions.sql.meta.provider.kafka.NestedPayloadKafkaTable.1.1
                }).via(kafkaRecord -> {
                    return NestedPayloadKafkaTable.this.transformInput(kafkaRecord);
                }));
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 1989713795:
                        if (implMethodName.equals("lambda$expand$84c02c4c$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/meta/provider/kafka/NestedPayloadKafkaTable$1") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/kafka/KafkaRecord;)Lorg/apache/beam/sdk/values/Row;")) {
                            AnonymousClass1 anonymousClass1 = (AnonymousClass1) serializedLambda.getCapturedArg(0);
                            return kafkaRecord -> {
                                return NestedPayloadKafkaTable.this.transformInput(kafkaRecord);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    @VisibleForTesting
    Row transformInput(KafkaRecord<byte[], byte[]> kafkaRecord) {
        Headers headers;
        Row.FieldValueBuilder withFieldValues = Row.withSchema(getSchema()).withFieldValues(ImmutableMap.of());
        if (this.schema.hasField("message_key")) {
            withFieldValues.withFieldValue("message_key", kafkaRecord.getKV().getKey());
        }
        if (this.schema.hasField("event_timestamp")) {
            withFieldValues.withFieldValue("event_timestamp", Instant.ofEpochMilli(kafkaRecord.getTimestamp()));
        }
        if (this.schema.hasField("headers") && (headers = kafkaRecord.getHeaders()) != null) {
            ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder();
            headers.forEach(header -> {
                builder.put(header.key(), header.value());
            });
            ImmutableList.Builder builder2 = ImmutableList.builder();
            builder.build().asMap().forEach((str, collection) -> {
                builder2.add(Row.withSchema(Schemas.HEADERS_ENTRY_SCHEMA).withFieldValue("key", str).withFieldValue("values", collection).build());
            });
            withFieldValues.withFieldValue("headers", builder2.build());
        }
        if (this.payloadSerializer == null) {
            withFieldValues.withFieldValue("payload", kafkaRecord.getKV().getValue());
        } else if (((byte[]) kafkaRecord.getKV().getValue()) != null) {
            withFieldValues.withFieldValue("payload", this.payloadSerializer.deserialize((byte[]) kafkaRecord.getKV().getValue()));
        }
        return withFieldValues.build();
    }

    @Override // org.apache.beam.sdk.extensions.sql.meta.provider.kafka.BeamKafkaTable
    protected PTransform<PCollection<Row>, PCollection<ProducerRecord<byte[], byte[]>>> getPTransformForOutput() {
        return new PTransform<PCollection<Row>, PCollection<ProducerRecord<byte[], byte[]>>>() { // from class: org.apache.beam.sdk.extensions.sql.meta.provider.kafka.NestedPayloadKafkaTable.2
            public PCollection<ProducerRecord<byte[], byte[]>> expand(PCollection<Row> pCollection) {
                return pCollection.apply(MapElements.into(new TypeDescriptor<ProducerRecord<byte[], byte[]>>() { // from class: org.apache.beam.sdk.extensions.sql.meta.provider.kafka.NestedPayloadKafkaTable.2.1
                }).via(row -> {
                    return NestedPayloadKafkaTable.this.transformOutput(row);
                }));
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -1275552094:
                        if (implMethodName.equals("lambda$expand$836dddba$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/meta/provider/kafka/NestedPayloadKafkaTable$2") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/Row;)Lorg/apache/kafka/clients/producer/ProducerRecord;")) {
                            AnonymousClass2 anonymousClass2 = (AnonymousClass2) serializedLambda.getCapturedArg(0);
                            return row -> {
                                return NestedPayloadKafkaTable.this.transformOutput(row);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    @VisibleForTesting
    ProducerRecord<byte[], byte[]> transformOutput(Row row) {
        ReadableDateTime dateTime;
        Row castRow = Cast.castRow(row, row.getSchema(), this.schema);
        String str = (String) Iterables.getOnlyElement(getTopics());
        byte[] bArr = null;
        ImmutableList of = ImmutableList.of();
        Long l = null;
        if (this.schema.hasField("message_key")) {
            bArr = castRow.getBytes("message_key");
        }
        if (this.schema.hasField("event_timestamp") && (dateTime = castRow.getDateTime("event_timestamp")) != null) {
            l = Long.valueOf(dateTime.getMillis());
        }
        if (this.schema.hasField("headers")) {
            Collection collection = (Collection) org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(castRow.getArray("headers"));
            ImmutableList.Builder builder = ImmutableList.builder();
            collection.forEach(row2 -> {
                String str2 = (String) org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(row2.getString("key"));
                ((Collection) org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(row2.getArray("values"))).forEach(bArr2 -> {
                    builder.add(new RecordHeader(str2, bArr2));
                });
            });
            of = builder.build();
        }
        return new ProducerRecord<>(str, (Integer) null, l, bArr, this.payloadSerializer == null ? castRow.getBytes("payload") : this.payloadSerializer.serialize((Row) org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(castRow.getRow("payload"))), of);
    }
}
