package org.apache.storm.hdfs.blobstore;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import javax.security.auth.Subject;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.storm.blobstore.AtomicOutputStream;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.generated.AccessControl;
import org.apache.storm.generated.AccessControlType;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.security.auth.NimbusPrincipal;
import org.apache.storm.security.auth.SingleUserPrincipal;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hdfs/blobstore/BlobStoreTest.class */
public class BlobStoreTest {
    URI base;
    File baseFile;
    public static final int READ = 1;
    public static final int WRITE = 2;
    public static final int ADMIN = 4;
    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
    protected static MiniDFSCluster dfscluster = null;
    protected static Configuration hadoopConf = null;
    private static Map conf = new HashMap();

    @Before
    public void init() {
        System.setProperty("test.build.data", "target/test/data");
        initializeConfigs();
        this.baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
        this.base = this.baseFile.toURI();
    }

    @After
    public void cleanup() throws IOException {
        FileUtils.deleteDirectory(this.baseFile);
    }

    @AfterClass
    public static void cleanupAfterClass() throws IOException {
        if (dfscluster != null) {
            dfscluster.shutdown();
        }
    }

    public static void initializeConfigs() {
        conf.put("nimbus.admins", "admin");
        conf.put("nimbus.supervisor.users", "supervisor");
    }

    public static Subject getNimbusSubject() {
        Subject subject = new Subject();
        subject.getPrincipals().add(new NimbusPrincipal());
        return subject;
    }

    public static void assertStoreHasExactly(BlobStore blobStore, Subject subject, String... strArr) throws IOException, KeyNotFoundException, AuthorizationException {
        HashSet hashSet = new HashSet(Arrays.asList(strArr));
        HashSet hashSet2 = new HashSet();
        Iterator listKeys = blobStore.listKeys();
        while (listKeys.hasNext()) {
            hashSet2.add((String) listKeys.next());
        }
        HashSet hashSet3 = new HashSet(hashSet2);
        hashSet3.removeAll(hashSet);
        Assert.assertTrue("Found extra keys in the blob store " + hashSet3, hashSet3.isEmpty());
        HashSet hashSet4 = new HashSet(hashSet);
        hashSet4.removeAll(hashSet2);
        Assert.assertTrue("Found keys missing from the blob store " + hashSet4, hashSet4.isEmpty());
    }

    public static void assertStoreHasExactly(BlobStore blobStore, String... strArr) throws IOException, KeyNotFoundException, AuthorizationException {
        assertStoreHasExactly(blobStore, null, strArr);
    }

    public static int readInt(BlobStore blobStore, Subject subject, String str) throws IOException, KeyNotFoundException, AuthorizationException {
        InputStreamWithMeta blob = blobStore.getBlob(str, subject);
        try {
            int read = blob.read();
            blob.close();
            return read;
        } catch (Throwable th) {
            blob.close();
            throw th;
        }
    }

    public static int readInt(BlobStore blobStore, String str) throws IOException, KeyNotFoundException, AuthorizationException {
        return readInt(blobStore, null, str);
    }

    public static void readAssertEquals(BlobStore blobStore, String str, int i) throws IOException, KeyNotFoundException, AuthorizationException {
        Assert.assertEquals(i, readInt(blobStore, str));
    }

    public void readAssertEqualsWithAuth(BlobStore blobStore, Subject subject, String str, int i) throws IOException, KeyNotFoundException, AuthorizationException {
        Assert.assertEquals(i, readInt(blobStore, subject, str));
    }

    private HdfsBlobStore initHdfs(String str) throws Exception {
        if (hadoopConf == null) {
            hadoopConf = new Configuration();
        }
        try {
            if (dfscluster == null) {
                dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
                dfscluster.waitActive();
            }
        } catch (IOException e) {
            LOG.error("error creating MiniDFSCluster");
        }
        HashMap hashMap = new HashMap();
        hashMap.put("blobstore.dir", str);
        hashMap.put("storm.principal.tolocal", "org.apache.storm.security.auth.DefaultPrincipalToLocal");
        hashMap.put("storm.blobstore.replication.factor", 3);
        HdfsBlobStore hdfsBlobStore = new HdfsBlobStore();
        hdfsBlobStore.prepareInternal(hashMap, (String) null, dfscluster.getConfiguration(0));
        return hdfsBlobStore;
    }

    @Test
    public void testHdfsReplication() throws Exception {
        testReplication("/storm/blobstoreReplication/test", initHdfs("/storm/blobstoreReplication"));
    }

