package com.microsoft.azure.spring.integration.storage.queue;

import com.microsoft.azure.spring.integration.core.AzureCheckpointer;
import com.microsoft.azure.spring.integration.core.AzureHeaders;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.storage.queue.converter.StorageQueueMessageConverter;
import com.microsoft.azure.spring.integration.storage.queue.factory.StorageQueueClientFactory;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import com.microsoft.azure.storage.queue.QueueRequestOptions;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/storage/queue/StorageQueueTemplate.class */
/**
 * Default {@link StorageQueueOperation} implementation that sends messages to and receives
 * messages from Azure Storage queues obtained through a {@link StorageQueueClientFactory}.
 *
 * <p>All send, receive and checkpoint work is dispatched via
 * {@link CompletableFuture#runAsync(Runnable)} / {@link CompletableFuture#supplyAsync}, i.e. on the
 * default asynchronous executor of {@code CompletableFuture}.
 *
 * <p>Defaults: visibility timeout of {@value #DEFAULT_VISIBILITY_TIMEOUT_IN_SECONDS} seconds,
 * payload type {@code byte[]}, checkpoint mode {@link CheckpointMode#RECORD}.
 */
public class StorageQueueTemplate implements StorageQueueOperation {
    private static final int DEFAULT_VISIBILITY_TIMEOUT_IN_SECONDS = 30;

    private final StorageQueueClientFactory storageQueueClientFactory;
    private int visibilityTimeoutInSeconds = DEFAULT_VISIBILITY_TIMEOUT_IN_SECONDS;
    // Kept as a raw type on purpose: the getter/setter signatures are dictated by StorageQueueOperation.
    private Class messagePayloadType = byte[].class;
    private CheckpointMode checkpointMode = CheckpointMode.RECORD;
    protected StorageQueueMessageConverter messageConverter = new StorageQueueMessageConverter();

    /**
     * Creates a template backed by the given client factory.
     *
     * @param storageQueueClientFactory factory used to look up (or create) the queue for a destination name
     */
    public StorageQueueTemplate(@NonNull StorageQueueClientFactory storageQueueClientFactory) {
        this.storageQueueClientFactory = storageQueueClientFactory;
    }

    /**
     * Converts the message to a {@link CloudQueueMessage} and adds it to the named queue asynchronously.
     *
     * <p>The conversion happens on the calling thread; only the queue call is asynchronous.
     *
     * @param str               destination queue name; must contain text
     * @param message           message to send
     * @param partitionSupplier not used by this implementation
     * @return a future that completes when the message has been added, or completes exceptionally
     *         with a {@link StorageQueueRuntimeException} if the storage call fails
     * @throws IllegalArgumentException if {@code str} is null or empty
     */
    @Override
    public <T> CompletableFuture<Void> sendAsync(String str, @NonNull Message<T> message, PartitionSupplier partitionSupplier) {
        Assert.hasText(str, "destination can't be null or empty");
        CloudQueueMessage queueMessage = this.messageConverter.fromMessage(message, CloudQueueMessage.class);
        return CompletableFuture.runAsync(() -> {
            try {
                this.storageQueueClientFactory.getQueueCreator().apply(str).addMessage(queueMessage);
            } catch (StorageException e) {
                throw new StorageQueueRuntimeException("Failed to add message to cloud queue", e);
            }
        });
    }

    /**
     * Receives one message from the named queue using the configured visibility timeout.
     *
     * @param str destination queue name; must contain text
     * @return a future yielding the received message, or {@code null} when no message was retrieved
     */
    @Override
    public CompletableFuture<Message<?>> receiveAsync(String str) {
        return receiveAsync(str, this.visibilityTimeoutInSeconds);
    }

    /**
     * Receives one message from the named queue using an explicit visibility timeout.
     *
     * @param str destination queue name; must contain text
     * @param i   visibility timeout in seconds passed to the queue when retrieving the message
     * @return a future yielding the received message, or {@code null} when no message was retrieved;
     *         completes exceptionally with a {@link StorageQueueRuntimeException} if the storage call fails
     * @throws IllegalArgumentException if {@code str} is null or empty
     */
    @Override
    public CompletableFuture<Message<?>> receiveAsync(String str, int i) {
        Assert.hasText(str, "destination can't be null or empty");
        return CompletableFuture.supplyAsync(() -> receiveMessage(str, i));
    }

