package com.google.cloud.pubsublite.spark;

import com.github.benmanes.caffeine.cache.LocalCacheFactory;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;
import repackaged.com.google.common.annotations.VisibleForTesting;
import repackaged.com.google.common.base.Preconditions;
import repackaged.com.google.common.collect.ImmutableListMultimap;
import repackaged.com.google.common.collect.ListMultimap;
import repackaged.com.google.common.flogger.GoogleLogger;
import repackaged.com.google.common.math.LongMath;
import repackaged.com.google.protobuf.ByteString;
import repackaged.com.google.protobuf.util.Timestamps;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.compat.java8.functionConverterImpls.FromJavaBiConsumer;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/PslSparkUtils.class */
public class PslSparkUtils {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

    @VisibleForTesting
    public static ArrayBasedMapData convertAttributesToSparkMap(ListMultimap<String, ByteString> listMultimap) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        listMultimap.asMap().forEach((str, collection) -> {
            arrayList.add(UTF8String.fromString(str));
            arrayList2.add(new GenericArrayData(JavaConverters.asScalaBufferConverter((List) collection.stream().map(byteString -> {
                return ByteArray.concat((byte[][]) new byte[]{byteString.toByteArray()});
            }).collect(Collectors.toList())).asScala()));
        });
        return new ArrayBasedMapData(new GenericArrayData(JavaConverters.asScalaBufferConverter(arrayList).asScala()), new GenericArrayData(JavaConverters.asScalaBufferConverter(arrayList2).asScala()));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r5v11, types: [byte[], byte[][]] */
    /* JADX WARN: Type inference failed for: r5v14, types: [byte[], byte[][]] */
    public static InternalRow toInternalRow(SequencedMessage sequencedMessage, SubscriptionPath subscriptionPath, Partition partition) {
        Serializable[] serializableArr = new Serializable[8];
        serializableArr[0] = UTF8String.fromString(subscriptionPath.toString());
        serializableArr[1] = Long.valueOf(partition.value());
        serializableArr[2] = Long.valueOf(sequencedMessage.offset().value());
        serializableArr[3] = ByteArray.concat((byte[][]) new byte[]{sequencedMessage.message().key().toByteArray()});
        serializableArr[4] = ByteArray.concat((byte[][]) new byte[]{sequencedMessage.message().data().toByteArray()});
        serializableArr[5] = Long.valueOf(Timestamps.toMicros(sequencedMessage.publishTime()));
        serializableArr[6] = sequencedMessage.message().eventTime().isPresent() ? Long.valueOf(Timestamps.toMicros(sequencedMessage.message().eventTime().get())) : null;
        serializableArr[7] = convertAttributesToSparkMap(sequencedMessage.message().attributes());
        return InternalRow.apply((Seq) JavaConverters.asScalaBufferConverter(new ArrayList(Arrays.asList(serializableArr))).asScala());
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static <T> void extractVal(StructType structType, InternalRow internalRow, String str, DataType dataType, Consumer<T> consumer) {
        Option fieldIndex = structType.getFieldIndex(str);
        if (fieldIndex.isEmpty()) {
            return;
        }
        consumer.accept(internalRow.get(((Integer) fieldIndex.get()).intValue(), dataType));
    }

    public static Message toPubSubMessage(StructType structType, InternalRow internalRow) {
        Message.Builder builder = Message.builder();
        extractVal(structType, internalRow, LocalCacheFactory.KEY, SparkStructs.PUBLISH_FIELD_TYPES.get(LocalCacheFactory.KEY), bArr -> {
            builder.setKey(ByteString.copyFrom(bArr));
        });
        extractVal(structType, internalRow, "data", SparkStructs.PUBLISH_FIELD_TYPES.get("data"), bArr2 -> {
            builder.setData(ByteString.copyFrom(bArr2));
        });
        extractVal(structType, internalRow, "event_timestamp", SparkStructs.PUBLISH_FIELD_TYPES.get("event_timestamp"), l -> {
            builder.setEventTime(Timestamps.fromMicros(l.longValue()));
        });
        extractVal(structType, internalRow, "attributes", SparkStructs.PUBLISH_FIELD_TYPES.get("attributes"), mapData -> {
            ImmutableListMultimap.Builder builder2 = ImmutableListMultimap.builder();
            mapData.foreach(DataTypes.StringType, SparkStructs.ATTRIBUTES_PER_KEY_DATATYPE, new FromJavaBiConsumer((obj, obj2) -> {
                String uTF8String = ((UTF8String) obj).toString();
                ((ArrayData) obj2).foreach(DataTypes.BinaryType, new FromJavaBiConsumer((obj, obj2) -> {
                    builder2.put((ImmutableListMultimap.Builder) uTF8String, (String) ByteString.copyFrom((byte[]) obj2));
                }));
            }));
            builder.setAttributes(builder2.build());
        });
        return builder.build();
    }

    public static void verifyWriteInputSchema(StructType structType) {
        SparkStructs.PUBLISH_FIELD_TYPES.forEach((str, dataType) -> {
            Option fieldIndex = structType.getFieldIndex(str);
            if (fieldIndex.isEmpty()) {
                ((GoogleLogger.Api) log.atInfo().atMostEvery(5, TimeUnit.MINUTES)).log("Input schema to write to Pub/Sub Lite doesn't contain %s column, this field for all rows will be set to empty.", str);
                return;
            }
            StructField apply = structType.apply(((Integer) fieldIndex.get()).intValue());
            if (!apply.dataType().sameType(dataType)) {
                throw new IllegalArgumentException(String.format("Column %s in input schema to write to Pub/Sub Lite has a wrong DataType. Actual: %s, expected: %s.", str, apply.dataType(), dataType));
            }
        });
    }

    public static SparkSourceOffset toSparkSourceOffset(PslSourceOffset pslSourceOffset) {
        return new SparkSourceOffset((Map) pslSourceOffset.partitionOffsetMap().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return SparkPartitionOffset.builder().partition(Partition.of(((Partition) entry.getKey()).value())).offset(((Offset) entry.getValue()).value() - 1).build();
        })));
    }

    public static PslSourceOffset toPslSourceOffset(SparkSourceOffset sparkSourceOffset) {
        long size = sparkSourceOffset.getPartitionOffsetMap().size();
        HashMap hashMap = new HashMap();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= size) {
                return PslSourceOffset.builder().partitionOffsetMap(hashMap).build();
            }
            Partition of = Partition.of(j2);
            Preconditions.checkArgument(sparkSourceOffset.getPartitionOffsetMap().containsKey(of));
            hashMap.put(of, Offset.of(sparkSourceOffset.getPartitionOffsetMap().get(of).offset() + 1));
            j = j2 + 1;
        }
    }

    public static PslPartitionOffset toPslPartitionOffset(SparkPartitionOffset sparkPartitionOffset) {
        return PslPartitionOffset.builder().partition(sparkPartitionOffset.partition()).offset(Offset.of(sparkPartitionOffset.offset() + 1)).build();
    }

    public static SparkSourceOffset getSparkStartOffset(CursorClient cursorClient, SubscriptionPath subscriptionPath, long j) {
        try {
            HashMap hashMap = new HashMap();
            for (int i = 0; i < j; i++) {
                hashMap.put(Partition.of(i), Offset.of(0L));
            }
            Map<Partition, Offset> map = cursorClient.listPartitionCursors(subscriptionPath).get();
            hashMap.getClass();
            map.forEach((v1, v2) -> {
                r1.replace(v1, v2);
            });
            return toSparkSourceOffset(PslSourceOffset.builder().partitionOffsetMap(hashMap).build());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Failed to get information from PSL and construct startOffset", e);
        }
    }

    public static SparkSourceOffset getSparkEndOffset(SparkSourceOffset sparkSourceOffset, SparkSourceOffset sparkSourceOffset2, long j, long j2) {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < j2; i++) {
            Partition of = Partition.of(i);
            SparkPartitionOffset create = SparkPartitionOffset.create(of, -1L);
            hashMap.put(of, SparkPartitionOffset.create(of, Math.min(LongMath.saturatedAdd(sparkSourceOffset2.getPartitionOffsetMap().getOrDefault(of, create).offset(), j), sparkSourceOffset.getPartitionOffsetMap().getOrDefault(of, create).offset())));
        }
        return new SparkSourceOffset(hashMap);
    }
}