    @Test
    public void testBasicHdfs() throws Exception {
        testBasic(initHdfs("/storm/blobstore1"));
    }

    @Test
    public void testMultipleHdfs() throws Exception {
        testMultiple(initHdfs("/storm/blobstore2"));
    }

    @Test
    public void testHdfsWithAuth() throws Exception {
        testWithAuthentication(initHdfs("/storm/blobstore3"));
    }

    public void testReplication(String str, BlobStore blobStore) throws Exception {
        SettableBlobMeta settableBlobMeta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
        settableBlobMeta.set_replication_factor(4);
        AtomicOutputStream createBlob = blobStore.createBlob("test", settableBlobMeta, (Subject) null);
        createBlob.write(1);
        createBlob.close();
        assertStoreHasExactly(blobStore, "test");
        Assert.assertEquals("Blobstore replication not matching", blobStore.getBlobReplication("test", (Subject) null), 4L);
        blobStore.deleteBlob("test", (Subject) null);
        Subject subject = getSubject("admin");
        SettableBlobMeta settableBlobMeta2 = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
        settableBlobMeta2.set_replication_factor(4);
        AtomicOutputStream createBlob2 = blobStore.createBlob("test", settableBlobMeta2, subject);
        createBlob2.write(1);
        createBlob2.close();
        assertStoreHasExactly(blobStore, "test");
        Assert.assertEquals("Blobstore replication not matching", blobStore.getBlobReplication("test", subject), 4L);
        blobStore.updateBlobReplication("test", 5, subject);
        Assert.assertEquals("Blobstore replication not matching", blobStore.getBlobReplication("test", subject), 5L);
        blobStore.deleteBlob("test", subject);
        Subject subject2 = getSubject("supervisor");
        SettableBlobMeta settableBlobMeta3 = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
        settableBlobMeta3.set_replication_factor(4);
        AtomicOutputStream createBlob3 = blobStore.createBlob("test", settableBlobMeta3, subject2);
        createBlob3.write(1);
        createBlob3.close();
        assertStoreHasExactly(blobStore, "test");
        Assert.assertEquals("Blobstore replication not matching", blobStore.getBlobReplication("test", subject2), 4L);
        blobStore.updateBlobReplication("test", 5, subject2);
        Assert.assertEquals("Blobstore replication not matching", blobStore.getBlobReplication("test", subject2), 5L);
        blobStore.deleteBlob("test", subject2);
        Subject subject3 = getSubject("createSubject");
        AccessControl accessControl = new AccessControl(AccessControlType.USER, 1);
        AccessControl accessControl2 = new AccessControl(AccessControlType.USER, 4);
        accessControl.set_name("writeSubject");
        accessControl2.set_name("adminSubject");
        SettableBlobMeta settableBlobMeta4 = new SettableBlobMeta(Arrays.asList(accessControl, accessControl2));
        settableBlobMeta4.set_replication_factor(4);
        AtomicOutputStream createBlob4 = blobStore.createBlob("test", settableBlobMeta4, subject3);
        createBlob4.write(1);
        createBlob4.close();
        assertStoreHasExactly(blobStore, "test");
        Assert.assertEquals("Blobstore replication not matching", blobStore.getBlobReplication("test", getSubject("writeSubject")), 4L);
        blobStore.updateBlobReplication("test", 5, getSubject("adminSubject"));
        Assert.assertEquals("Blobstore replication not matching", blobStore.getBlobReplication("test", r0), 5L);
        blobStore.deleteBlob("test", getSubject("createSubject"));
    }

    public Subject getSubject(String str) {
        Subject subject = new Subject();
        subject.getPrincipals().add(new SingleUserPrincipal(str));
        return subject;
    }

