package com.microsoft.azure.spring.integration.test.support;

import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.core.api.Checkpointer;
import com.microsoft.azure.spring.integration.core.api.SendOperation;
import com.microsoft.azure.spring.integration.test.support.pojo.User;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

/* loaded from: input_file:com/microsoft/azure/spring/integration/test/support/SendSubscribeOperationTest.class */
public abstract class SendSubscribeOperationTest<T extends SendOperation> {
    protected T sendSubscribeOperation;
    protected String partitionId = "1";
    protected String destination = "test";
    private String payload = "payload";
    private User user = new User(this.payload);
    private Map<String, Object> headers = new HashMap();
    protected Message<User> userMessage = new GenericMessage(this.user, this.headers);
    protected List<Message<User>> messages = (List) IntStream.range(1, 5).mapToObj(String::valueOf).map(User::new).map(user -> {
        return new GenericMessage(user, this.headers);
    }).collect(Collectors.toList());
    private Message<String> stringMessage = new GenericMessage(this.payload, this.headers);
    private Message<byte[]> byteMessage = new GenericMessage(this.payload.getBytes(), this.headers);

    @Test
    public void testSendString() {
        subscribe(this.destination, this::stringHandler, String.class);
        this.sendSubscribeOperation.sendAsync(this.destination, this.stringMessage);
    }

    @Test
    public void testSendByte() {
        subscribe(this.destination, this::byteHandler, byte[].class);
        this.sendSubscribeOperation.sendAsync(this.destination, this.byteMessage);
    }

    @Test
    public void testSendUser() {
        subscribe(this.destination, this::userHandler, User.class);
        this.sendSubscribeOperation.sendAsync(this.destination, this.userMessage);
    }

    @Test
    public void testSendReceiveWithManualCheckpointMode() {
        setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
        subscribe(this.destination, this::manualCheckpointHandler, User.class);
        this.sendSubscribeOperation.sendAsync(this.destination, this.userMessage);
    }

    @Test
    public void testSendReceiveWithRecordCheckpointMode() {
        setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.RECORD).build());
        subscribe(this.destination, this::recordCheckpointHandler, User.class);
        this.messages.forEach(message -> {
            this.sendSubscribeOperation.sendAsync(this.destination, message);
        });
        verifyCheckpointSuccessCalled(this.messages.size());
    }

    private void manualCheckpointHandler(Message<?> message) {
        Assert.assertTrue(message.getHeaders().containsKey("azure_checkpointer"));
        Checkpointer checkpointer = (Checkpointer) message.getHeaders().get("azure_checkpointer", Checkpointer.class);
        Assert.assertNotNull(checkpointer);
        verifyCheckpointSuccess(checkpointer);
        verifyCheckpointFailure(checkpointer);
    }

    private void recordCheckpointHandler(Message<?> message) {
    }

    private void stringHandler(Message<?> message) {
        Assert.assertEquals(this.payload, message.getPayload());
    }

    private void byteHandler(Message<?> message) {
        Assert.assertEquals(this.payload, new String((byte[]) message.getPayload()));
    }

    private void userHandler(Message<?> message) {
        Assert.assertEquals(this.user, message.getPayload());
    }

    @Before
    public abstract void setUp();

    protected abstract void verifyCheckpointSuccessCalled(int i);

    protected abstract void verifyCheckpointBatchSuccessCalled(int i);

    protected abstract void verifyCheckpointFailureCalled(int i);

    protected abstract void subscribe(String str, Consumer<Message<?>> consumer, Class<?> cls);

    protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig);

    protected void verifyCheckpointSuccess(Checkpointer checkpointer) {
        checkpointer.success();
        verifyCheckpointSuccessCalled(1);
    }

    protected void verifyCheckpointFailure(Checkpointer checkpointer) {
        checkpointer.failure();
        verifyCheckpointFailureCalled(1);
    }
}
