package dev.snowdrop.vertx.http.common;

import io.vertx.core.streams.ReadStream;
import java.util.Objects;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;

/* loaded from: input_file:BOOT-INF/lib/vertx-spring-boot-starter-http-1.4.1.jar:dev/snowdrop/vertx/http/common/ReadStreamFluxBuilder.class */
/**
 * Builder that adapts a Vert.x {@link ReadStream} to a Reactor {@link Flux}.
 *
 * <p>Every item emitted by the read stream is passed through the configured data converter
 * before being handed to the subscriber. Demand signalled by the subscriber is forwarded to the
 * read stream through {@link ReadStream#fetch(long)}.
 *
 * @param <T> type of the items emitted by the read stream
 * @param <R> type of the items emitted by the resulting flux
 */
public class ReadStreamFluxBuilder<T, R> {
    // Kept per-instance (not static) so that the logger name follows the runtime class.
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private ReadStream<T> readStream;
    private Function<T, R> dataConverter;

    /**
     * Sets the read stream to be exposed as a flux.
     *
     * @param readStream source stream; must be set to a non-null value before {@link #build()}
     * @return this builder
     */
    public ReadStreamFluxBuilder<T, R> readStream(ReadStream<T> readStream) {
        this.readStream = readStream;
        return this;
    }

    /**
     * Sets the function used to convert each read stream item into a flux item.
     *
     * @param function converter; must be set to a non-null value before {@link #build()}
     * @return this builder
     */
    public ReadStreamFluxBuilder<T, R> dataConverter(Function<T, R> function) {
        this.dataConverter = function;
        return this;
    }

    /**
     * Pauses the configured read stream and returns a flux backed by it.
     *
     * <p>The read stream and the data converter are captured when this method is called, so
     * reconfiguring the builder afterwards does not affect a flux that was already built.
     * Handlers are registered on the read stream only once the flux is subscribed to.
     *
     * @return flux emitting the converted read stream items
     * @throws NullPointerException if the read stream or the data converter has not been set
     */
    public Flux<R> build() {
        // Snapshot the mutable builder state: the Flux.create callback below runs lazily at
        // subscription time and must operate on the very stream that is paused here.
        final ReadStream<T> stream = Objects.requireNonNull(this.readStream, "Read stream is required");
        final Function<T, R> converter =
            Objects.requireNonNull(this.dataConverter, "Data converter is required");

        stream.pause();

        return Flux.create(sink -> {
            final String logPrefix = "[" + ObjectUtils.getIdentityHexString(stream) + "] ";

            stream
                .handler(data -> {
                    this.logger.debug("{}Received '{}'", logPrefix, data);
                    sink.next(converter.apply(data));
                })
                .exceptionHandler(throwable -> {
                    this.logger.debug("{}Received exception '{}'", logPrefix, throwable.toString());
                    sink.error(throwable);
                })
                .endHandler(ignored -> {
                    this.logger.debug("{}Read stream ended", logPrefix);
                    sink.complete();
                });

            // Forward subscriber demand to the read stream.
            sink.onRequest(count -> {
                this.logger.debug("{} Fetching '{}' entries from a read stream", logPrefix, count);
                stream.fetch(count);
            });
        });
    }
}
