package org.apache.camel.component.reactive.streams.engine;

import java.io.Closeable;
import java.io.IOException;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/camel-reactive-streams-2.19.1.jar:org/apache/camel/component/reactive/streams/engine/CamelSubscriber.class */
public class CamelSubscriber implements Subscriber<Exchange>, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CamelSubscriber.class);
    private static final long MAX_INFLIGHT_UNBOUNDED = 4611686018427387903L;
    private ReactiveStreamsConsumer consumer;
    private Subscription subscription;
    private String name;
    private long requested;
    private long inflightCount;

    public CamelSubscriber(String str) {
        this.name = str;
    }

    public void attachConsumer(ReactiveStreamsConsumer reactiveStreamsConsumer) {
        synchronized (this) {
            if (this.consumer != null) {
                throw new IllegalStateException("A consumer is already attached to the stream '" + this.name + "'");
            }
            this.consumer = reactiveStreamsConsumer;
        }
        refill();
    }

    public synchronized ReactiveStreamsConsumer getConsumer() {
        return this.consumer;
    }

    public void detachConsumer() {
        synchronized (this) {
            this.consumer = null;
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("subscription is null for stream '" + this.name + "'");
        }
        boolean z = true;
        synchronized (this) {
            if (this.subscription != null) {
                z = false;
            } else {
                this.subscription = subscription;
            }
        }
        if (z) {
            refill();
        } else {
            LOG.warn("There is another active subscription: cancelled");
            subscription.cancel();
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(Exchange exchange) {
        ReactiveStreamsConsumer reactiveStreamsConsumer;
        if (exchange == null) {
            throw new NullPointerException("exchange is null");
        }
        synchronized (this) {
            this.requested--;
            reactiveStreamsConsumer = this.consumer;
            if (reactiveStreamsConsumer != null) {
                this.inflightCount++;
            }
        }
        if (reactiveStreamsConsumer != null) {
            reactiveStreamsConsumer.process(exchange, z -> {
                synchronized (this) {
                    this.inflightCount--;
                }
                refill();
            });
        } else {
            LOG.warn("Message received in stream '{}', but no consumers were attached. Discarding {}.", this.name, exchange);
        }
    }

    protected void refill() {
        Long l = null;
        Subscription subscription = null;
        synchronized (this) {
            if (this.consumer != null && this.subscription != null) {
                Integer maxInflightExchanges = this.consumer.getEndpoint().getMaxInflightExchanges();
                long longValue = (((maxInflightExchanges == null || maxInflightExchanges.intValue() <= 0) ? MAX_INFLIGHT_UNBOUNDED : maxInflightExchanges.longValue()) - this.requested) - this.inflightCount;
                if (longValue > 0) {
                    l = Long.valueOf(longValue);
                    this.requested += l.longValue();
                    subscription = this.subscription;
                }
            }
        }
        if (l != null) {
            subscription.request(l.longValue());
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        ReactiveStreamsConsumer reactiveStreamsConsumer;
        if (th == null) {
            throw new NullPointerException("throwable is null");
        }
        LOG.error("Error in reactive stream '" + this.name + "'", th);
        synchronized (this) {
            reactiveStreamsConsumer = this.consumer;
            this.subscription = null;
        }
        if (reactiveStreamsConsumer != null) {
            reactiveStreamsConsumer.onError(th);
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        ReactiveStreamsConsumer reactiveStreamsConsumer;
        LOG.info("Reactive stream '{}' completed", this.name);
        synchronized (this) {
            reactiveStreamsConsumer = this.consumer;
            this.subscription = null;
        }
        if (reactiveStreamsConsumer != null) {
            reactiveStreamsConsumer.onComplete();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Subscription subscription;
        synchronized (this) {
            subscription = this.subscription;
        }
        if (subscription != null) {
            subscription.cancel();
        }
    }

    public long getRequested() {
        return this.requested;
    }

    public long getInflightCount() {
        return this.inflightCount;
    }

    public long getBufferSize() {
        if (this.subscription == null || !(this.subscription instanceof CamelSubscription)) {
            return 0L;
        }
        return ((CamelSubscription) this.subscription).getBufferSize();
    }

    public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
        if (this.subscription == null || !(this.subscription instanceof CamelSubscription)) {
            return null;
        }
        return ((CamelSubscription) this.subscription).getBackpressureStrategy();
    }
}
