package org.apache.pulsar.broker.admin;

import com.google.common.collect.Sets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.protocol.MockProtocolHandler;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/admin/AdminApiOffloadTest.class */
/**
 * Admin API tests for topic offloading (trigger + status) and for namespace-level offload policies.
 *
 * <p>Runs against the mocked broker provided by {@link MockedPulsarServiceBaseTest}; the
 * {@link LedgerOffloader} itself is replaced by a Mockito mock so no real offload takes place.
 */
public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {

    private static final String CLUSTER = "test";
    private static final String TENANT = "prop-xyz";
    private static final String NAMESPACE = "prop-xyz/ns1";

    /**
     * Starts the mocked broker and creates the cluster, tenant and namespace used by the tests.
     *
     * <p>Ledgers are capped at 10 entries with no minimum rollover time; {@code testOffload}
     * publishes 15 messages and asserts that the topic then has exactly two ledgers.
     *
     * @throws Exception if broker start-up or any admin call fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        conf.setManagedLedgerMaxEntriesPerLedger(10);
        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
        super.internalSetup();

        admin.clusters().createCluster(CLUSTER, new ClusterData(pulsar.getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet(CLUSTER));
        admin.tenants().createTenant(TENANT, tenantInfo);
        admin.namespaces().createNamespace(NAMESPACE, Sets.newHashSet(CLUSTER));
    }

    /**
     * Tears down the mocked broker after each test method.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Drives one topic through the offload life cycle and checks the status reported by the admin API:
     * NOT_RUN, then RUNNING, then a rejected concurrent trigger, then ERROR, then SUCCESS on retry.
     *
     * @param topicName fully qualified topic name used for producing and for the admin calls
     * @param mlName    managed-ledger name passed to the managed-ledger factory for the same topic
     * @throws Exception if producing, an admin call or a managed-ledger lookup fails
     */
    private void testOffload(String topicName, String mlName) throws Exception {
        LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class);
        Mockito.when(offloader.getOffloadDriverName()).thenReturn(MockProtocolHandler.NAME);
        Mockito.doReturn(offloader).when(pulsar)
                .getManagedLedgerOffloader((NamespaceName) ArgumentMatchers.any(), (OffloadPolicies) ArgumentMatchers.any());

        // The first offload attempt stays pending until the test completes this future by hand.
        CompletableFuture<Void> pendingOffload = new CompletableFuture<>();
        Mockito.doReturn(pendingOffload).when(offloader)
                .offload((ReadHandle) ArgumentMatchers.any(), (UUID) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());

        MessageId lastMessageId = MessageId.latest;
        // try-with-resources closes the producer on every path and keeps a send() failure as the
        // primary exception, with any close() failure attached as suppressed.
        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
            for (int i = 0; i < 15; i++) {
                lastMessageId = producer.send("Foobar".getBytes());
            }
        }

        ManagedLedgerInfo ledgerInfo = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
        Assert.assertEquals(ledgerInfo.ledgers.size(), 2);

        Assert.assertEquals(admin.topics().offloadStatus(topicName).status, LongRunningProcessStatus.Status.NOT_RUN);

        admin.topics().triggerOffload(topicName, lastMessageId);
        Assert.assertEquals(admin.topics().offloadStatus(topicName).status, LongRunningProcessStatus.Status.RUNNING);

        try {
            admin.topics().triggerOffload(topicName, lastMessageId);
            Assert.fail("Should have failed");
        } catch (PulsarAdminException.ConflictException expected) {
            // Expected: a second trigger is rejected while the first offload is still running.
        }

        // Fail the pending offload and check that the error is surfaced through the status API.
        pendingOffload.completeExceptionally(new Exception("Some random failure"));
        Assert.assertEquals(admin.topics().offloadStatus(topicName).status, LongRunningProcessStatus.Status.ERROR);
        Assert.assertTrue(admin.topics().offloadStatus(topicName).lastError.contains("Some random failure"));

        // Re-stub the offloader so the retry completes immediately.
        Mockito.doReturn(CompletableFuture.completedFuture(null)).when(offloader)
                .offload((ReadHandle) ArgumentMatchers.any(), (UUID) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());

        admin.topics().triggerOffload(topicName, lastMessageId);
        Assert.assertEquals(admin.topics().offloadStatus(topicName).status, LongRunningProcessStatus.Status.SUCCESS);

        // The first message not offloaded is expected to be entry 0 of the second ledger.
        MessageIdImpl firstUnoffloaded = admin.topics().offloadStatus(topicName).firstUnoffloadedMessage;
        Assert.assertEquals(firstUnoffloaded.getLedgerId(), ledgerInfo.ledgers.get(1).ledgerId);
        Assert.assertEquals(firstUnoffloaded.getEntryId(), 0L);

        // One failed attempt plus one successful attempt.
        Mockito.verify(offloader, Mockito.times(2))
                .offload((ReadHandle) ArgumentMatchers.any(), (UUID) ArgumentMatchers.any(), (Map) ArgumentMatchers.any());
    }

    /** Runs the offload scenario on a topic named without a cluster component. */
    @Test
    public void testOffloadV2() throws Exception {
        testOffload("persistent://prop-xyz/ns1/topic1", "prop-xyz/ns1/persistent/topic1");
    }

    /** Runs the offload scenario on a topic whose name includes the cluster ({@code test}). */
    @Test
    public void testOffloadV1() throws Exception {
        testOffload("persistent://prop-xyz/test/ns1/topic2", "prop-xyz/test/ns1/persistent/topic2");
    }

    /** Checks that offload policies set on a namespace are returned unchanged by the admin API. */
    @Test
    public void testOffloadPolicies() throws Exception {
        OffloadPolicies policies =
                OffloadPolicies.create("aws-s3", "test-region", "test-bucket", "test-endpoint", 100, 100);
        admin.namespaces().setOffloadPolicies(NAMESPACE, policies);
        Assert.assertEquals(policies, admin.namespaces().getOffloadPolicies(NAMESPACE));
    }
}
