package co.cask.cdap.data2.transaction.queue.leveldb;

import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.LevelDBOrderedTableCore;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.LevelDBOrderedTableService;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.queue.QueueConsumer;
import co.cask.cdap.data2.queue.QueueProducer;
import co.cask.cdap.data2.transaction.queue.QueueEvictor;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Threads;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/leveldb/LevelDBQueueClientFactory.class */
/**
 * {@link QueueClientFactory} that creates LevelDB-backed queue producers and consumers.
 *
 * <p>Each client gets its own {@link LevelDBOrderedTableCore} over the table named by the
 * matching admin (stream admin for stream names, queue admin otherwise). Consumers of the same
 * queue name share one lock object, and eviction work is handed to a single executor owned by
 * this factory.
 */
public final class LevelDBQueueClientFactory implements QueueClientFactory {
    /** Upper bound on the number of threads in the eviction executor. */
    private static final int MAX_EVICTION_THREAD_POOL_SIZE = 10;
    /** How long, in seconds, an idle eviction thread is kept before it is terminated. */
    private static final int EVICTION_THREAD_POOL_KEEP_ALIVE_SECONDS = 60;

    private final LevelDBOrderedTableService service;
    private final LevelDBQueueAdmin queueAdmin;
    private final LevelDBStreamAdmin streamAdmin;
    /** One lock object per queue name (keyed by {@code QueueName.toString()}). */
    private final ConcurrentMap<String, Object> queueLocks = Maps.newConcurrentMap();
    /** Executor passed to every {@link LevelDBQueueEvictor} created by this factory. */
    private final ExecutorService evictionExecutor = createEvictionExecutor();

    /**
     * Creates a factory over the given table service and admins.
     *
     * @param levelDBOrderedTableService service handed to every table core created here
     * @param levelDBQueueAdmin admin used for names where {@code QueueName.isStream()} is false
     * @param levelDBStreamAdmin admin used for names where {@code QueueName.isStream()} is true
     * @throws Exception declared for compatibility with existing callers; nothing in this
     *     constructor body throws a checked exception
     */
    @Inject
    public LevelDBQueueClientFactory(LevelDBOrderedTableService levelDBOrderedTableService, LevelDBQueueAdmin levelDBQueueAdmin, LevelDBStreamAdmin levelDBStreamAdmin) throws Exception {
        this.service = levelDBOrderedTableService;
        this.queueAdmin = levelDBQueueAdmin;
        this.streamAdmin = levelDBStreamAdmin;
    }

    /**
     * Creates a producer for the queue that reports to {@link QueueMetrics#NOOP_QUEUE_METRICS}.
     *
     * @param queueName the queue (or stream) to produce to
     * @return a new producer
     * @throws IOException if the backing table cannot be created
     */
    @Override
    public QueueProducer createProducer(QueueName queueName) throws IOException {
        return createProducer(queueName, QueueMetrics.NOOP_QUEUE_METRICS);
    }

    /**
     * Creates a consumer for the queue.
     *
     * <p>A real {@link LevelDBQueueEvictor} is attached only when {@code i} is positive and the
     * consumer's instance id is 0; every other consumer gets {@link QueueEvictor#NOOP}.
     *
     * @param queueName the queue (or stream) to consume from
     * @param consumerConfig configuration of the consumer being created
     * @param i value forwarded to the evictor; presumably the number of consumer groups
     *     (NOTE(review): confirm against {@code QueueClientFactory})
     * @return a new consumer
     * @throws IOException if the backing table cannot be created
     */
    @Override
    public QueueConsumer createConsumer(QueueName queueName, ConsumerConfig consumerConfig, int i) throws IOException {
        String tableName = ensureTableExists(queueName).getActualTableName(queueName);
        LevelDBOrderedTableCore tableCore = new LevelDBOrderedTableCore(tableName, this.service);
        Object queueLock = getQueueLock(queueName.toString());

        QueueEvictor evictor;
        if (i > 0 && consumerConfig.getInstanceId() == 0) {
            evictor = new LevelDBQueueEvictor(tableCore, queueName, i, this.evictionExecutor);
        } else {
            evictor = QueueEvictor.NOOP;
        }
        return new LevelDBQueueConsumer(tableCore, queueLock, consumerConfig, queueName, evictor);
    }

    /**
     * Creates a producer for the queue that reports to the given metrics.
     *
     * @param queueName the queue (or stream) to produce to
     * @param queueMetrics metrics sink handed to the producer
     * @return a new producer
     * @throws IOException if the backing table cannot be created
     */
    @Override
    public QueueProducer createProducer(QueueName queueName, QueueMetrics queueMetrics) throws IOException {
        String tableName = ensureTableExists(queueName).getActualTableName(queueName);
        LevelDBOrderedTableCore tableCore = new LevelDBOrderedTableCore(tableName, this.service);
        return new LevelDBQueueProducer(tableCore, queueName, queueMetrics);
    }

    /**
     * Picks the admin matching the queue name and asks it to create the queue.
     *
     * @param queueName the queue (or stream) whose table is needed
     * @return the admin that was used, so the caller can resolve the actual table name
     * @throws IOException wrapping any exception thrown by the admin's {@code create} call
     */
    private LevelDBQueueAdmin ensureTableExists(QueueName queueName) throws IOException {
        LevelDBQueueAdmin admin = queueName.isStream() ? this.streamAdmin : this.queueAdmin;
        try {
            admin.create(queueName);
        } catch (Exception e) {
            throw new IOException("Failed to open table " + admin.getActualTableName(queueName), e);
        }
        return admin;
    }

    /**
     * Returns the lock object registered for the given key, registering a new one if none exists.
     *
     * @param key the queue name in string form
     * @return the lock object stored in {@code queueLocks} for {@code key}
     */
    private Object getQueueLock(String key) {
        Object existing = this.queueLocks.get(key);
        if (existing != null) {
            return existing;
        }
        Object candidate = new Object();
        // putIfAbsent returns the value another thread registered first, if any; use that one
        // so all callers for the same key end up with the same lock object.
        Object raced = this.queueLocks.putIfAbsent(key, candidate);
        return raced != null ? raced : candidate;
    }

    /**
     * Builds the executor used for eviction.
     *
     * <p>The pool has no core threads and at most {@link #MAX_EVICTION_THREAD_POOL_SIZE} threads,
     * with idle threads kept for {@link #EVICTION_THREAD_POOL_KEEP_ALIVE_SECONDS} seconds. Tasks
     * are handed off through a {@link SynchronousQueue}, and a task rejected by the pool is run
     * on the submitting thread ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     *
     * @return a new executor
     */
    private ExecutorService createEvictionExecutor() {
        return new ThreadPoolExecutor(
            0,
            MAX_EVICTION_THREAD_POOL_SIZE,
            EVICTION_THREAD_POOL_KEEP_ALIVE_SECONDS,
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            Threads.createDaemonThreadFactory("queue-eviction-%d"),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
