package co.cask.cdap.data2.transaction.stream.inmemory;

import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.transaction.queue.inmemory.InMemoryQueueService;
import co.cask.cdap.data2.transaction.stream.QueueToStreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumerFactory;
import com.google.inject.Inject;
import java.io.IOException;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/inmemory/InMemoryStreamConsumerFactory.class */
/**
 * {@link StreamConsumerFactory} implementation that builds stream consumers on top of
 * queue consumers obtained from a {@link QueueClientFactory}, and clears data through an
 * {@link InMemoryQueueService}.
 */
public final class InMemoryStreamConsumerFactory implements StreamConsumerFactory {
    private final QueueClientFactory queueClientFactory;
    private final InMemoryQueueService queueService;

    /**
     * Creates the factory from its injected collaborators.
     *
     * @param queueClientFactory factory used by {@link #create} to obtain the underlying queue consumer
     * @param inMemoryQueueService queue service used by {@link #dropAll} to truncate queues
     */
    @Inject
    public InMemoryStreamConsumerFactory(QueueClientFactory queueClientFactory, InMemoryQueueService inMemoryQueueService) {
        this.queueClientFactory = queueClientFactory;
        this.queueService = inMemoryQueueService;
    }

    /**
     * Creates a {@link StreamConsumer} that wraps a queue consumer for the given queue.
     *
     * @param queueName name of the queue to consume from
     * @param str namespace of the consumer; not used by this implementation
     * @param consumerConfig configuration passed to both the queue consumer and the wrapper
     * @return a {@link QueueToStreamConsumer} wrapping the newly created queue consumer
     * @throws IOException if the underlying queue consumer cannot be created
     */
    @Override
    public StreamConsumer create(QueueName queueName, String str, ConsumerConfig consumerConfig) throws IOException {
        // NOTE(review): the trailing -1 is presumably a "number of groups unknown" marker for
        // createConsumer -- confirm against QueueClientFactory.
        return new QueueToStreamConsumer(queueName, consumerConfig, this.queueClientFactory.createConsumer(queueName, consumerConfig, -1));
    }

    /**
     * Truncates every queue whose name starts with the flow prefix derived from the namespace.
     *
     * <p>The namespace is split at its first {@code '.'}: the text before it and the text after it
     * are passed, in that order, to {@link QueueName#prefixForFlow}. The {@code queueName} and
     * {@code iterable} arguments are not used by this implementation.
     *
     * @param queueName not used
     * @param str namespace containing at least one {@code '.'} separator
     * @param iterable not used
     * @throws IllegalArgumentException if {@code str} contains no {@code '.'}
     * @throws IOException declared by the interface
     */
    @Override
    public void dropAll(QueueName queueName, String str, Iterable<Long> iterable) throws IOException {
        int separatorIndex = str.indexOf('.');
        if (separatorIndex < 0) {
            // Without this guard substring(0, -1) would fail with an uninformative
            // StringIndexOutOfBoundsException.
            throw new IllegalArgumentException("Namespace must contain a '.' separator, got: " + str);
        }
        String appPart = str.substring(0, separatorIndex);
        String flowPart = str.substring(separatorIndex + 1);
        this.queueService.truncateAllWithPrefix(QueueName.prefixForFlow(appPart, flowPart));
    }
}
