package org.apache.storm.hdfs.spout;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.util.ArrayList;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.storm.hdfs.common.HdfsUtils;
import org.apache.storm.hdfs.spout.FileLock;
import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/storm/hdfs/spout/TestFileLock.class */
public class TestFileLock {
    private FileSystem fs;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Rule
    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule();
    private HdfsConfiguration conf = new HdfsConfiguration();
    private final Path filesDir = new Path("/tmp/filesdir");
    private final Path locksDir = new Path("/tmp/locksdir");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/storm/hdfs/spout/TestFileLock$FileLockingThread.class */
    public class FileLockingThread extends Thread {
        private int thdNum;
        private final FileSystem fs;
        public boolean cleanExit = false;
        private Path fileToLock;
        private Path locksDir;
        private String spoutId;

        public FileLockingThread(int i, FileSystem fileSystem, Path path, Path path2, String str) throws IOException {
            this.thdNum = i;
            this.fs = fileSystem;
            this.fileToLock = path;
            this.locksDir = path2;
            this.spoutId = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Thread.currentThread().setName("FileLockingThread-" + this.thdNum);
            FileLock fileLock = null;
            do {
                try {
                    try {
                        System.err.println("Trying lock - " + getName());
                        fileLock = FileLock.tryLock(this.fs, this.fileToLock, this.locksDir, this.spoutId);
                        System.err.println("Acquired lock - " + getName());
                        if (fileLock == null) {
                            System.out.println("Retrying lock - " + getName());
                        }
                        if (fileLock != null) {
                            break;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        if (fileLock != null) {
                            try {
                                fileLock.release();
                                System.err.println("Released lock - " + getName());
                            } catch (IOException e2) {
                                e2.printStackTrace(System.err);
                            }
                        }
                    }
                } catch (Throwable th) {
                    if (fileLock != null) {
                        try {
                            fileLock.release();
                            System.err.println("Released lock - " + getName());
                        } catch (IOException e3) {
                            e3.printStackTrace(System.err);
                            throw th;
                        }
                    }
                    throw th;
                }
            } while (!Thread.currentThread().isInterrupted());
            this.cleanExit = true;
            if (fileLock != null) {
                try {
                    fileLock.release();
                    System.err.println("Released lock - " + getName());
                } catch (IOException e4) {
                    e4.printStackTrace(System.err);
                }
            }
            System.err.println("Thread exiting - " + getName());
        }
    }

    @Before
    public void setup() throws IOException {
        this.conf.set("ipc.ping.interval", "5000");
        this.fs = this.dfsClusterRule.getDfscluster().getFileSystem();
        if (!$assertionsDisabled && !this.fs.mkdirs(this.filesDir)) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !this.fs.mkdirs(this.locksDir)) {
            throw new AssertionError();
        }
    }

    @After
    public void teardown() throws IOException {
        this.fs.delete(this.filesDir, true);
        this.fs.delete(this.locksDir, true);
        this.fs.close();
    }

    @Test
    public void testBasicLocking() throws Exception {
        Path path = new Path(this.filesDir + "/file1");
        Path path2 = new Path(this.filesDir + "/file2");
        this.fs.create(path).close();
        this.fs.create(path2).close();
        FileLock tryLock = FileLock.tryLock(this.fs, path, this.locksDir, "spout1");
        Assert.assertNotNull(tryLock);
        Assert.assertTrue(this.fs.exists(tryLock.getLockFile()));
        Assert.assertEquals(tryLock.getLockFile().getParent(), this.locksDir);
        Assert.assertEquals(tryLock.getLockFile().getName(), path.getName());
        Assert.assertNull(FileLock.tryLock(this.fs, path, this.locksDir, "spout1"));
        tryLock.release();
        Assert.assertFalse(this.fs.exists(tryLock.getLockFile()));
        FileLock tryLock2 = FileLock.tryLock(this.fs, path, this.locksDir, "spout1");
        Assert.assertNotNull(tryLock2);
        Assert.assertTrue(this.fs.exists(tryLock2.getLockFile()));
        Assert.assertEquals(tryLock2.getLockFile().getParent(), this.locksDir);
        Assert.assertEquals(tryLock2.getLockFile().getName(), path.getName());
        FileLock tryLock3 = FileLock.tryLock(this.fs, path2, this.locksDir, "spout1");
        Assert.assertNotNull(tryLock3);
        Assert.assertTrue(this.fs.exists(tryLock3.getLockFile()));
        Assert.assertEquals(tryLock3.getLockFile().getParent(), this.locksDir);
        Assert.assertEquals(tryLock3.getLockFile().getName(), path2.getName());
        tryLock3.release();
        Assert.assertFalse(this.fs.exists(tryLock3.getLockFile()));
        tryLock2.release();
        Assert.assertFalse(this.fs.exists(tryLock2.getLockFile()));
    }

