/*
 * Decompiled with CFR 0.152.
 */
package infra.web.handler.method;

import infra.core.MethodParameter;
import infra.core.ReactiveAdapter;
import infra.core.ReactiveAdapterRegistry;
import infra.core.ResolvableType;
import infra.core.task.SyncTaskExecutor;
import infra.core.task.TaskExecutor;
import infra.http.MediaType;
import infra.http.codec.ServerSentEvent;
import infra.lang.Assert;
import infra.lang.Nullable;
import infra.logging.Logger;
import infra.logging.LoggerFactory;
import infra.util.MimeType;
import infra.util.ObjectUtils;
import infra.web.HandlerMatchingMetadata;
import infra.web.HttpMediaTypeNotAcceptableException;
import infra.web.RequestContext;
import infra.web.accept.ContentNegotiationManager;
import infra.web.async.DeferredResult;
import infra.web.handler.method.ResponseBodyEmitter;
import infra.web.handler.method.SseEmitter;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class ReactiveTypeHandler {
    private static final Logger log = LoggerFactory.getLogger(ReactiveTypeHandler.class);
    private static final long STREAMING_TIMEOUT_VALUE = -1L;
    private static final MediaType WILDCARD_SUBTYPE_SUFFIXED_BY_NDJSON = MediaType.valueOf("application/*+x-ndjson");
    private final TaskExecutor taskExecutor;
    private final ReactiveAdapterRegistry adapterRegistry;
    private final ContentNegotiationManager contentNegotiationManager;

    public ReactiveTypeHandler() {
        this(ReactiveAdapterRegistry.getSharedInstance(), (TaskExecutor)new SyncTaskExecutor(), new ContentNegotiationManager());
    }

    public ReactiveTypeHandler(ContentNegotiationManager manager) {
        this(ReactiveAdapterRegistry.getSharedInstance(), (TaskExecutor)new SyncTaskExecutor(), manager);
    }

    public ReactiveTypeHandler(ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager) {
        Assert.notNull((Object)executor, (String)"TaskExecutor is required");
        Assert.notNull((Object)registry, (String)"ReactiveAdapterRegistry is required");
        Assert.notNull((Object)manager, (String)"ContentNegotiationManager is required");
        this.adapterRegistry = registry;
        this.taskExecutor = executor;
        this.contentNegotiationManager = manager;
    }

    public boolean isReactiveType(Class<?> type) {
        return this.adapterRegistry.getAdapter(type) != null;
    }

    @Nullable
    public ResponseBodyEmitter handleValue(Object returnValue, MethodParameter returnType, RequestContext request) throws Exception {
        Assert.notNull((Object)returnValue, (String)"Expected return value");
        ReactiveAdapter adapter = this.adapterRegistry.getAdapter(returnValue.getClass());
        if (adapter == null) {
            throw new IllegalStateException("Unexpected return value: " + returnValue);
        }
        ResolvableType elementType = ResolvableType.forMethodParameter((MethodParameter)returnType).getGeneric(new int[0]);
        Class elementClass = elementType.toClass();
        if (adapter.isMultiValue()) {
            Collection<MediaType> mediaTypes = this.getMediaTypes(request);
            if (mediaTypes.stream().anyMatch(MediaType.TEXT_EVENT_STREAM::includes) || ServerSentEvent.class.isAssignableFrom(elementClass)) {
                SseEmitter emitter = new SseEmitter(-1L);
                new SseEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
                return emitter;
            }
            if (CharSequence.class.isAssignableFrom(elementClass)) {
                Optional<MediaType> mediaType = mediaTypes.stream().filter(MimeType::isConcrete).findFirst();
                ResponseBodyEmitter emitter = this.getEmitter(mediaType.orElse(MediaType.TEXT_PLAIN));
                new TextEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
                return emitter;
            }
            MediaType streamingResponseType = ReactiveTypeHandler.findConcreteStreamingMediaType(mediaTypes);
            if (streamingResponseType != null) {
                ResponseBodyEmitter emitter = this.getEmitter(streamingResponseType);
                new JsonEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue);
                return emitter;
            }
        }
        DeferredResult<Object> result = new DeferredResult<Object>();
        new DeferredResultSubscriber(result, adapter, elementType).connect(adapter, returnValue);
        request.getAsyncManager().startDeferredResultProcessing(result, new Object[0]);
        return null;
    }

    @Nullable
    static MediaType findConcreteStreamingMediaType(Collection<MediaType> acceptedMediaTypes) {
        for (MediaType acceptedType : acceptedMediaTypes) {
            if (WILDCARD_SUBTYPE_SUFFIXED_BY_NDJSON.includes(acceptedType)) {
                if (acceptedType.isConcrete()) {
                    return acceptedType;
                }
                return MediaType.APPLICATION_NDJSON;
            }
            if (MediaType.APPLICATION_NDJSON.includes(acceptedType)) {
                return MediaType.APPLICATION_NDJSON;
            }
            if (!MediaType.APPLICATION_STREAM_JSON.includes(acceptedType)) continue;
            return MediaType.APPLICATION_STREAM_JSON;
        }
        return null;
    }

    private Collection<MediaType> getMediaTypes(RequestContext request) throws HttpMediaTypeNotAcceptableException {
        Object[] producibleMediaTypes;
        HandlerMatchingMetadata matchingMetadata = request.getMatchingMetadata();
        if (matchingMetadata != null && ObjectUtils.isNotEmpty((Object[])(producibleMediaTypes = matchingMetadata.getProducibleMediaTypes()))) {
            return Arrays.asList(producibleMediaTypes);
        }
        return this.contentNegotiationManager.resolveMediaTypes(request);
    }

    private ResponseBodyEmitter getEmitter(final MediaType mediaType) {
        return new ResponseBodyEmitter(-1L){

            @Override
            protected void extendResponse(RequestContext outputMessage) {
                outputMessage.setContentType(mediaType.toString());
            }
        };
    }

    private static class SseEmitterSubscriber
    extends AbstractEmitterSubscriber {
        SseEmitterSubscriber(SseEmitter sseEmitter, TaskExecutor executor) {
            super(sseEmitter, executor);
        }

        @Override
        protected void send(Object element) throws IOException {
            if (element instanceof ServerSentEvent) {
                ServerSentEvent event = (ServerSentEvent)element;
                ((SseEmitter)this.emitter).send(this.adapt(event));
            } else {
                this.emitter.send(element, MediaType.APPLICATION_JSON);
            }
        }

        private SseEmitter.SseEventBuilder adapt(ServerSentEvent<?> sse) {
            SseEmitter.SseEventBuilder builder = SseEmitter.event();
            String id = sse.id();
            String event = sse.event();
            Duration retry = sse.retry();
            String comment = sse.comment();
            Object data = sse.data();
            if (id != null) {
                builder.id(id);
            }
            if (event != null) {
                builder.name(event);
            }
            if (data != null) {
                builder.data(data);
            }
            if (retry != null) {
                builder.reconnectTime(retry.toMillis());
            }
            if (comment != null) {
                builder.comment(comment);
            }
            return builder;
        }
    }

    private static class TextEmitterSubscriber
    extends AbstractEmitterSubscriber {
        TextEmitterSubscriber(ResponseBodyEmitter emitter, TaskExecutor executor) {
            super(emitter, executor);
        }

        @Override
        protected void send(Object element) throws IOException {
            this.emitter.send(element, MediaType.TEXT_PLAIN);
        }
    }

    private static class JsonEmitterSubscriber
    extends AbstractEmitterSubscriber {
        JsonEmitterSubscriber(ResponseBodyEmitter emitter, TaskExecutor executor) {
            super(emitter, executor);
        }

        @Override
        protected void send(Object element) throws IOException {
            this.emitter.send(element, MediaType.APPLICATION_JSON);
            this.emitter.send("\n", MediaType.TEXT_PLAIN);
        }
    }

    private static class DeferredResultSubscriber
    implements Subscriber<Object> {
        private final DeferredResult<Object> result;
        private final boolean multiValueSource;
        private final CollectedValuesList values;

        DeferredResultSubscriber(DeferredResult<Object> result, ReactiveAdapter adapter, ResolvableType elementType) {
            this.result = result;
            this.multiValueSource = adapter.isMultiValue();
            this.values = new CollectedValuesList(elementType);
        }

        public void connect(ReactiveAdapter adapter, Object returnValue) {
            Publisher publisher = adapter.toPublisher(returnValue);
            publisher.subscribe((Subscriber)this);
        }

        public void onSubscribe(Subscription subscription) {
            this.result.onTimeout(() -> ((Subscription)subscription).cancel());
            subscription.request(Long.MAX_VALUE);
        }

        public void onNext(Object element) {
            this.values.add(element);
        }

        public void onError(Throwable ex) {
            this.result.setErrorResult(ex);
        }

        public void onComplete() {
            if (this.values.size() > 1 || this.multiValueSource) {
                this.result.setResult(this.values);
            } else if (this.values.size() == 1) {
                this.result.setResult(this.values.get(0));
            } else {
                this.result.setResult(null);
            }
        }
    }

    static class CollectedValuesList
    extends ArrayList<Object> {
        private final ResolvableType elementType;

        CollectedValuesList(ResolvableType elementType) {
            this.elementType = elementType;
        }

        public ResolvableType getReturnType() {
            return ResolvableType.forClassWithGenerics(List.class, (ResolvableType[])new ResolvableType[]{this.elementType});
        }
    }

    private static abstract class AbstractEmitterSubscriber
    implements Subscriber<Object>,
    Runnable {
        protected final ResponseBodyEmitter emitter;
        private final TaskExecutor taskExecutor;
        @Nullable
        private Subscription subscription;
        private final AtomicReference<Object> elementRef = new AtomicReference();
        @Nullable
        private Throwable error;
        private volatile boolean terminated;
        private final AtomicLong executing = new AtomicLong();
        private volatile boolean done;

        protected AbstractEmitterSubscriber(ResponseBodyEmitter emitter, TaskExecutor executor) {
            this.emitter = emitter;
            this.taskExecutor = executor;
        }

        public void connect(ReactiveAdapter adapter, Object returnValue) {
            Publisher publisher = adapter.toPublisher(returnValue);
            publisher.subscribe((Subscriber)this);
        }

        public final void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.emitter.onTimeout(() -> {
                if (log.isTraceEnabled()) {
                    log.trace("Connection timeout for {}", (Object)this.emitter);
                }
                this.terminate();
                this.emitter.complete();
            });
            this.emitter.onError(this.emitter::completeWithError);
            subscription.request(1L);
        }

        public final void onNext(Object element) {
            this.elementRef.lazySet(element);
            this.trySchedule();
        }

        public final void onError(Throwable ex) {
            this.error = ex;
            this.terminated = true;
            this.trySchedule();
        }

        public final void onComplete() {
            this.terminated = true;
            this.trySchedule();
        }

        private void trySchedule() {
            if (this.executing.getAndIncrement() == 0L) {
                this.schedule();
            }
        }

        private void schedule() {
            try {
                this.taskExecutor.execute((Runnable)this);
            }
            catch (Throwable ex) {
                try {
                    this.terminate();
                }
                finally {
                    this.executing.decrementAndGet();
                    this.elementRef.lazySet(null);
                }
            }
        }

        @Override
        public void run() {
            if (this.done) {
                this.elementRef.lazySet(null);
                return;
            }
            boolean isTerminated = this.terminated;
            Object element = this.elementRef.get();
            if (element != null) {
                this.elementRef.lazySet(null);
                Assert.state((this.subscription != null ? 1 : 0) != 0, (String)"No subscription");
                try {
                    this.send(element);
                    this.subscription.request(1L);
                }
                catch (Throwable ex) {
                    if (log.isTraceEnabled()) {
                        log.trace("Send for {} failed: {}", (Object)this.emitter, (Object)ex);
                    }
                    this.terminate();
                    this.emitter.completeWithError(ex);
                    return;
                }
            }
            if (isTerminated) {
                this.done = true;
                Throwable ex = this.error;
                this.error = null;
                if (ex != null) {
                    if (log.isTraceEnabled()) {
                        log.trace("Publisher for {} failed: {}", (Object)this.emitter, (Object)ex);
                    }
                    this.emitter.completeWithError(ex);
                } else {
                    if (log.isTraceEnabled()) {
                        log.trace("Publisher for {} completed", (Object)this.emitter);
                    }
                    this.emitter.complete();
                }
                return;
            }
            if (this.executing.decrementAndGet() != 0L) {
                this.schedule();
            }
        }

        protected abstract void send(Object var1) throws IOException;

        private void terminate() {
            this.done = true;
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }
    }
}

