package org.apache.camel.component.rxjava.engine;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.MulticastProcessor;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
import org.apache.camel.component.reactive.streams.ReactiveStreamsDiscardedException;
import org.apache.camel.component.reactive.streams.ReactiveStreamsHelper;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.util.ObjectHelper;
import org.reactivestreams.Publisher;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.class */
public final class RxJavaCamelProcessor implements Closeable {
    private final String name;
    private final RxJavaStreamsService service;
    private ReactiveStreamsProducer camelProducer = null;
    private final AtomicReference<FlowableEmitter<Exchange>> camelEmitter = new AtomicReference<>();
    private final FlowableProcessor<Exchange> publisher = MulticastProcessor.create(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    public RxJavaCamelProcessor(RxJavaStreamsService rxJavaStreamsService, String str) {
        this.service = rxJavaStreamsService;
        this.name = str;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        detach();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Publisher<Exchange> getPublisher() {
        return this.publisher;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void attach(ReactiveStreamsProducer reactiveStreamsProducer) {
        Objects.requireNonNull(reactiveStreamsProducer, "producer cannot be null, use the detach method");
        if (this.camelProducer != null) {
            throw new IllegalStateException("A producer is already attached to the stream '" + this.name + "'");
        }
        if (this.camelProducer != reactiveStreamsProducer) {
            detach();
            ReactiveStreamsBackpressureStrategy backpressureStrategy = reactiveStreamsProducer.getEndpoint().getBackpressureStrategy();
            AtomicReference<FlowableEmitter<Exchange>> atomicReference = this.camelEmitter;
            Objects.requireNonNull(atomicReference);
            Flowable create = Flowable.create((v1) -> {
                r0.set(v1);
            }, BackpressureStrategy.MISSING);
            if (ObjectHelper.equal(backpressureStrategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
                create.onBackpressureDrop(this::onBackPressure).doAfterNext(this::onItemEmitted).subscribe(this.publisher);
            } else if (ObjectHelper.equal(backpressureStrategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
                create.doAfterNext(this::onItemEmitted).onBackpressureLatest().subscribe(this.publisher);
            } else {
                create.doAfterNext(this::onItemEmitted).onBackpressureBuffer().subscribe(this.publisher);
            }
            this.camelProducer = reactiveStreamsProducer;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void detach() {
        this.camelProducer = null;
        this.camelEmitter.set(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void send(Exchange exchange) {
        if (this.service.isRunAllowed()) {
            ((FlowableEmitter) ObjectHelper.notNull(this.camelEmitter.get(), "FlowableEmitter")).onNext(exchange);
        }
    }

    private void onItemEmitted(Exchange exchange) {
        if (this.service.isRunAllowed()) {
            ReactiveStreamsHelper.invokeDispatchCallback(exchange);
        }
    }

    private void onBackPressure(Exchange exchange) {
        ReactiveStreamsHelper.invokeDispatchCallback(exchange, new ReactiveStreamsDiscardedException("Discarded by back pressure strategy", exchange, this.name));
    }
}