    /**
     * Retrieves a single message, applies the configured checkpoint mode and converts the result.
     *
     * <p>In {@link CheckpointMode#RECORD} mode the message is checkpointed (deleted) right away; in
     * {@link CheckpointMode#MANUAL} mode the checkpointer is exposed to the consumer through the
     * {@link AzureHeaders#CHECKPOINTER} header. Other modes perform no checkpointing here.
     *
     * @param queueName                 name of the queue to read from
     * @param visibilityTimeoutSeconds  visibility timeout handed to the queue
     * @return the converted message, or {@code null} when the queue returned no message
     */
    private Message<?> receiveMessage(String queueName, int visibilityTimeoutSeconds) {
        CloudQueue queue = this.storageQueueClientFactory.getQueueCreator().apply(queueName);
        try {
            CloudQueueMessage retrieved =
                    queue.retrieveMessage(visibilityTimeoutSeconds, (QueueRequestOptions) null, (OperationContext) null);
            if (retrieved == null) {
                // Nothing was retrieved, so there is nothing to checkpoint. Bail out before building
                // a checkpointer; otherwise RECORD mode would schedule a delete of a null message.
                return null;
            }

            HashMap<String, Object> headers = new HashMap<>();
            AzureCheckpointer checkpointer = new AzureCheckpointer(() -> checkpointMessage(queue, retrieved));
            if (this.checkpointMode == CheckpointMode.RECORD) {
                checkpointer.success();
            } else if (this.checkpointMode == CheckpointMode.MANUAL) {
                headers.put(AzureHeaders.CHECKPOINTER, checkpointer);
            }
            return this.messageConverter.toMessage(retrieved, new MessageHeaders(headers), this.messagePayloadType);
        } catch (StorageException e) {
            throw new StorageQueueRuntimeException("Failed to peek message from cloud queue", e);
        }
    }

    /**
     * Checkpoints a message by deleting it from its queue asynchronously.
     *
     * @param cloudQueue        queue the message was retrieved from
     * @param cloudQueueMessage message to delete
     * @return a future that completes when the delete has finished, or completes exceptionally
     *         with a {@link StorageQueueRuntimeException} if the storage call fails
     */
    private CompletableFuture<Void> checkpointMessage(CloudQueue cloudQueue, CloudQueueMessage cloudQueueMessage) {
        return CompletableFuture.runAsync(() -> {
            try {
                cloudQueue.deleteMessage(cloudQueueMessage);
            } catch (StorageException e) {
                throw new StorageQueueRuntimeException("Failed to checkpoint message from cloud queue", e);
            }
        });
    }

    /** Returns the visibility timeout, in seconds, used by {@link #receiveAsync(String)}. */
    @Override
    public int getVisibilityTimeoutInSeconds() {
        return this.visibilityTimeoutInSeconds;
    }

    /** Sets the visibility timeout, in seconds, used by {@link #receiveAsync(String)}. */
    @Override
    public void setVisibilityTimeoutInSeconds(int i) {
        this.visibilityTimeoutInSeconds = i;
    }

    /** Returns the payload type that received messages are converted to. */
    @Override
    public Class getMessagePayloadType() {
        return this.messagePayloadType;
    }

    /** Sets the payload type that received messages are converted to. */
    @Override
    public void setMessagePayloadType(Class cls) {
        this.messagePayloadType = cls;
    }

    /** Returns the checkpoint mode applied to received messages. */
    @Override
    public CheckpointMode getCheckpointMode() {
        return this.checkpointMode;
    }

    /** Sets the checkpoint mode applied to received messages. */
    @Override
    public void setCheckpointMode(CheckpointMode checkpointMode) {
        this.checkpointMode = checkpointMode;
    }

    /** Returns the converter used to translate between Spring messages and queue messages. */
    public StorageQueueMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /** Replaces the converter used to translate between Spring messages and queue messages. */
    public void setMessageConverter(StorageQueueMessageConverter storageQueueMessageConverter) {
        this.messageConverter = storageQueueMessageConverter;
    }
}
