/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.transaction;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.tests.integration.transaction.TransactionTestBase;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TransactionTest
extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(TransactionTest.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="ServiceUrls")
    public void transferNormalTest(Supplier<String> serviceUrl) throws Exception {
        log.info("transfer normal test start.");
        PulsarClient pulsarClient = PulsarClient.builder().enableTransaction(true).serviceUrl(serviceUrl.get()).build();
        try {
            int transferCount = 20;
            String transferTopic = "transfer-" + TransactionTest.randomName(6);
            String balanceUpdateTopic = "balance-update-" + TransactionTest.randomName(6);
            Producer transferProducer = pulsarClient.newProducer(Schema.JSON(TransactionTestBase.TransferOperation.class)).topic(transferTopic).create();
            try {
                log.info("transfer producer create finished");
                this.prepareTransferData((Producer<TransactionTestBase.TransferOperation>)transferProducer, 20);
                Consumer transferConsumer = pulsarClient.newConsumer(Schema.JSON(TransactionTestBase.TransferOperation.class)).topic(new String[]{transferTopic}).subscriptionName("integration-test").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscriptionType(SubscriptionType.Shared).enableBatchIndexAcknowledgment(true).subscribe();
                try {
                    Awaitility.await().until(() -> ((Consumer)transferConsumer).isConnected());
                    log.info("transfer consumer create finished");
                    Producer balanceUpdateProducer = pulsarClient.newProducer(Schema.JSON(TransactionTestBase.BalanceUpdate.class)).topic(balanceUpdateTopic).sendTimeout(0, TimeUnit.SECONDS).create();
                    try {
                        log.info("balance update producer create finished");
                        Consumer balanceUpdateConsumer = pulsarClient.newConsumer(Schema.JSON(TransactionTestBase.BalanceUpdate.class)).topic(new String[]{balanceUpdateTopic}).subscriptionName("integration-test").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                        try {
                            Message message;
                            Message message2;
                            Awaitility.await().until(() -> ((Consumer)balanceUpdateConsumer).isConnected());
                            log.info("balance update consumer create finished");
                            while ((message2 = transferConsumer.receive(10, TimeUnit.SECONDS)) != null) {
                                TransactionTestBase.TransferOperation transferOperation = (TransactionTestBase.TransferOperation)message2.getValue();
                                Transaction transaction = (Transaction)pulsarClient.newTransaction().withTransactionTimeout(5L, TimeUnit.MINUTES).build().get();
                                balanceUpdateProducer.newMessage(transaction).value((Object)this.getBalanceUpdate(transferOperation, true)).sendAsync();
                                balanceUpdateProducer.newMessage(transaction).value((Object)this.getBalanceUpdate(transferOperation, false)).sendAsync();
                                transferConsumer.acknowledgeAsync(message2.getMessageId(), transaction);
                                transaction.commit().get();
                            }
                            int receiveBalanceUpdateCnt = 0;
                            int balanceSum = 0;
                            while ((message = balanceUpdateConsumer.receive(10, TimeUnit.SECONDS)) != null) {
                                ++receiveBalanceUpdateCnt;
                                TransactionTestBase.BalanceUpdate balanceUpdate = (TransactionTestBase.BalanceUpdate)message.getValue();
                                balanceSum += balanceUpdate.getAmount();
                                log.info("balance account: {}, amount: {}", (Object)balanceUpdate.getAccount(), (Object)balanceUpdate.getAmount());
                            }
                            Assert.assertEquals((int)receiveBalanceUpdateCnt, (int)40);
                            Assert.assertEquals((int)balanceSum, (int)0);
                            log.info("transfer normal test finish.");
                        }
                        finally {
                            if (Collections.singletonList(balanceUpdateConsumer).get(0) != null) {
                                balanceUpdateConsumer.close();
                            }
                        }
                    }
                    finally {
                        if (Collections.singletonList(balanceUpdateProducer).get(0) != null) {
                            balanceUpdateProducer.close();
                        }
                    }
                }
                finally {
                    if (Collections.singletonList(transferConsumer).get(0) != null) {
                        transferConsumer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(transferProducer).get(0) != null) {
                    transferProducer.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(pulsarClient).get(0) != null) {
                pulsarClient.close();
            }
        }
    }
}

