package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobServerCleanupTest.class */
public class BlobServerCleanupTest extends TestLogger {
    private final Random rnd = new Random();

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testTransientBlobNoJobCleanup() throws IOException, InterruptedException, ExecutionException {
        testTransientBlobCleanup(null);
    }

    @Test
    public void testTransientBlobForJobCleanup() throws IOException, InterruptedException, ExecutionException {
        testTransientBlobCleanup(new JobID());
    }

    private void testTransientBlobCleanup(@Nullable JobID jobID) throws IOException, InterruptedException, ExecutionException {
        ArrayList arrayList = new ArrayList(3);
        byte[] bArr = new byte[2000000];
        this.rnd.nextBytes(bArr);
        byte[] copyOfRange = Arrays.copyOfRange(bArr, 10, 54);
        Configuration configuration = new Configuration();
        configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
        configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
        BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore());
        Throwable th = null;
        try {
            try {
                ConcurrentMap blobExpiryTimes = blobServer.getBlobExpiryTimes();
                blobServer.start();
                long currentTimeMillis = System.currentTimeMillis() + 1;
                BlobKey blobKey = (TransientBlobKey) BlobServerPutTest.put((BlobService) blobServer, jobID, bArr, BlobKey.BlobType.TRANSIENT_BLOB);
                Long l = (Long) blobExpiryTimes.get(Tuple2.of(jobID, blobKey));
                Assert.assertThat(l, Matchers.greaterThanOrEqualTo(Long.valueOf(currentTimeMillis)));
                long currentTimeMillis2 = System.currentTimeMillis() + 1;
                BlobKey blobKey2 = (TransientBlobKey) BlobServerPutTest.put((BlobService) blobServer, jobID, copyOfRange, BlobKey.BlobType.TRANSIENT_BLOB);
                Long l2 = (Long) blobExpiryTimes.get(Tuple2.of(jobID, blobKey2));
                Assert.assertThat(l2, Matchers.greaterThanOrEqualTo(Long.valueOf(currentTimeMillis2)));
                JobID jobID2 = jobID == null ? new JobID() : jobID;
                BlobKey put = BlobServerPutTest.put((BlobService) blobServer, jobID2, bArr, BlobKey.BlobType.PERMANENT_BLOB);
                Thread.sleep(1L);
                long currentTimeMillis3 = System.currentTimeMillis() + 1;
                BlobServerPutTest.verifyContents((BlobService) blobServer, jobID, blobKey, bArr);
                Long l3 = (Long) blobExpiryTimes.get(Tuple2.of(jobID, blobKey));
                Assert.assertThat(l3, Matchers.greaterThan(l));
                Assert.assertThat(l3, Matchers.greaterThanOrEqualTo(Long.valueOf(currentTimeMillis3)));
                Assert.assertEquals(l2, blobExpiryTimes.get(Tuple2.of(jobID, blobKey2)));
                Thread.sleep(1L);
                long currentTimeMillis4 = System.currentTimeMillis() + 1;
                BlobServerPutTest.verifyContents((BlobService) blobServer, jobID, blobKey2, copyOfRange);
                Assert.assertEquals(l3, blobExpiryTimes.get(Tuple2.of(jobID, blobKey)));
                Assert.assertThat(blobExpiryTimes.get(Tuple2.of(jobID, blobKey2)), Matchers.greaterThan(l2));
                Assert.assertThat(blobExpiryTimes.get(Tuple2.of(jobID, blobKey2)), Matchers.greaterThanOrEqualTo(Long.valueOf(currentTimeMillis4)));
                long currentTimeMillis5 = System.currentTimeMillis() + (3 * 1);
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
                for (int i = 0; i < 3; i++) {
                    arrayList.add(CompletableFuture.supplyAsync(() -> {
                        while (System.currentTimeMillis() < currentTimeMillis5) {
                            try {
                                BlobServerGetTest.get(blobServer, jobID, blobKey);
                            } catch (IOException e) {
                                throw new CompletionException((Throwable) new FlinkException("Could not retrieve blob.", e));
                            }
                        }
                        return null;
                    }, newFixedThreadPool));
                }
                FutureUtils.combineAll(arrayList).get();
                BlobCachePutTest.verifyDeletedEventually(blobServer, jobID, blobKey, blobKey2);
                BlobServerPutTest.verifyContents((BlobService) blobServer, jobID2, put, bArr);
                if (blobServer != null) {
                    if (0 == 0) {
                        blobServer.close();
                        return;
                    }
                    try {
                        blobServer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (blobServer != null) {
                if (th != null) {
                    try {
                        blobServer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    blobServer.close();
                }
            }
            throw th4;
        }
    }

    public static <T> int checkFilesExist(JobID jobID, Collection<? extends BlobKey> collection, T t, boolean z) throws IOException {
        File storageDir;
        int i = 0;
        for (BlobKey blobKey : collection) {
            if (t instanceof BlobServer) {
                storageDir = ((BlobServer) t).getStorageDir();
            } else if (t instanceof PermanentBlobCache) {
                storageDir = ((PermanentBlobCache) t).getStorageDir();
            } else {
                if (!(t instanceof TransientBlobCache)) {
                    throw new UnsupportedOperationException("unsupported BLOB service class: " + t.getClass().getCanonicalName());
                }
                storageDir = ((TransientBlobCache) t).getStorageDir();
            }
            File file = new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobID, blobKey));
            if (file.exists()) {
                i++;
            } else if (z) {
                throw new IOException("File " + file + " does not exist.");
            }
        }
        return i;
    }

    public static void checkFileCountForJob(int i, JobID jobID, PermanentBlobService permanentBlobService) throws IOException {
        File parentFile = permanentBlobService instanceof BlobServer ? ((BlobServer) permanentBlobService).getStorageLocation(jobID, new PermanentBlobKey()).getParentFile() : ((PermanentBlobCache) permanentBlobService).getStorageLocation(jobID, new PermanentBlobKey()).getParentFile();
        File[] listFiles = parentFile.listFiles();
        if (listFiles != null) {
            Assert.assertEquals("Too many/few files in job dir: " + Arrays.asList(listFiles).toString(), i, listFiles.length);
        } else if (i != 0) {
            throw new IOException("File " + parentFile + " does not exist.");
        }
    }
}
