package co.cask.cdap.messaging.store.leveldb;

import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.DirUtils;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.store.MessageTable;
import co.cask.cdap.messaging.store.MetadataTable;
import co.cask.cdap.messaging.store.PayloadTable;
import co.cask.cdap.messaging.store.TableFactory;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.impl.Iq80DBFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/messaging/store/leveldb/LevelDBTableFactory.class */
public final class LevelDBTableFactory implements TableFactory {
    private static final Logger LOG = LoggerFactory.getLogger(LevelDBTableFactory.class);
    private static final Iq80DBFactory LEVEL_DB_FACTORY = Iq80DBFactory.factory;
    private final File baseDir;
    private final Options dbOptions;
    private LevelDBMetadataTable metadataTable;
    private LevelDBMessageTable messageTable;
    private LevelDBPayloadTable payloadTable;

    /* loaded from: input_file:co/cask/cdap/messaging/store/leveldb/LevelDBTableFactory$DataCleanup.class */
    private class DataCleanup implements Runnable {
        private DataCleanup() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (LevelDBTableFactory.this.metadataTable == null || LevelDBTableFactory.this.payloadTable == null || LevelDBTableFactory.this.messageTable == null) {
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            try {
                CloseableIterator<TopicMetadata> scanTopics = LevelDBTableFactory.this.metadataTable.scanTopics();
                Throwable th = null;
                while (scanTopics.hasNext()) {
                    try {
                        try {
                            TopicMetadata topicMetadata = (TopicMetadata) scanTopics.next();
                            LevelDBTableFactory.this.messageTable.pruneMessages(topicMetadata, currentTimeMillis);
                            LevelDBTableFactory.this.payloadTable.pruneMessages(topicMetadata, currentTimeMillis);
                        } finally {
                        }
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                }
                if (scanTopics != null) {
                    if (0 != 0) {
                        try {
                            scanTopics.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        scanTopics.close();
                    }
                }
            } catch (IOException e) {
                LevelDBTableFactory.LOG.debug("Unable to perform data cleanup in TMS LevelDB tables", e);
            }
        }
    }

    @VisibleForTesting
    @Inject
    public LevelDBTableFactory(CConfiguration cConfiguration) {
        this.baseDir = new File(cConfiguration.get("messaging.local.data.dir"));
        this.dbOptions = new Options().blockSize(cConfiguration.getInt("data.local.storage.blocksize", 1024)).cacheSize(cConfiguration.getLong("data.local.storage.cachesize", 104857600L)).errorIfExists(false).createIfMissing(true);
        Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("leveldb-tms-data-cleanup")).scheduleAtFixedRate(new DataCleanup(), 0L, Long.parseLong(cConfiguration.get("messaging.local.data.cleanup.frequency.secs")), TimeUnit.SECONDS);
    }

    @Override // co.cask.cdap.messaging.store.TableFactory
    public synchronized MetadataTable createMetadataTable(String str) throws IOException {
        if (this.metadataTable != null) {
            return this.metadataTable;
        }
        File ensureDirExists = ensureDirExists(new File(this.baseDir, NamespaceId.SYSTEM.getNamespace() + "." + str));
        this.metadataTable = new LevelDBMetadataTable(LEVEL_DB_FACTORY.open(ensureDirExists, this.dbOptions));
        LOG.info("Messaging metadata table created at {}", ensureDirExists);
        return this.metadataTable;
    }

    @Override // co.cask.cdap.messaging.store.TableFactory
    public synchronized MessageTable createMessageTable(String str) throws IOException {
        if (this.messageTable != null) {
            return this.messageTable;
        }
        File ensureDirExists = ensureDirExists(new File(this.baseDir, NamespaceId.SYSTEM.getNamespace() + "." + str));
        this.messageTable = new LevelDBMessageTable(LEVEL_DB_FACTORY.open(ensureDirExists, this.dbOptions));
        LOG.info("Messaging message table created at {}", ensureDirExists);
        return this.messageTable;
    }

    @Override // co.cask.cdap.messaging.store.TableFactory
    public synchronized PayloadTable createPayloadTable(String str) throws IOException {
        if (this.payloadTable != null) {
            return this.payloadTable;
        }
        File ensureDirExists = ensureDirExists(new File(this.baseDir, NamespaceId.SYSTEM.getNamespace() + "." + str));
        this.payloadTable = new LevelDBPayloadTable(LEVEL_DB_FACTORY.open(ensureDirExists, this.dbOptions));
        LOG.info("Messaging payload table created at {}", ensureDirExists);
        return this.payloadTable;
    }

    private File ensureDirExists(File file) throws IOException {
        if (DirUtils.mkdirs(file)) {
            return file;
        }
        throw new IOException("Failed to create local directory " + file + " for the messaging system.");
    }
}
