package com.microsoft.azure.spring.integration.test.support.rx;

import com.microsoft.azure.spring.integration.core.api.CheckpointConfig;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.core.api.RxSendOperation;
import com.microsoft.azure.spring.integration.test.support.pojo.User;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import rx.Observable;
import rx.observers.AssertableSubscriber;

/* loaded from: input_file:com/microsoft/azure/spring/integration/test/support/rx/RxSendSubscribeOperationTest.class */
public abstract class RxSendSubscribeOperationTest<T extends RxSendOperation> {
    protected T sendSubscribeOperation;
    protected String partitionId = "1";
    protected String destination = "test";
    private String payload = "payload";
    private User user = new User(this.payload);
    private Map<String, Object> headers = new HashMap();
    protected Message<User> userMessage = new GenericMessage(this.user, this.headers);
    protected Message[] messages = (Message[]) IntStream.range(1, 5).mapToObj(String::valueOf).map(User::new).map(user -> {
        return new GenericMessage(user, this.headers);
    }).toArray(i -> {
        return new Message[i];
    });
    private Message<String> stringMessage = new GenericMessage(this.payload, this.headers);
    private Message<byte[]> byteMessage = new GenericMessage(this.payload.getBytes(), this.headers);

    @Test
    public void testSendString() {
        AssertableSubscriber test = subscribe(this.destination, String.class).map((v0) -> {
            return v0.getPayload();
        }).cast(String.class).test();
        this.sendSubscribeOperation.sendRx(this.destination, this.stringMessage);
        test.assertValue(this.payload).assertNoErrors();
    }

    @Test
    public void testSendByte() {
        AssertableSubscriber test = subscribe(this.destination, byte[].class).map((v0) -> {
            return v0.getPayload();
        }).cast(byte[].class).map(String::new).test();
        this.sendSubscribeOperation.sendRx(this.destination, this.byteMessage);
        test.assertValue(this.payload).assertNoErrors();
    }

    @Test
    public void testSendUser() {
        AssertableSubscriber test = subscribe(this.destination, User.class).map((v0) -> {
            return v0.getPayload();
        }).cast(User.class).test();
        this.sendSubscribeOperation.sendRx(this.destination, this.userMessage);
        test.assertValue(this.user).assertNoErrors();
    }

    @Test
    public void testSendReceiveWithManualCheckpointMode() {
        setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
        AssertableSubscriber test = subscribe(this.destination, User.class).map((v0) -> {
            return v0.getPayload();
        }).cast(User.class).test();
        this.sendSubscribeOperation.sendRx(this.destination, this.userMessage);
        test.assertValue(this.user).assertNoErrors();
        verifyCheckpointSuccessCalled(0);
    }

    @Test
    public void testSendReceiveWithRecordCheckpointMode() {
        setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.RECORD).build());
        AssertableSubscriber test = subscribe(this.destination, User.class).map((v0) -> {
            return v0.getPayload();
        }).cast(User.class).test();
        Arrays.stream(this.messages).forEach(message -> {
            this.sendSubscribeOperation.sendRx(this.destination, message);
        });
        test.assertValueCount(this.messages.length).assertNoErrors();
        verifyCheckpointSuccessCalled(this.messages.length);
    }

    @Before
    public abstract void setUp();

    protected abstract void verifyCheckpointSuccessCalled(int i);

    protected abstract void verifyCheckpointBatchSuccessCalled(int i);

    protected abstract Observable<Message<?>> subscribe(String str, Class<?> cls);

    protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig);
}
