package ru.tinkoff.kora.common.util;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import ru.tinkoff.kora.common.Context;

/* loaded from: input_file:ru/tinkoff/kora/common/util/ReactorUtils.class */
public class ReactorUtils {
    private static final AtomicReference<Scheduler> CACHED_ELASTIC = new AtomicReference<>();

    private ReactorUtils() {
    }

    private static Scheduler ioScheduler() {
        Scheduler scheduler = CACHED_ELASTIC.get();
        if (scheduler != null) {
            return scheduler;
        }
        Scheduler newBoundedElastic = Schedulers.newBoundedElastic(Math.min(Math.max(Runtime.getRuntime().availableProcessors(), 2) * 8, 64), Integer.MAX_VALUE, "kora-io-", 60, true);
        if (CACHED_ELASTIC.compareAndSet(null, newBoundedElastic)) {
            return newBoundedElastic;
        }
        newBoundedElastic.dispose();
        return CACHED_ELASTIC.get();
    }

    public static <T> Mono<T> ioMono(Supplier<T> supplier) {
        return Mono.create(monoSink -> {
            ioScheduler().schedule(() -> {
                Context current = Context.current();
                try {
                    try {
                        Context.Reactor.current(monoSink.contextView()).inject();
                        monoSink.success(supplier.get());
                        current.inject();
                    } catch (Throwable th) {
                        monoSink.error(th);
                        current.inject();
                    }
                } catch (Throwable th2) {
                    current.inject();
                    throw th2;
                }
            });
        });
    }

    public static Mono<Void> ioMono(Runnable runnable) {
        return ioMono(() -> {
            runnable.run();
            return null;
        });
    }

    public static Mono<ByteBuffer> toByteBufferMono(Flux<ByteBuffer> flux) {
        return flux.reduce(ByteBuffer.allocate(0), (byteBuffer, byteBuffer2) -> {
            return ByteBuffer.allocate(byteBuffer.remaining() + byteBuffer2.remaining()).put(byteBuffer).put(byteBuffer2).rewind();
        });
    }

    public static Mono<ByteBuffer> toByteBufferMono(Publisher<ByteBuffer> publisher) {
        return Flux.from(publisher).reduce(ByteBuffer.allocate(0), (byteBuffer, byteBuffer2) -> {
            return ByteBuffer.allocate(byteBuffer.remaining() + byteBuffer2.remaining()).put(byteBuffer).put(byteBuffer2).rewind();
        });
    }

    public static Mono<byte[]> toByteArrayMono(Flux<ByteBuffer> flux) {
        return flux.reduce(new byte[0], (bArr, byteBuffer) -> {
            byte[] copyOf = Arrays.copyOf(bArr, bArr.length + byteBuffer.remaining());
            byteBuffer.get(copyOf, bArr.length, byteBuffer.remaining());
            return copyOf;
        });
    }

    public static Mono<byte[]> toByteArrayMono(Flux<ByteBuffer> flux, int i) {
        return flux.reduce(new byte[0], (bArr, byteBuffer) -> {
            if (bArr.length >= i) {
                return bArr;
            }
            byte[] copyOf = Arrays.copyOf(bArr, Math.min(bArr.length + byteBuffer.remaining(), i));
            byteBuffer.get(copyOf, bArr.length, byteBuffer.remaining());
            return copyOf;
        });
    }

    public static Mono<byte[]> toByteArrayMono(Publisher<ByteBuffer> publisher) {
        return Flux.from(publisher).reduce(new byte[0], (bArr, byteBuffer) -> {
            byte[] copyOf = Arrays.copyOf(bArr, bArr.length + byteBuffer.remaining());
            byteBuffer.get(copyOf, bArr.length, byteBuffer.remaining());
            return copyOf;
        });
    }
}
