package ai.yda.framework.rag.generator.assistant.openai.streaming;

import ai.yda.framework.rag.core.generator.StreamingGenerator;
import ai.yda.framework.rag.core.model.RagRequest;
import ai.yda.framework.rag.core.model.RagResponse;
import ai.yda.framework.rag.generator.shared.AzureOpenAiAssistantService;
import ai.yda.framework.session.core.ReactiveSessionProvider;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:ai/yda/framework/rag/generator/assistant/openai/streaming/OpenAiAssistantStreamingGenerator.class */
/**
 * Streaming generator that delegates to an {@link AzureOpenAiAssistantService}.
 *
 * <p>The assistant thread id is kept in the reactive session under the key {@code "threadId"}.
 * When an id is already stored, the request query is added to that thread; when the session
 * lookup completes empty, a new thread is created from the query and its id is stored. A run
 * stream is then started on the resolved thread and every emitted item is wrapped in a
 * {@link RagResponse}.
 */
public class OpenAiAssistantStreamingGenerator implements StreamingGenerator<RagRequest, RagResponse> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(OpenAiAssistantStreamingGenerator.class);

    /** Session key under which the assistant thread id is read and written. */
    private static final String THREAD_ID_KEY = "threadId";

    private final AzureOpenAiAssistantService assistantService;
    private final String assistantId;
    private final ReactiveSessionProvider sessionProvider;

    /**
     * Creates a generator bound to one assistant.
     *
     * @param str                     value handed to the {@link AzureOpenAiAssistantService} constructor
     *                                (presumably a credential such as an API key; confirm against that class)
     * @param str2                    id of the assistant passed to every run stream
     * @param reactiveSessionProvider session store used to read and write the thread id
     */
    public OpenAiAssistantStreamingGenerator(String str, String str2, ReactiveSessionProvider reactiveSessionProvider) {
        this.assistantService = new AzureOpenAiAssistantService(str);
        this.assistantId = str2;
        this.sessionProvider = reactiveSessionProvider;
    }

    /**
     * Resolves the assistant thread for the current session and streams a run on it.
     *
     * @param ragRequest request whose query is added to the existing thread or used to create a new one
     * @param str        value forwarded as the third argument of {@code createRunStream}
     *                   (presumably the retrieval context; confirm against the service)
     * @return a flux emitting one {@link RagResponse} per item produced by the run stream
     */
    public Flux<RagResponse> streamGeneration(RagRequest ragRequest, String str) {
        Mono<String> resolvedThreadId = this.sessionProvider
                .get(THREAD_ID_KEY)
                .map(Object::toString)
                .flatMap(existingId -> appendQuery(existingId, ragRequest))
                // Deferred so the fallback pipeline is only assembled when no thread id is stored.
                .switchIfEmpty(Mono.defer(() -> openThread(ragRequest)))
                .doOnNext(id -> log.debug("Thread ID: {}", id));

        return resolvedThreadId
                .flatMapMany(id -> this.assistantService.createRunStream(id, this.assistantId, str))
                .map(chunk -> RagResponse.builder().result(chunk).build());
    }

    /**
     * Adds the request query to an existing thread and re-emits the thread id.
     * The service call is subscribed on {@link Schedulers#boundedElastic()}.
     */
    private Mono<String> appendQuery(String threadId, RagRequest ragRequest) {
        Runnable addMessage = () -> this.assistantService.addMessageToThread(threadId, ragRequest.getQuery());
        return Mono.fromRunnable(addMessage)
                .subscribeOn(Schedulers.boundedElastic())
                .thenReturn(threadId);
    }

    /**
     * Creates a thread from the request query, stores its id in the session and emits that id.
     * The service call is subscribed on {@link Schedulers#boundedElastic()}.
     */
    private Mono<String> openThread(RagRequest ragRequest) {
        return Mono.fromCallable(() -> this.assistantService.createThread(ragRequest.getQuery()).getId())
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(newId -> this.sessionProvider.put(THREAD_ID_KEY, newId).thenReturn(newId));
    }
}
