package com.microsoft.azure.spring.integration.core;

import com.microsoft.azure.spring.integration.eventhub.inbound.CheckpointMode;
import com.microsoft.azure.spring.integration.eventhub.inbound.ListenerMode;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/microsoft/azure/spring/integration/core/AbstractInboundChannelAdapter.class */
/**
 * Base class for inbound channel adapters that subscribe to a destination through a
 * {@link SubscribeByGroupOperation} and forward the received payloads as messages via
 * {@code sendMessage}.
 *
 * <p>Subclasses supply the conversion from received data to {@link Message} by implementing
 * the two {@code toMessage} overloads. How messages are emitted and when checkpoints are
 * taken is controlled by {@link ListenerMode} and {@link CheckpointMode} respectively; both
 * default to {@code RECORD}.
 *
 * @param <D> the type of a single received item
 */
public abstract class AbstractInboundChannelAdapter<D> extends MessageProducerSupport {
    /** Consumer group used when the constructor is given a null, empty or blank group. */
    private static final String DEFAULT_CONSUMER_GROUP = "$Default";

    private final String destination;
    private final SubscribeByGroupOperation<D> subscribeByGroupOperation;

    /** Not used by this class; exposed through the getter/setter and to subclasses. */
    protected MessageConverter messageConverter;

    /**
     * Headers shared with subclasses. This class only writes to it: in
     * {@link CheckpointMode#MANUAL} mode {@link #doStart()} stores the checkpointer under
     * {@code AzureHeaders.CHECKPOINTER}.
     */
    protected Map<String, Object> commonHeaders = new HashMap<>();

    private CheckpointMode checkpointMode = CheckpointMode.RECORD;
    private ListenerMode listenerMode = ListenerMode.RECORD;
    private String consumerGroup;

    /**
     * Creates an adapter bound to a destination and consumer group.
     *
     * @param str the destination to subscribe to; must contain text
     * @param subscribeByGroupOperation the operation used to subscribe, unsubscribe and obtain
     *     checkpointers; must not be {@code null}
     * @param str2 the consumer group; when it has no text, {@code "$Default"} is used instead
     * @throws IllegalArgumentException if {@code str} has no text or
     *     {@code subscribeByGroupOperation} is {@code null}
     */
    public AbstractInboundChannelAdapter(String str, SubscribeByGroupOperation<D> subscribeByGroupOperation, String str2) {
        Assert.hasText(str, "destination can't be null or empty");
        Assert.notNull(subscribeByGroupOperation, "subscribeByGroupOperation can't be null");
        this.destination = str;
        this.subscribeByGroupOperation = subscribeByGroupOperation;
        this.consumerGroup = StringUtils.hasText(str2) ? str2 : DEFAULT_CONSUMER_GROUP;
    }

    /**
     * Runs the superclass start logic, subscribes {@link #receiveMessage(Iterable)} to the
     * destination for the configured consumer group and, in {@link CheckpointMode#MANUAL}
     * mode, stores the checkpointer in {@link #commonHeaders}.
     */
    public void doStart() {
        super.doStart();
        this.subscribeByGroupOperation.subscribe(this.destination, this::receiveMessage, this.consumerGroup);
        if (this.checkpointMode == CheckpointMode.MANUAL) {
            // NOTE(review): the header is stored after subscribing; if subscribe() can deliver
            // data before returning, early messages would not see it — confirm.
            this.commonHeaders.put(AzureHeaders.CHECKPOINTER, this.subscribeByGroupOperation.getCheckpointer(this.destination, this.consumerGroup));
        }
    }

    /**
     * Converts received data to messages and sends them, checkpointing according to the
     * configured {@link CheckpointMode}.
     *
     * <ul>
     *   <li>{@link ListenerMode#BATCH}: the whole iterable is converted and sent as one
     *       message.</li>
     *   <li>Otherwise: each item is converted and sent individually; in
     *       {@link CheckpointMode#RECORD} mode each item is checkpointed right after it is
     *       sent.</li>
     *   <li>{@link CheckpointMode#BATCH}: a single checkpoint is taken after all sending is
     *       done, regardless of listener mode.</li>
     * </ul>
     *
     * <p>Note that with {@link ListenerMode#BATCH} and {@link CheckpointMode#RECORD} this
     * method performs no checkpoint.
     *
     * @param iterable the received items
     */
    public void receiveMessage(Iterable<D> iterable) {
        if (this.listenerMode == ListenerMode.BATCH) {
            sendMessage(toMessage(iterable));
        } else {
            for (D item : iterable) {
                sendMessage(toMessage(item));
                if (this.checkpointMode == CheckpointMode.RECORD) {
                    this.subscribeByGroupOperation.getCheckpointer(this.destination, this.consumerGroup).checkpoint(item);
                }
            }
        }
        if (this.checkpointMode == CheckpointMode.BATCH) {
            this.subscribeByGroupOperation.getCheckpointer(this.destination, this.consumerGroup).checkpoint();
        }
    }

    /**
     * Converts a single received item to a message.
     *
     * @param d the received item
     * @return the message to send
     */
    protected abstract Message<?> toMessage(D d);

    /**
     * Converts a batch of received items to a single message.
     *
     * @param iterable the received items
     * @return the message to send
     */
    protected abstract Message<?> toMessage(Iterable<D> iterable);

    /**
     * Unsubscribes from the destination for the configured consumer group, then runs the
     * superclass stop logic.
     */
    protected void doStop() {
        // NOTE(review): this method reference is a separate expression from the one passed to
        // subscribe() in doStart(); whether unsubscribe() can match it depends on how the
        // SubscribeByGroupOperation implementation identifies consumers — confirm.
        this.subscribeByGroupOperation.unsubscribe(this.destination, this::receiveMessage, this.consumerGroup);
        super.doStop();
    }

    /** @return the current checkpoint mode */
    public CheckpointMode getCheckpointMode() {
        return this.checkpointMode;
    }

    /** @param checkpointMode the checkpoint mode to use from now on */
    public void setCheckpointMode(CheckpointMode checkpointMode) {
        this.checkpointMode = checkpointMode;
    }

    /** @return the current listener mode */
    public ListenerMode getListenerMode() {
        return this.listenerMode;
    }

    /** @param listenerMode the listener mode to use from now on */
    public void setListenerMode(ListenerMode listenerMode) {
        this.listenerMode = listenerMode;
    }

    /** @return the configured message converter, possibly {@code null} if none was set */
    public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /** @param messageConverter the message converter to expose to subclasses */
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
}
