package com.couchbase.columnar.client.java.internal;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import org.jetbrains.annotations.ApiStatus;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@ApiStatus.Internal
/* loaded from: input_file:com/couchbase/columnar/client/java/internal/ReactorHelper.class */
public class ReactorHelper {
    private static final Object STREAM_END_SENTINEL = new Object();

    private ReactorHelper() {
        throw new AssertionError("not instantiable");
    }

    public static CancellationException propagateAsCancellation(InterruptedException interruptedException) {
        Thread.currentThread().interrupt();
        CancellationException cancellationException = new CancellationException("Thread was interrupted.");
        cancellationException.addSuppressed(interruptedException);
        return cancellationException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void putUninterruptibly(BlockingQueue<T> blockingQueue, T t) {
        boolean interrupted = Thread.interrupted();
        while (true) {
            try {
                boolean z = interrupted;
                blockingQueue.put(t);
                if (z) {
                    Thread.currentThread().interrupt();
                    return;
                }
                return;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
    }

    public static <T> void forEachBlocking(Flux<T> flux, int i, Consumer<T> consumer) {
        Object take;
        Semaphore semaphore = new Semaphore(1);
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        Disposable subscribe = flux.buffer(i).delayUntil(list -> {
            return Mono.fromRunnable(() -> {
                try {
                    semaphore.acquire();
                    linkedBlockingQueue.put(list);
                } catch (InterruptedException e) {
                    semaphore.release();
                    Thread.currentThread().interrupt();
                }
            }).subscribeOn(Schedulers.boundedElastic());
        }).doOnCancel(() -> {
            putUninterruptibly(linkedBlockingQueue, new CancellationException());
        }).subscribe(list2 -> {
        }, th -> {
            putUninterruptibly(linkedBlockingQueue, th);
        }, () -> {
            putUninterruptibly(linkedBlockingQueue, STREAM_END_SENTINEL);
        });
        while (true) {
            try {
                try {
                    take = linkedBlockingQueue.take();
                    semaphore.release();
                    if (!(take instanceof List)) {
                        break;
                    } else {
                        ((List) take).forEach(consumer);
                    }
                } catch (InterruptedException e) {
                    throw propagateAsCancellation(e);
                }
            } finally {
                subscribe.dispose();
                semaphore.release(1073741823);
            }
        }
        if (take == STREAM_END_SENTINEL) {
            return;
        }
        if (!(take instanceof Throwable)) {
            throw new RuntimeException("Please report this bug in the SDK; Unexpected item in queue: " + String.valueOf(take));
        }
        throw Exceptions.propagate((Throwable) take);
    }
}