    @Test
    public void testHeartbeat() throws Exception {
        Path path = new Path(this.filesDir + "/file1");
        this.fs.create(path).close();
        FileLock tryLock = FileLock.tryLock(this.fs, path, this.locksDir, "spout1");
        Assert.assertNotNull(tryLock);
        Assert.assertTrue(this.fs.exists(tryLock.getLockFile()));
        Assert.assertEquals("heartbeats appear to be missing", 1L, readTextFile(tryLock.getLockFile()).size());
        tryLock.heartbeat("1");
        tryLock.heartbeat("2");
        tryLock.heartbeat("3");
        Assert.assertEquals("heartbeats appear to be missing", 4L, readTextFile(tryLock.getLockFile()).size());
        tryLock.heartbeat("4");
        tryLock.heartbeat("5");
        tryLock.heartbeat("6");
        Assert.assertEquals("heartbeats appear to be missing", 7L, readTextFile(tryLock.getLockFile()).size());
        tryLock.release();
        Assert.assertNull(readTextFile(tryLock.getLockFile()));
        Assert.assertFalse(this.fs.exists(tryLock.getLockFile()));
    }

    @Test
    public void testConcurrentLocking() throws IOException, InterruptedException {
        Path path = new Path(this.filesDir + "/file1");
        this.fs.create(path).close();
        FileLockingThread[] fileLockingThreadArr = null;
        try {
            fileLockingThreadArr = startThreads(100, path, this.locksDir);
            for (FileLockingThread fileLockingThread : fileLockingThreadArr) {
                fileLockingThread.join(30000L);
                Assert.assertTrue(fileLockingThread.getName() + " did not exit cleanly", fileLockingThread.cleanExit);
            }
            Assert.assertFalse(this.fs.exists(new Path(this.locksDir + "/" + path.getName())));
            if (fileLockingThreadArr != null) {
                for (FileLockingThread fileLockingThread2 : fileLockingThreadArr) {
                    fileLockingThread2.interrupt();
                    fileLockingThread2.join(30000L);
                    if (fileLockingThread2.isAlive()) {
                        throw new RuntimeException("Failed to stop threads within 30 seconds, threads may leak into other tests");
                    }
                }
            }
        } catch (Throwable th) {
            if (fileLockingThreadArr != null) {
                for (FileLockingThread fileLockingThread3 : fileLockingThreadArr) {
                    fileLockingThread3.interrupt();
                    fileLockingThread3.join(30000L);
                    if (fileLockingThread3.isAlive()) {
                        throw new RuntimeException("Failed to stop threads within 30 seconds, threads may leak into other tests");
                    }
                }
            }
            throw th;
        }
    }

    private FileLockingThread[] startThreads(int i, Path path, Path path2) throws IOException {
        FileLockingThread[] fileLockingThreadArr = new FileLockingThread[i];
        for (int i2 = 0; i2 < i; i2++) {
            fileLockingThreadArr[i2] = new FileLockingThread(i2, this.fs, path, path2, "spout" + Integer.toString(i2));
        }
        for (FileLockingThread fileLockingThread : fileLockingThreadArr) {
            fileLockingThread.start();
        }
        return fileLockingThreadArr;
    }

    @Test
    public void testStaleLockDetection_SingleLock() throws Exception {
        Path path = new Path(this.filesDir + "/file1");
        this.fs.create(path).close();
        FileLock tryLock = FileLock.tryLock(this.fs, path, this.locksDir, "spout1");
        try {
            Assert.assertNotNull(tryLock);
            Assert.assertTrue(this.fs.exists(tryLock.getLockFile()));
            Thread.sleep(1500L);
            Assert.assertNotNull(FileLock.locateOldestExpiredLock(this.fs, this.locksDir, 1));
            tryLock.heartbeat("1");
            Assert.assertNull(FileLock.locateOldestExpiredLock(this.fs, this.locksDir, 1));
            FileLock.LogEntry lastLogEntry = tryLock.getLastLogEntry();
            Assert.assertNotNull(lastLogEntry);
            Assert.assertEquals("1", lastLogEntry.fileOffset);
            Thread.sleep(1500L);
            Assert.assertNotNull(FileLock.locateOldestExpiredLock(this.fs, this.locksDir, 1));
            tryLock.release();
            this.fs.delete(path, false);
        } catch (Throwable th) {
            tryLock.release();
            this.fs.delete(path, false);
            throw th;
        }
    }

