/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.blob.BlobCachePutTest;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerCleanupTest;
import org.apache.flink.runtime.blob.BlobServerGetTest;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BlobCacheCleanupTest
extends TestLogger {
    private final Random rnd = new Random();
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPermanentBlobCleanup() throws IOException, InterruptedException {
        BlobServer server;
        JobID jobId;
        block7: {
            jobId = new JobID();
            ArrayList<PermanentBlobKey> keys = new ArrayList<PermanentBlobKey>();
            server = null;
            PermanentBlobCache cache = null;
            byte[] buf = new byte[128];
            try {
                Configuration config = new Configuration();
                config.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
                config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
                server = new BlobServer(config, (BlobStore)new VoidBlobStore());
                server.start();
                InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                cache = new PermanentBlobCache(config, (BlobView)new VoidBlobStore(), serverAddress);
                keys.add(server.putPermanent(jobId, buf));
                buf[0] = (byte)(buf[0] + 1);
                keys.add(server.putPermanent(jobId, buf));
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                BlobServerCleanupTest.checkFileCountForJob(0, jobId, (PermanentBlobService)cache);
                cache.registerJob(jobId);
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                BlobServerCleanupTest.checkFileCountForJob(0, jobId, (PermanentBlobService)cache);
                for (PermanentBlobKey key : keys) {
                    cache.getFile(jobId, key);
                }
                cache.registerJob(jobId);
                for (PermanentBlobKey key : keys) {
                    cache.getFile(jobId, key);
                }
                Assert.assertEquals((long)2L, (long)BlobServerCleanupTest.checkFilesExist(jobId, keys, cache, true));
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)cache);
                cache.releaseJob(jobId);
                Assert.assertEquals((long)2L, (long)BlobServerCleanupTest.checkFilesExist(jobId, keys, cache, true));
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)cache);
                cache.releaseJob(jobId);
                BlobCacheCleanupTest.verifyJobCleanup(cache, jobId, keys);
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                if (cache == null) break block7;
            }
            catch (Throwable throwable) {
                if (cache != null) {
                    cache.close();
                }
                if (server != null) {
                    server.close();
                }
                BlobServerCleanupTest.checkFileCountForJob(0, jobId, server);
                throw throwable;
            }
            cache.close();
        }
        if (server != null) {
            server.close();
        }
        BlobServerCleanupTest.checkFileCountForJob(0, jobId, (PermanentBlobService)server);
    }

    @Test
    public void testPermanentJobReferences() throws IOException, InterruptedException {
        JobID jobId = new JobID();
        Configuration config = new Configuration();
        config.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
        config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3600000L);
        InetSocketAddress serverAddress = new InetSocketAddress("localhost", 12345);
        try (PermanentBlobCache cache = new PermanentBlobCache(config, (BlobView)new VoidBlobStore(), serverAddress);){
            cache.registerJob(jobId);
            Assert.assertEquals((long)1L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).references);
            Assert.assertEquals((long)-1L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).keepUntil);
            cache.registerJob(jobId);
            Assert.assertEquals((long)2L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).references);
            Assert.assertEquals((long)-1L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).keepUntil);
            cache.releaseJob(jobId);
            Assert.assertEquals((long)1L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).references);
            Assert.assertEquals((long)-1L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).keepUntil);
            long cleanupLowerBound = System.currentTimeMillis() + config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
            cache.releaseJob(jobId);
            Assert.assertEquals((long)0L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).references);
            Assert.assertThat((Object)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).keepUntil, (Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(cleanupLowerBound)));
            cache.registerJob(jobId);
            Assert.assertEquals((long)1L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).references);
            Assert.assertEquals((long)-1L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).keepUntil);
            cleanupLowerBound = System.currentTimeMillis() + config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
            cache.releaseJob(jobId);
            Assert.assertEquals((long)0L, (long)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).references);
            Assert.assertThat((Object)((PermanentBlobCache.RefCount)cache.getJobRefCounters().get((Object)jobId)).keepUntil, (Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(cleanupLowerBound)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore(value="manual test due to stalling: ensures a BLOB is retained first and only deleted after the (long) timeout ")
    public void testPermanentBlobDeferredCleanup() throws IOException, InterruptedException {
        BlobServer server;
        JobID jobId;
        block7: {
            long cleanupInterval = 5L;
            jobId = new JobID();
            ArrayList<PermanentBlobKey> keys = new ArrayList<PermanentBlobKey>();
            server = null;
            PermanentBlobCache cache = null;
            byte[] buf = new byte[128];
            try {
                Configuration config = new Configuration();
                config.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
                config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
                server = new BlobServer(config, (BlobStore)new VoidBlobStore());
                server.start();
                InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(MemorySize.ofMebiBytes((long)100L).getBytes());
                cache = new PermanentBlobCache(config, (BlobView)new VoidBlobStore(), serverAddress, tracker);
                keys.add(server.putPermanent(jobId, buf));
                buf[0] = (byte)(buf[0] + 1);
                keys.add(server.putPermanent(jobId, buf));
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                BlobServerCleanupTest.checkFileCountForJob(0, jobId, (PermanentBlobService)cache);
                BlobCacheCleanupTest.checkBlobCacheSizeTracker(tracker, jobId, 0);
                cache.registerJob(jobId);
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                BlobServerCleanupTest.checkFileCountForJob(0, jobId, (PermanentBlobService)cache);
                BlobCacheCleanupTest.checkBlobCacheSizeTracker(tracker, jobId, 0);
                for (PermanentBlobKey key : keys) {
                    cache.readFile(jobId, key);
                }
                cache.registerJob(jobId);
                for (PermanentBlobKey key : keys) {
                    cache.readFile(jobId, key);
                }
                Assert.assertEquals((long)2L, (long)BlobServerCleanupTest.checkFilesExist(jobId, keys, cache, true));
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)cache);
                BlobCacheCleanupTest.checkBlobCacheSizeTracker(tracker, jobId, 2);
                cache.releaseJob(jobId);
                Assert.assertEquals((long)2L, (long)BlobServerCleanupTest.checkFilesExist(jobId, keys, cache, true));
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)cache);
                BlobCacheCleanupTest.checkBlobCacheSizeTracker(tracker, jobId, 2);
                cache.releaseJob(jobId);
                Assert.assertEquals((long)2L, (long)BlobServerCleanupTest.checkFilesExist(jobId, keys, cache, true));
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)cache);
                Thread.sleep(cleanupInterval / 5L);
                Assert.assertEquals((long)2L, (long)BlobServerCleanupTest.checkFilesExist(jobId, keys, cache, true));
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)cache);
                Thread.sleep(cleanupInterval * 4L / 5L);
                BlobCacheCleanupTest.verifyJobCleanup(cache, jobId, keys);
                BlobCacheCleanupTest.checkBlobCacheSizeTracker(tracker, jobId, 0);
                BlobServerCleanupTest.checkFileCountForJob(2, jobId, (PermanentBlobService)server);
                if (cache == null) break block7;
            }
            catch (Throwable throwable) {
                if (cache != null) {
                    cache.close();
                }
                if (server != null) {
                    server.close();
                }
                BlobServerCleanupTest.checkFileCountForJob(0, jobId, server);
                throw throwable;
            }
            cache.close();
        }
        if (server != null) {
            server.close();
        }
        BlobServerCleanupTest.checkFileCountForJob(0, jobId, (PermanentBlobService)server);
    }

    @Test
    public void testTransientBlobNoJobCleanup() throws IOException, InterruptedException, ExecutionException {
        this.testTransientBlobCleanup(null);
    }

    @Test
    public void testTransientBlobForJobCleanup() throws IOException, InterruptedException, ExecutionException {
        this.testTransientBlobCleanup(new JobID());
    }

    private void testTransientBlobCleanup(@Nullable JobID jobId) throws IOException, InterruptedException, ExecutionException {
        long cleanupInterval = 1L;
        int numberConcurrentGetOperations = 3;
        ArrayList<CompletableFuture<Void>> getOperations = new ArrayList<CompletableFuture<Void>>(3);
        byte[] data = new byte[2000000];
        this.rnd.nextBytes(data);
        byte[] data2 = Arrays.copyOfRange(data, 10, 54);
        Configuration config = new Configuration();
        config.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
        config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
        try (BlobServer server = new BlobServer(config, (BlobStore)new VoidBlobStore());
             BlobCacheService cache = new BlobCacheService(config, (BlobView)new VoidBlobStore(), new InetSocketAddress("localhost", server.getPort()));){
            ConcurrentMap transientBlobExpiryTimes = cache.getTransientBlobService().getBlobExpiryTimes();
            server.start();
            TransientBlobKey key1 = (TransientBlobKey)BlobServerPutTest.put((BlobService)server, jobId, data, BlobKey.BlobType.TRANSIENT_BLOB);
            TransientBlobKey key2 = (TransientBlobKey)BlobServerPutTest.put((BlobService)server, jobId, data2, BlobKey.BlobType.TRANSIENT_BLOB);
            long cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            BlobServerPutTest.verifyContents((BlobService)cache, jobId, (BlobKey)key1, data);
            Long key1ExpiryFirstAccess = (Long)transientBlobExpiryTimes.get(Tuple2.of((Object)jobId, (Object)key1));
            Assert.assertThat((Object)key1ExpiryFirstAccess, (Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(cleanupLowerBound)));
            Assert.assertNull(transientBlobExpiryTimes.get(Tuple2.of((Object)jobId, (Object)key2)));
            Thread.sleep(1L);
            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            BlobServerPutTest.verifyContents((BlobService)cache, jobId, (BlobKey)key2, data2);
            Assert.assertEquals((Object)key1ExpiryFirstAccess, transientBlobExpiryTimes.get(Tuple2.of((Object)jobId, (Object)key1)));
            Assert.assertThat(transientBlobExpiryTimes.get(Tuple2.of((Object)jobId, (Object)key2)), (Matcher)Matchers.greaterThanOrEqualTo((Comparable)Long.valueOf(cleanupLowerBound)));
            if (jobId != null) {
                server.cleanupJob(jobId, true);
            } else {
                server.deleteFromCache(key1);
                server.deleteFromCache(key2);
            }
            BlobServerCleanupTest.checkFileCountForJob(0, jobId, (PermanentBlobService)server);
            long finishTime = System.currentTimeMillis() + 3L * cleanupInterval;
            ExecutorService executor = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 3; ++i) {
                CompletableFuture<Void> getOperation = CompletableFuture.supplyAsync(() -> {
                    try {
                        while (System.currentTimeMillis() < finishTime) {
                            BlobServerGetTest.get((BlobService)cache, jobId, (BlobKey)key1);
                        }
                        return null;
                    }
                    catch (IOException e) {
                        throw new CompletionException((Throwable)new FlinkException("Could not retrieve blob.", (Throwable)e));
                    }
                }, executor);
                getOperations.add(getOperation);
            }
            FutureUtils.ConjunctFuture filesFuture = FutureUtils.combineAll(getOperations);
            filesFuture.get();
            BlobCachePutTest.verifyDeletedEventually(server, jobId, new BlobKey[]{key1, key2});
        }
    }

    static void verifyJobCleanup(PermanentBlobCache cache, JobID jobId, List<? extends BlobKey> keys) throws InterruptedException, IOException {
        long deadline = System.currentTimeMillis() + 30000L;
        do {
            Thread.sleep(100L);
        } while (BlobServerCleanupTest.checkFilesExist(jobId, keys, cache, false) != 0 && System.currentTimeMillis() < deadline);
        BlobServerCleanupTest.checkFileCountForJob(0, jobId, (PermanentBlobService)cache);
    }

    private static void checkBlobCacheSizeTracker(BlobCacheSizeTracker tracker, JobID jobId, int expected) {
        Assert.assertEquals((long)tracker.getBlobKeysByJobId(jobId).size(), (long)expected);
    }
}

