/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import java.time.Clock;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.OffloadPrefixTest;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class OffloadLedgerDeleteTest
extends MockedBookKeeperTestCase {
    private static final Logger log = LoggerFactory.getLogger(OffloadLedgerDeleteTest.class);

    @Test
    public void testLaggedDelete() throws Exception {
        OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        MockClock clock = new MockClock();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(Long.valueOf(300000L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        config.setClock((Clock)clock);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 15; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        long firstLedgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        Assert.assertTrue((boolean)this.bkc.getLedgers().contains(firstLedgerId));
        clock.advance(2L, TimeUnit.MINUTES);
        CompletableFuture promise = new CompletableFuture();
        ledger.internalTrimConsumedLedgers(promise);
        promise.join();
        Assert.assertTrue((boolean)this.bkc.getLedgers().contains(firstLedgerId));
        clock.advance(5L, TimeUnit.MINUTES);
        CompletableFuture promise2 = new CompletableFuture();
        ledger.internalTrimConsumedLedgers(promise2);
        promise2.join();
        OffloadPrefixTest.assertEventuallyTrue(() -> !this.bkc.getLedgers().contains(firstLedgerId));
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        clock.advance(5L, TimeUnit.MINUTES);
        CompletableFuture promise3 = new CompletableFuture();
        ledger.internalTrimConsumedLedgers(promise3);
        promise3.join();
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
    }

    @Test(timeOut=5000L)
    public void testFileSystemOffloadDeletePath() throws Exception {
        MockFileSystemLedgerOffloader offloader = new MockFileSystemLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        MockClock clock = new MockClock();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(3, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(Long.valueOf(300000L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        config.setClock((Clock)clock);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger_filesystem", config);
        for (int i = 0; i < 15; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        long firstLedgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        Assert.assertTrue((boolean)this.bkc.getLedgers().contains(firstLedgerId));
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        clock.advance(5L, TimeUnit.MINUTES);
        CompletableFuture promise3 = new CompletableFuture();
        ledger.internalTrimConsumedLedgers(promise3);
        promise3.join();
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
    }

    @Test
    public void testLaggedDeleteRetentionSetLower() throws Exception {
        OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        MockClock clock = new MockClock();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(5, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(Long.valueOf(600000L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        config.setClock((Clock)clock);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 15; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        long firstLedgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        Assert.assertTrue((boolean)this.bkc.getLedgers().contains(firstLedgerId));
        clock.advance(2L, TimeUnit.MINUTES);
        CompletableFuture promise = new CompletableFuture();
        ledger.internalTrimConsumedLedgers(promise);
        promise.join();
        Assert.assertTrue((boolean)this.bkc.getLedgers().contains(firstLedgerId));
        clock.advance(5L, TimeUnit.MINUTES);
        CompletableFuture promise2 = new CompletableFuture();
        ledger.internalTrimConsumedLedgers(promise2);
        promise2.join();
        OffloadPrefixTest.assertEventuallyTrue(() -> !this.bkc.getLedgers().contains(firstLedgerId));
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedgerId));
    }

    @Test
    public void testLaggedDeleteSlowConsumer() throws Exception {
        OffloadPrefixTest.MockLedgerOffloader offloader = new OffloadPrefixTest.MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        MockClock clock = new MockClock();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(Long.valueOf(300000L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        config.setClock((Clock)clock);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("sub1");
        for (int i = 0; i < 15; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        long firstLedgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        Assert.assertTrue((boolean)this.bkc.getLedgers().contains(firstLedgerId));
        clock.advance(2L, TimeUnit.MINUTES);
        CompletableFuture promise = new CompletableFuture();
        ledger.internalTrimConsumedLedgers(promise);
        promise.join();
        Assert.assertTrue((boolean)this.bkc.getLedgers().contains(firstLedgerId));
        clock.advance(5L, TimeUnit.MINUTES);
        CompletableFuture promise2 = new CompletableFuture();
        ledger.internalTrimConsumedLedgers(promise2);
        promise2.join();
        OffloadPrefixTest.assertEventuallyTrue(() -> !this.bkc.getLedgers().contains(firstLedgerId));
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
    }

    @Test
    public void isOffloadedNeedsDeleteTest() throws Exception {
        OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
        LedgerOffloader ledgerOffloader = (LedgerOffloader)Mockito.mock(LedgerOffloader.class);
        Mockito.when((Object)ledgerOffloader.getOffloadPolicies()).thenReturn((Object)offloadPolicies);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        MockClock clock = new MockClock();
        config.setLedgerOffloader(ledgerOffloader);
        config.setClock((Clock)clock);
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)this.factory.open("isOffloadedNeedsDeleteTest", config);
        MLDataFormats.OffloadContext offloadContext = MLDataFormats.OffloadContext.newBuilder().setTimestamp(config.getClock().millis() - 1000L).setComplete(true).setBookkeeperDeleted(false).build();
        boolean needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
        Assert.assertFalse((boolean)needsDelete);
        offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(Long.valueOf(500L));
        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
        Assert.assertTrue((boolean)needsDelete);
        offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(Long.valueOf(2000L));
        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
        Assert.assertFalse((boolean)needsDelete);
        offloadContext = MLDataFormats.OffloadContext.newBuilder().setTimestamp(config.getClock().millis() - 1000L).setComplete(false).setBookkeeperDeleted(false).build();
        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
        Assert.assertFalse((boolean)needsDelete);
        offloadContext = MLDataFormats.OffloadContext.newBuilder().setTimestamp(config.getClock().millis() - 1000L).setComplete(true).setBookkeeperDeleted(true).build();
        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
        Assert.assertFalse((boolean)needsDelete);
    }

    static class MockFileSystemLedgerOffloader
    implements LedgerOffloader {
        private String storageBasePath = "/Users/pulsar_filesystem_offloader";
        ConcurrentHashMap<Long, String> offloads = new ConcurrentHashMap();
        ConcurrentHashMap<Long, String> deletes = new ConcurrentHashMap();
        OffloadPrefixTest.MockLedgerOffloader.InjectAfterOffload inject = null;
        OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create((String)"filesystem", (String)"", (String)"", (String)"", null, null, null, null, (Integer)0x4000000, (Integer)0x100000, (Long)OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, (Long)OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, (OffloadedReadPriority)OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);

        MockFileSystemLedgerOffloader() {
        }

        private static String getStoragePath(String storageBasePath, String managedLedgerName) {
            return storageBasePath == null ? managedLedgerName + "/" : storageBasePath + "/" + managedLedgerName + "/";
        }

        private static String getDataFilePath(String storagePath, long ledgerId, UUID uuid) {
            return storagePath + ledgerId + "-" + uuid.toString();
        }

        Set<Long> offloadedLedgers() {
            return this.offloads.keySet();
        }

        Set<Long> deletedOffloads() {
            return this.deletes.keySet();
        }

        public String getOffloadDriverName() {
            return "mockfilesystem";
        }

        public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
            Assert.assertNotNull((Object)extraMetadata.get("ManagedLedgerName"));
            String storagePath = MockFileSystemLedgerOffloader.getStoragePath(this.storageBasePath, extraMetadata.get("ManagedLedgerName"));
            String dataFilePath = MockFileSystemLedgerOffloader.getDataFilePath(storagePath, ledger.getId(), uuid);
            CompletableFuture<Void> promise = new CompletableFuture<Void>();
            if (this.offloads.putIfAbsent(ledger.getId(), dataFilePath) == null) {
                promise.complete(null);
            } else {
                promise.completeExceptionally(new Exception("Already exists exception"));
            }
            if (this.inject != null) {
                this.inject.call();
            }
            return promise;
        }

        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
            CompletableFuture<ReadHandle> promise = new CompletableFuture<ReadHandle>();
            promise.completeExceptionally(new UnsupportedOperationException());
            return promise;
        }

        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
            Assert.assertNotNull((Object)offloadDriverMetadata.get("ManagedLedgerName"));
            String storagePath = MockFileSystemLedgerOffloader.getStoragePath(this.storageBasePath, offloadDriverMetadata.get("ManagedLedgerName"));
            String dataFilePath = MockFileSystemLedgerOffloader.getDataFilePath(storagePath, ledgerId, uuid);
            CompletableFuture<Void> promise = new CompletableFuture<Void>();
            if (this.offloads.remove(ledgerId, dataFilePath)) {
                this.deletes.put(ledgerId, dataFilePath);
                promise.complete(null);
            } else {
                promise.completeExceptionally(new Exception("Not found"));
            }
            return promise;
        }

        public OffloadPoliciesImpl getOffloadPolicies() {
            return this.offloadPolicies;
        }

        public void close() {
        }

        static interface InjectAfterOffload {
            public void call();
        }
    }
}

