package com.microsoft.semantickernel.connectors.data.redis;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.microsoft.semantickernel.connectors.data.redis.RedisVectorStoreRecordMapper;
import com.microsoft.semantickernel.data.VectorStoreRecordCollection;
import com.microsoft.semantickernel.data.VectorStoreRecordMapper;
import com.microsoft.semantickernel.data.recorddefinition.VectorStoreRecordDefinition;
import com.microsoft.semantickernel.data.recordoptions.DeleteRecordOptions;
import com.microsoft.semantickernel.data.recordoptions.GetRecordOptions;
import com.microsoft.semantickernel.data.recordoptions.UpsertRecordOptions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.json.JSONArray;
import org.json.JSONObject;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.json.Path2;
import redis.clients.jedis.search.IndexDefinition;
import redis.clients.jedis.search.IndexOptions;

/* loaded from: input_file:com/microsoft/semantickernel/connectors/data/redis/RedisVectorStoreRecordCollection.class */
public class RedisVectorStoreRecordCollection<Record> implements VectorStoreRecordCollection<String, Record> {
    private static final HashSet<Class<?>> supportedKeyTypes = new HashSet<>(Collections.singletonList(String.class));
    private static final HashSet<Class<?>> supportedVectorTypes = new HashSet<>(Arrays.asList(List.class, Collection.class));
    private final JedisPooled client;
    private final String collectionName;
    private final RedisVectorStoreRecordCollectionOptions<Record> options;
    private final VectorStoreRecordMapper<Record, Map.Entry<String, Object>> vectorStoreRecordMapper;
    private final VectorStoreRecordDefinition recordDefinition;
    private final Path2[] dataFields;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public RedisVectorStoreRecordCollection(@Nonnull JedisPooled jedisPooled, @Nonnull String str, @Nonnull RedisVectorStoreRecordCollectionOptions<Record> redisVectorStoreRecordCollectionOptions) {
        this.client = jedisPooled;
        this.collectionName = str;
        this.options = redisVectorStoreRecordCollectionOptions;
        if (redisVectorStoreRecordCollectionOptions.getRecordDefinition() == null) {
            this.recordDefinition = VectorStoreRecordDefinition.fromRecordClass(redisVectorStoreRecordCollectionOptions.getRecordClass());
        } else {
            this.recordDefinition = redisVectorStoreRecordCollectionOptions.getRecordDefinition();
        }
        VectorStoreRecordDefinition.validateSupportedKeyTypes(redisVectorStoreRecordCollectionOptions.getRecordClass(), this.recordDefinition, supportedKeyTypes);
        VectorStoreRecordDefinition.validateSupportedVectorTypes(redisVectorStoreRecordCollectionOptions.getRecordClass(), this.recordDefinition, supportedVectorTypes);
        if (redisVectorStoreRecordCollectionOptions.getVectorStoreRecordMapper() == null) {
            this.vectorStoreRecordMapper = new RedisVectorStoreRecordMapper.Builder().withKeyFieldName(this.recordDefinition.getKeyField().getName()).withRecordClass(redisVectorStoreRecordCollectionOptions.getRecordClass()).m6build();
        } else {
            this.vectorStoreRecordMapper = redisVectorStoreRecordCollectionOptions.getVectorStoreRecordMapper();
        }
        this.dataFields = (Path2[]) this.recordDefinition.getDataFields().stream().map((v0) -> {
            return v0.getName();
        }).map(Path2::new).toArray(i -> {
            return new Path2[i];
        });
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public String getCollectionName() {
        return this.collectionName;
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<Boolean> collectionExistsAsync() {
        return Mono.fromCallable(() -> {
            try {
                Map ftInfo = this.client.ftInfo(this.collectionName);
                return Boolean.valueOf((ftInfo == null || ftInfo.isEmpty()) ? false : true);
            } catch (Exception e) {
                if (e instanceof JedisDataException) {
                    return false;
                }
                throw e;
            }
        }).subscribeOn(Schedulers.boundedElastic());
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<Void> createCollectionAsync() {
        return Mono.fromRunnable(() -> {
            this.client.ftCreate(this.collectionName, IndexOptions.defaultOptions().setDefinition(new IndexDefinition(IndexDefinition.Type.JSON).setPrefixes(new String[]{this.collectionName + ":"})), RedisVectorStoreCollectionCreateMapping.mapToSchema(this.recordDefinition.getAllFields()));
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<Void> createCollectionIfNotExistsAsync() {
        return collectionExistsAsync().flatMap(bool -> {
            return !bool.booleanValue() ? createCollectionAsync() : Mono.empty();
        });
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<Void> deleteCollectionAsync() {
        return Mono.fromRunnable(() -> {
            this.client.ftDropIndex(this.collectionName);
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    private String getRedisKey(String str, String str2) {
        return this.options.isPrefixCollectionName() ? str2 + ":" + str : str;
    }

    private JsonNode removeRedisPathPrefix(JSONObject jSONObject) {
        ObjectNode createObjectNode = this.objectMapper.createObjectNode();
        jSONObject.keySet().forEach(str -> {
            String str = str;
            if (str.startsWith("$.")) {
                str = str.substring(2);
            }
            createObjectNode.set(str, this.objectMapper.valueToTree(((JSONArray) jSONObject.get(str)).get(0)));
        });
        return createObjectNode;
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<Record> getAsync(String str, GetRecordOptions getRecordOptions) {
        String redisKey = getRedisKey(str, this.collectionName);
        return Mono.defer(() -> {
            Object jsonGet;
            if (getRecordOptions != null) {
                try {
                    if (!getRecordOptions.includeVectors()) {
                        jsonGet = this.client.jsonGet(redisKey, this.dataFields);
                        if (jsonGet != null) {
                            return Mono.empty();
                        }
                        return Mono.just(this.vectorStoreRecordMapper.mapStorageModeltoRecord(new AbstractMap.SimpleEntry(str, (getRecordOptions == null || getRecordOptions.includeVectors()) ? this.objectMapper.valueToTree(jsonGet) : removeRedisPathPrefix((JSONObject) jsonGet))));
                    }
                } catch (Exception e) {
                    return Mono.error(e);
                }
            }
            jsonGet = this.client.jsonGet(redisKey);
            if (jsonGet != null) {
            }
        }).subscribeOn(Schedulers.boundedElastic());
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<List<Record>> getBatchAsync(List<String> list, GetRecordOptions getRecordOptions) {
        Pipeline pipelined = this.client.pipelined();
        ArrayList arrayList = new ArrayList(list.size());
        list.forEach(str -> {
            String redisKey = getRedisKey(str, this.collectionName);
            if (getRecordOptions == null || getRecordOptions.includeVectors()) {
                arrayList.add(new AbstractMap.SimpleEntry(str, pipelined.jsonGet(redisKey)));
            } else {
                arrayList.add(new AbstractMap.SimpleEntry(str, pipelined.jsonGet(redisKey, this.dataFields)));
            }
        });
        return Mono.defer(() -> {
            pipelined.sync();
            try {
                return Mono.just((List) arrayList.stream().map(entry -> {
                    Object obj = ((Response) entry.getValue()).get();
                    if (obj == null) {
                        return null;
                    }
                    return this.vectorStoreRecordMapper.mapStorageModeltoRecord(new AbstractMap.SimpleEntry((String) entry.getKey(), (getRecordOptions == null || getRecordOptions.includeVectors()) ? this.objectMapper.valueToTree(obj) : removeRedisPathPrefix((JSONObject) obj)));
                }).collect(Collectors.toList()));
            } catch (Exception e) {
                return Mono.error(e);
            }
        }).subscribeOn(Schedulers.boundedElastic());
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<String> upsertAsync(Record record, UpsertRecordOptions upsertRecordOptions) {
        Map.Entry<String, Object> mapRecordToStorageModel = this.vectorStoreRecordMapper.mapRecordToStorageModel(record);
        String redisKey = getRedisKey(mapRecordToStorageModel.getKey(), this.collectionName);
        return Mono.fromRunnable(() -> {
            this.client.jsonSet(redisKey, mapRecordToStorageModel.getValue());
        }).subscribeOn(Schedulers.boundedElastic()).thenReturn(mapRecordToStorageModel.getKey());
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<List<String>> upsertBatchAsync(List<Record> list, UpsertRecordOptions upsertRecordOptions) {
        Pipeline pipelined = this.client.pipelined();
        ArrayList arrayList = new ArrayList(list.size());
        list.forEach(obj -> {
            Map.Entry<String, Object> mapRecordToStorageModel = this.vectorStoreRecordMapper.mapRecordToStorageModel(obj);
            String redisKey = getRedisKey(mapRecordToStorageModel.getKey(), this.collectionName);
            arrayList.add(mapRecordToStorageModel.getKey());
            pipelined.jsonSet(redisKey, mapRecordToStorageModel.getValue());
        });
        Objects.requireNonNull(pipelined);
        return Mono.fromRunnable(pipelined::sync).subscribeOn(Schedulers.boundedElastic()).thenReturn(arrayList);
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<Void> deleteAsync(String str, DeleteRecordOptions deleteRecordOptions) {
        String redisKey = getRedisKey(str, this.collectionName);
        return Mono.fromRunnable(() -> {
            this.client.del(redisKey);
        }).subscribeOn(Schedulers.boundedElastic()).then();
    }

    @Override // com.microsoft.semantickernel.data.VectorStoreRecordCollection
    public Mono<Void> deleteBatchAsync(List<String> list, DeleteRecordOptions deleteRecordOptions) {
        Pipeline pipelined = this.client.pipelined();
        list.forEach(str -> {
            pipelined.del(getRedisKey(str, this.collectionName));
        });
        Objects.requireNonNull(pipelined);
        return Mono.fromRunnable(pipelined::sync).subscribeOn(Schedulers.boundedElastic()).then();
    }
}
