package org.apache.pulsar.reactive.client.internal.adapter;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.reactive.client.api.MessageSendResult;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSendingException;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSender.class */
/**
 * {@link ReactiveMessageSender} implementation that sends messages through a Pulsar
 * {@link Producer} supplied by a {@link ReactiveProducerAdapter}.
 *
 * <p>Every send operation asks the {@link ReactiveProducerAdapterFactory} for a new adapter.
 * The adapter is given a producer-builder customizer that applies all non-null settings of
 * the {@link ReactiveMessageSenderSpec}, together with the producer cache and the producer
 * action transformer (and its key) passed to the constructor.
 *
 * @param <T> the message payload type
 */
class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
    private final Schema<T> schema;
    private final ReactiveMessageSenderSpec senderSpec;
    private final int maxConcurrency;
    private final ReactiveProducerAdapterFactory reactiveProducerAdapterFactory;
    private final ProducerCache producerCache;
    private final Supplier<PublisherTransformer> producerActionTransformer;
    private final Object producerActionTransformerKey;
    private final boolean stopOnError;

    /**
     * Creates a sender.
     *
     * @param schema the schema used when creating the Pulsar producer
     * @param senderSpec the producer settings; only non-null values are applied
     * @param maxConcurrency the concurrency passed to {@code flatMapSequential} in
     *     {@link #sendMany(Publisher)}
     * @param reactiveProducerAdapterFactory the factory creating producer adapters
     * @param producerCache the producer cache handed to the adapter factory
     * @param producerActionTransformer supplier of the transformer handed to the adapter factory
     * @param producerActionTransformerKey the key handed to the adapter factory alongside the
     *     transformer supplier
     * @param stopOnError whether {@link #sendMany(Publisher)} fails on the first send error
     *     instead of reporting the error in the corresponding {@link MessageSendResult}
     */
    public AdaptedReactiveMessageSender(Schema<T> schema, ReactiveMessageSenderSpec senderSpec, int maxConcurrency,
            ReactiveProducerAdapterFactory reactiveProducerAdapterFactory, ProducerCache producerCache,
            Supplier<PublisherTransformer> producerActionTransformer, Object producerActionTransformerKey,
            boolean stopOnError) {
        this.schema = schema;
        this.senderSpec = senderSpec;
        this.maxConcurrency = maxConcurrency;
        this.reactiveProducerAdapterFactory = reactiveProducerAdapterFactory;
        this.producerCache = producerCache;
        this.producerActionTransformer = producerActionTransformer;
        this.producerActionTransformerKey = producerActionTransformerKey;
        this.stopOnError = stopOnError;
    }

    /**
     * Creates a producer adapter whose producer builder is configured from the sender spec.
     *
     * @return a new adapter obtained from the adapter factory
     */
    ReactiveProducerAdapter<T> createReactiveProducerAdapter() {
        return this.reactiveProducerAdapterFactory.create(pulsarClient -> {
            ProducerBuilder<T> producerBuilder = pulsarClient.newProducer(this.schema);
            configureProducerBuilder(producerBuilder);
            return producerBuilder;
        }, this.producerCache, this.producerActionTransformer, this.producerActionTransformerKey);
    }

    /**
     * Copies every non-null setting of the sender spec onto the given producer builder.
     * Settings that are {@code null} (or empty, for encryption keys and properties) are
     * skipped so that the builder keeps its own defaults.
     *
     * @param producerBuilder the builder to configure
     */
    private void configureProducerBuilder(ProducerBuilder<T> producerBuilder) {
        if (this.senderSpec.getTopicName() != null) {
            producerBuilder.topic(this.senderSpec.getTopicName());
        }
        if (this.senderSpec.getProducerName() != null) {
            producerBuilder.producerName(this.senderSpec.getProducerName());
        }
        if (this.senderSpec.getSendTimeout() != null) {
            long sendTimeoutMillis = this.senderSpec.getSendTimeout().toMillis();
            if (sendTimeoutMillis <= Integer.MAX_VALUE) {
                // Pass milliseconds so that sub-second timeouts are not truncated to 0,
                // which the Pulsar ProducerBuilder documents as "no timeout".
                producerBuilder.sendTimeout((int) sendTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            else {
                // Too large for an int number of milliseconds: fall back to whole seconds.
                producerBuilder.sendTimeout((int) (sendTimeoutMillis / 1000), TimeUnit.SECONDS);
            }
        }
        if (this.senderSpec.getMaxPendingMessages() != null) {
            producerBuilder.maxPendingMessages(this.senderSpec.getMaxPendingMessages().intValue());
        }
        if (this.senderSpec.getMaxPendingMessagesAcrossPartitions() != null) {
            producerBuilder.maxPendingMessagesAcrossPartitions(
                    this.senderSpec.getMaxPendingMessagesAcrossPartitions().intValue());
        }
        if (this.senderSpec.getMessageRoutingMode() != null) {
            producerBuilder.messageRoutingMode(this.senderSpec.getMessageRoutingMode());
        }
        if (this.senderSpec.getHashingScheme() != null) {
            producerBuilder.hashingScheme(this.senderSpec.getHashingScheme());
        }
        if (this.senderSpec.getCryptoFailureAction() != null) {
            producerBuilder.cryptoFailureAction(this.senderSpec.getCryptoFailureAction());
        }
        if (this.senderSpec.getMessageRouter() != null) {
            producerBuilder.messageRouter(this.senderSpec.getMessageRouter());
        }
        if (this.senderSpec.getBatchingMaxPublishDelay() != null) {
            producerBuilder.batchingMaxPublishDelay(this.senderSpec.getBatchingMaxPublishDelay().toNanos(),
                    TimeUnit.NANOSECONDS);
        }
        if (this.senderSpec.getRoundRobinRouterBatchingPartitionSwitchFrequency() != null) {
            producerBuilder.roundRobinRouterBatchingPartitionSwitchFrequency(
                    this.senderSpec.getRoundRobinRouterBatchingPartitionSwitchFrequency().intValue());
        }
        if (this.senderSpec.getBatchingMaxMessages() != null) {
            producerBuilder.batchingMaxMessages(this.senderSpec.getBatchingMaxMessages().intValue());
        }
        if (this.senderSpec.getBatchingMaxBytes() != null) {
            producerBuilder.batchingMaxBytes(this.senderSpec.getBatchingMaxBytes().intValue());
        }
        if (this.senderSpec.getBatchingEnabled() != null) {
            producerBuilder.enableBatching(this.senderSpec.getBatchingEnabled().booleanValue());
        }
        if (this.senderSpec.getBatcherBuilder() != null) {
            producerBuilder.batcherBuilder(this.senderSpec.getBatcherBuilder());
        }
        if (this.senderSpec.getChunkingEnabled() != null) {
            producerBuilder.enableChunking(this.senderSpec.getChunkingEnabled().booleanValue());
        }
        if (this.senderSpec.getCryptoKeyReader() != null) {
            producerBuilder.cryptoKeyReader(this.senderSpec.getCryptoKeyReader());
        }
        if (this.senderSpec.getEncryptionKeys() != null && !this.senderSpec.getEncryptionKeys().isEmpty()) {
            Set<String> encryptionKeys = this.senderSpec.getEncryptionKeys();
            encryptionKeys.forEach(producerBuilder::addEncryptionKey);
        }
        if (this.senderSpec.getCompressionType() != null) {
            producerBuilder.compressionType(this.senderSpec.getCompressionType());
        }
        if (this.senderSpec.getInitialSequenceId() != null) {
            producerBuilder.initialSequenceId(this.senderSpec.getInitialSequenceId().longValue());
        }
        if (this.senderSpec.getAutoUpdatePartitions() != null) {
            producerBuilder.autoUpdatePartitions(this.senderSpec.getAutoUpdatePartitions().booleanValue());
        }
        if (this.senderSpec.getAutoUpdatePartitionsInterval() != null) {
            producerBuilder.autoUpdatePartitionsInterval(
                    (int) (this.senderSpec.getAutoUpdatePartitionsInterval().toMillis() / 1000), TimeUnit.SECONDS);
        }
        if (this.senderSpec.getMultiSchema() != null) {
            producerBuilder.enableMultiSchema(this.senderSpec.getMultiSchema().booleanValue());
        }
        if (this.senderSpec.getAccessMode() != null) {
            producerBuilder.accessMode(this.senderSpec.getAccessMode());
        }
        if (this.senderSpec.getLazyStartPartitionedProducers() != null) {
            producerBuilder.enableLazyStartPartitionedProducers(
                    this.senderSpec.getLazyStartPartitionedProducers().booleanValue());
        }
        if (this.senderSpec.getProperties() != null && !this.senderSpec.getProperties().isEmpty()) {
            // Hand the builder an unmodifiable snapshot that keeps the spec's iteration order.
            producerBuilder
                .properties(Collections.unmodifiableMap(new LinkedHashMap<>(this.senderSpec.getProperties())));
        }
    }

    /**
     * Sends a single message.
     *
     * @param messageSpec the specification of the message to send
     * @return a {@link Mono} emitting the id of the sent message
     */
    @Override
    public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
        return createReactiveProducerAdapter()
            .usingProducer((producer, transformer) -> createMessageMono(messageSpec, producer, transformer));
    }

    /**
     * Sends one message and converts the outcome into a {@link MessageSendResult}.
     *
     * <p>When {@code stopOnError} is set, a failure is wrapped in a
     * {@link ReactiveMessageSendingException} and propagated as an error signal. Otherwise
     * the failure is emitted as a result with a {@code null} message id and the throwable.
     *
     * @param messageSpec the specification of the message to send
     * @param producer the producer used for sending
     * @param publisherTransformer the transformer applied to the send operation
     * @return a {@link Mono} emitting the send result
     */
    private Mono<MessageSendResult<T>> createMessageSendResult(MessageSpec<T> messageSpec, Producer<T> producer,
            PublisherTransformer publisherTransformer) {
        Mono<MessageSendResult<T>> sendResult = createMessageMono(messageSpec, producer, publisherTransformer)
            .map(messageId -> new MessageSendResult<T>(messageId, messageSpec, (Throwable) null));
        if (this.stopOnError) {
            return sendResult
                .onErrorResume(error -> Mono.error(new ReactiveMessageSendingException(error, messageSpec)));
        }
        return sendResult
            .onErrorResume(error -> Mono.just(new MessageSendResult<T>((MessageId) null, messageSpec, error)));
    }

    /**
     * Builds the send operation for one message: a new message is created on the producer,
     * configured from the message spec and sent asynchronously; the resulting {@link Mono}
     * is then passed through the publisher transformer.
     *
     * @param messageSpec the specification of the message to send; it is cast to
     *     {@link InternalMessageSpec} in order to configure the message builder
     * @param producer the producer used for sending
     * @param publisherTransformer the transformer applied to the send operation
     * @return a {@link Mono} emitting the id of the sent message
     */
    private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T> producer,
            PublisherTransformer publisherTransformer) {
        Mono<MessageId> sendMono = PulsarFutureAdapter.adaptPulsarFuture(() -> {
            TypedMessageBuilder<T> messageBuilder = producer.newMessage();
            ((InternalMessageSpec<T>) messageSpec).configure(messageBuilder);
            return messageBuilder.sendAsync();
        });
        return sendMono.transform(publisherTransformer::transform);
    }

    /**
     * Sends every message emitted by the given publisher using a single producer.
     *
     * <p>The sends are combined with {@code flatMapSequential} using the configured maximum
     * concurrency, so results are emitted in the order of the source message specs.
     *
     * @param publisher the publisher of message specifications to send
     * @return a {@link Flux} emitting one {@link MessageSendResult} per message spec
     */
    @Override
    public Flux<MessageSendResult<T>> sendMany(Publisher<MessageSpec<T>> publisher) {
        return createReactiveProducerAdapter().usingProducerMany((producer, transformer) -> Flux.from(publisher)
            .flatMapSequential(messageSpec -> createMessageSendResult(messageSpec, producer, transformer),
                    this.maxConcurrency));
    }
}