    public void testWithAuthentication(BlobStore blobStore) throws Exception {
        Subject subject = getSubject("admin");
        assertStoreHasExactly(blobStore, new String[0]);
        AtomicOutputStream createBlob = blobStore.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
        assertStoreHasExactly(blobStore, "test");
        createBlob.write(1);
        createBlob.close();
        blobStore.deleteBlob("test", subject);
        Subject subject2 = getSubject("supervisor");
        assertStoreHasExactly(blobStore, new String[0]);
        AtomicOutputStream createBlob2 = blobStore.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject2);
        assertStoreHasExactly(blobStore, "test");
        createBlob2.write(1);
        createBlob2.close();
        blobStore.deleteBlob("test", subject2);
        Subject nimbusSubject = getNimbusSubject();
        assertStoreHasExactly(blobStore, new String[0]);
        AtomicOutputStream createBlob3 = blobStore.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), nimbusSubject);
        assertStoreHasExactly(blobStore, "test");
        createBlob3.write(1);
        createBlob3.close();
        blobStore.deleteBlob("test", nimbusSubject);
        Subject subject3 = getSubject("test_subject");
        assertStoreHasExactly(blobStore, new String[0]);
        SettableBlobMeta settableBlobMeta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
        AtomicOutputStream createBlob4 = blobStore.createBlob("test", settableBlobMeta, subject3);
        createBlob4.write(1);
        createBlob4.close();
        assertStoreHasExactly(blobStore, "test");
        Assert.assertTrue("ACL does not contain WORLD_EVERYTHING", settableBlobMeta.toString().contains("AccessControl(type:OTHER, access:7)"));
        readAssertEqualsWithAuth(blobStore, subject3, "test", 1);
        LOG.info("Deleting test");
        blobStore.deleteBlob("test", subject3);
        assertStoreHasExactly(blobStore, new String[0]);
        LOG.info("Creating test again");
        SettableBlobMeta settableBlobMeta2 = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
        AtomicOutputStream createBlob5 = blobStore.createBlob("test", settableBlobMeta2, subject3);
        createBlob5.write(2);
        createBlob5.close();
        assertStoreHasExactly(blobStore, "test");
        Assert.assertTrue("ACL does not contain WORLD_EVERYTHING", !settableBlobMeta2.toString().contains("AccessControl(type:OTHER, access:7)"));
        readAssertEqualsWithAuth(blobStore, subject3, "test", 2);
        LOG.info("Updating test");
        AtomicOutputStream updateBlob = blobStore.updateBlob("test", subject3);
        updateBlob.write(3);
        updateBlob.close();
        assertStoreHasExactly(blobStore, "test");
        readAssertEqualsWithAuth(blobStore, subject3, "test", 3);
        LOG.info("Updating test again");
        AtomicOutputStream updateBlob2 = blobStore.updateBlob("test", subject3);
        updateBlob2.write(4);
        updateBlob2.flush();
        LOG.info("SLEEPING");
        Thread.sleep(2L);
        assertStoreHasExactly(blobStore, "test");
        readAssertEqualsWithAuth(blobStore, subject3, "test", 3);
        Subject subject4 = new Subject();
        SettableBlobMeta settableBlobMeta3 = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
        LOG.info("Creating test");
        AtomicOutputStream createBlob6 = blobStore.createBlob("test-empty-subject-WE", settableBlobMeta3, subject4);
        createBlob6.write(2);
        createBlob6.close();
        assertStoreHasExactly(blobStore, "test-empty-subject-WE", "test");
        Assert.assertTrue("ACL does not contain WORLD_EVERYTHING", settableBlobMeta3.toString().contains("AccessControl(type:OTHER, access:7)"));
        readAssertEqualsWithAuth(blobStore, subject4, "test-empty-subject-WE", 2);
        Subject subject5 = new Subject();
        SettableBlobMeta settableBlobMeta4 = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
        LOG.info("Creating other");
        AtomicOutputStream createBlob7 = blobStore.createBlob("test-empty-subject-DEF", settableBlobMeta4, subject5);
        createBlob7.write(2);
        createBlob7.close();
        assertStoreHasExactly(blobStore, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
        Assert.assertTrue("ACL does not contain WORLD_EVERYTHING", settableBlobMeta4.toString().contains("AccessControl(type:OTHER, access:7)"));
        readAssertEqualsWithAuth(blobStore, subject5, "test-empty-subject-DEF", 2);
        if (blobStore instanceof HdfsBlobStore) {
            ((HdfsBlobStore) blobStore).fullCleanup(1L);
        } else {
            Assert.fail("Error the blobstore is of unknowntype");
        }
        try {
            createBlob7.close();
        } catch (IOException e) {
        }
    }

    public void testBasic(BlobStore blobStore) throws Exception {
        assertStoreHasExactly(blobStore, new String[0]);
        LOG.info("Creating test");
        SettableBlobMeta settableBlobMeta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
        AtomicOutputStream createBlob = blobStore.createBlob("test", settableBlobMeta, (Subject) null);
        createBlob.write(1);
        createBlob.close();
        assertStoreHasExactly(blobStore, "test");
        Assert.assertTrue("ACL does not contain WORLD_EVERYTHING", settableBlobMeta.toString().contains("AccessControl(type:OTHER, access:7)"));
        readAssertEquals(blobStore, "test", 1);
        LOG.info("Deleting test");
        blobStore.deleteBlob("test", (Subject) null);
        assertStoreHasExactly(blobStore, new String[0]);
        SettableBlobMeta settableBlobMeta2 = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
        LOG.info("Creating test again");
        AtomicOutputStream createBlob2 = blobStore.createBlob("test", settableBlobMeta2, (Subject) null);
        createBlob2.write(2);
        createBlob2.close();
        assertStoreHasExactly(blobStore, "test");
        readAssertEquals(blobStore, "test", 2);
        LOG.info("Updating test");
        AtomicOutputStream updateBlob = blobStore.updateBlob("test", (Subject) null);
        updateBlob.write(3);
        updateBlob.close();
        assertStoreHasExactly(blobStore, "test");
        readAssertEquals(blobStore, "test", 3);
        LOG.info("Updating test again");
        AtomicOutputStream updateBlob2 = blobStore.updateBlob("test", (Subject) null);
        updateBlob2.write(4);
        updateBlob2.flush();
        LOG.info("SLEEPING");
        Thread.sleep(2L);
        if (blobStore instanceof HdfsBlobStore) {
            ((HdfsBlobStore) blobStore).fullCleanup(1L);
        } else {
            Assert.fail("Error the blobstore is of unknowntype");
        }
        try {
            updateBlob2.close();
        } catch (IOException e) {
        }
    }

    public void testMultiple(BlobStore blobStore) throws Exception {
        assertStoreHasExactly(blobStore, new String[0]);
        LOG.info("Creating test");
        AtomicOutputStream createBlob = blobStore.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), (Subject) null);
        createBlob.write(1);
        createBlob.close();
        assertStoreHasExactly(blobStore, "test");
        readAssertEquals(blobStore, "test", 1);
        LOG.info("Creating other");
        AtomicOutputStream createBlob2 = blobStore.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), (Subject) null);
        createBlob2.write(2);
        createBlob2.close();
        assertStoreHasExactly(blobStore, "test", "other");
        readAssertEquals(blobStore, "test", 1);
        readAssertEquals(blobStore, "other", 2);
        LOG.info("Updating other");
        AtomicOutputStream updateBlob = blobStore.updateBlob("other", (Subject) null);
        updateBlob.write(5);
        updateBlob.close();
        assertStoreHasExactly(blobStore, "test", "other");
        readAssertEquals(blobStore, "test", 1);
        readAssertEquals(blobStore, "other", 5);
        LOG.info("Deleting test");
        blobStore.deleteBlob("test", (Subject) null);
        assertStoreHasExactly(blobStore, "other");
        readAssertEquals(blobStore, "other", 5);
        LOG.info("Creating test again");
        AtomicOutputStream createBlob3 = blobStore.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING), (Subject) null);
        createBlob3.write(2);
        createBlob3.close();
        assertStoreHasExactly(blobStore, "test", "other");
        readAssertEquals(blobStore, "test", 2);
        readAssertEquals(blobStore, "other", 5);
        LOG.info("Updating test");
        AtomicOutputStream updateBlob2 = blobStore.updateBlob("test", (Subject) null);
        updateBlob2.write(3);
        updateBlob2.close();
        assertStoreHasExactly(blobStore, "test", "other");
        readAssertEquals(blobStore, "test", 3);
        readAssertEquals(blobStore, "other", 5);
        LOG.info("Deleting other");
        blobStore.deleteBlob("other", (Subject) null);
        assertStoreHasExactly(blobStore, "test");
        readAssertEquals(blobStore, "test", 3);
        LOG.info("Updating test again");
        AtomicOutputStream updateBlob3 = blobStore.updateBlob("test", (Subject) null);
        updateBlob3.write(4);
        updateBlob3.flush();
        LOG.info("SLEEPING");
        Thread.sleep(2L);
        if (blobStore instanceof HdfsBlobStore) {
            ((HdfsBlobStore) blobStore).fullCleanup(1L);
        } else {
            Assert.fail("Error the blobstore is of unknowntype");
        }
        assertStoreHasExactly(blobStore, "test");
        readAssertEquals(blobStore, "test", 3);
        try {
            updateBlob3.close();
        } catch (IOException e) {
        }
    }
}