    @Test
    public void testStaleLockDetection_MultipleLocks() throws Exception {
        Path path = new Path(this.filesDir + "/file1");
        Path path2 = new Path(this.filesDir + "/file2");
        Path path3 = new Path(this.filesDir + "/file3");
        this.fs.create(path).close();
        this.fs.create(path2).close();
        this.fs.create(path3).close();
        FileLock tryLock = FileLock.tryLock(this.fs, path, this.locksDir, "spout1");
        FileLock tryLock2 = FileLock.tryLock(this.fs, path2, this.locksDir, "spout2");
        FileLock tryLock3 = FileLock.tryLock(this.fs, path3, this.locksDir, "spout3");
        Assert.assertNotNull(tryLock);
        Assert.assertNotNull(tryLock2);
        Assert.assertNotNull(tryLock3);
        try {
            Assert.assertNull(FileLock.locateOldestExpiredLock(this.fs, this.locksDir, 1));
            Thread.sleep(1500L);
            tryLock.heartbeat("1");
            tryLock2.heartbeat("1");
            HdfsUtils.Pair locateOldestExpiredLock = FileLock.locateOldestExpiredLock(this.fs, this.locksDir, 1);
            Assert.assertNotNull(locateOldestExpiredLock);
            Assert.assertEquals("spout3", ((FileLock.LogEntry) locateOldestExpiredLock.getValue()).componentID);
            tryLock.release();
            tryLock2.release();
            tryLock3.release();
            this.fs.delete(path, false);
            this.fs.delete(path2, false);
            this.fs.delete(path3, false);
        } catch (Throwable th) {
            tryLock.release();
            tryLock2.release();
            tryLock3.release();
            this.fs.delete(path, false);
            this.fs.delete(path2, false);
            this.fs.delete(path3, false);
            throw th;
        }
    }

    @Test
    public void testLockRecovery() throws Exception {
        Path path = new Path(this.filesDir + "/file1");
        Path path2 = new Path(this.filesDir + "/file2");
        Path path3 = new Path(this.filesDir + "/file3");
        this.fs.create(path).close();
        this.fs.create(path2).close();
        this.fs.create(path3).close();
        FileLock tryLock = FileLock.tryLock(this.fs, path, this.locksDir, "spout1");
        FileLock tryLock2 = FileLock.tryLock(this.fs, path2, this.locksDir, "spout2");
        FileLock tryLock3 = FileLock.tryLock(this.fs, path3, this.locksDir, "spout3");
        Assert.assertNotNull(tryLock);
        Assert.assertNotNull(tryLock2);
        Assert.assertNotNull(tryLock3);
        try {
            Assert.assertNull(FileLock.locateOldestExpiredLock(this.fs, this.locksDir, 1));
            closeUnderlyingLockFile(tryLock3);
            Thread.sleep(3000L);
            tryLock.heartbeat("1");
            tryLock2.heartbeat("1");
            FileLock acquireOldestExpiredLock = FileLock.acquireOldestExpiredLock(this.fs, this.locksDir, 1, "spout1");
            Assert.assertNotNull(acquireOldestExpiredLock);
            Assert.assertEquals("Expected lock3 file", Path.getPathWithoutSchemeAndAuthority(acquireOldestExpiredLock.getLockFile()), tryLock3.getLockFile());
            tryLock.release();
            tryLock2.release();
            tryLock3.release();
            this.fs.delete(path, false);
            this.fs.delete(path2, false);
            try {
                this.fs.delete(path3, false);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Throwable th) {
            tryLock.release();
            tryLock2.release();
            tryLock3.release();
            this.fs.delete(path, false);
            this.fs.delete(path2, false);
            try {
                this.fs.delete(path3, false);
            } catch (Exception e2) {
                e2.printStackTrace();
            }
            throw th;
        }
    }

    public static void closeUnderlyingLockFile(FileLock fileLock) throws ReflectiveOperationException {
        Method declaredMethod = FileLock.class.getDeclaredMethod("forceCloseLockFile", new Class[0]);
        declaredMethod.setAccessible(true);
        declaredMethod.invoke(fileLock, new Object[0]);
    }

    private ArrayList<String> readTextFile(Path path) throws IOException {
        FSDataInputStream fSDataInputStream = null;
        try {
            fSDataInputStream = this.fs.open(path);
            if (fSDataInputStream == null) {
                if (fSDataInputStream != null) {
                    fSDataInputStream.close();
                }
                return null;
            }
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fSDataInputStream));
            ArrayList<String> arrayList = new ArrayList<>();
            for (String readLine = bufferedReader.readLine(); readLine != null; readLine = bufferedReader.readLine()) {
                arrayList.add(readLine);
            }
            if (fSDataInputStream != null) {
                fSDataInputStream.close();
            }
            return arrayList;
        } catch (FileNotFoundException e) {
            if (fSDataInputStream != null) {
                fSDataInputStream.close();
            }
            return null;
        } catch (Throwable th) {
            if (fSDataInputStream != null) {
                fSDataInputStream.close();
            }
            throw th;
        }
    }

    static {
        $assertionsDisabled = !TestFileLock.class.desiredAssertionStatus();
    }
}
