package co.cask.cdap.internal.app.runtime.flow;

import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.QueueConsumer;
import co.cask.cdap.data2.registry.RuntimeUsageRegistry;
import co.cask.cdap.data2.transaction.stream.StreamConsumer;
import co.cask.cdap.internal.app.runtime.DataFabricFacade;
import co.cask.cdap.proto.id.EntityId;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import java.io.Closeable;
import java.io.IOException;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/ConsumerSupplier.class */
public final class ConsumerSupplier<T> implements Supplier<T>, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSupplier.class);
    private final DataFabricFacade dataFabricFacade;
    private final QueueName queueName;
    private final int numGroups;
    private final RuntimeUsageRegistry runtimeUsageRegistry;
    private final Iterable<? extends EntityId> owners;
    private ConsumerConfig consumerConfig;
    private Closeable consumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> ConsumerSupplier<T> create(Iterable<? extends EntityId> iterable, RuntimeUsageRegistry runtimeUsageRegistry, DataFabricFacade dataFabricFacade, QueueName queueName, ConsumerConfig consumerConfig) {
        return create(iterable, runtimeUsageRegistry, dataFabricFacade, queueName, consumerConfig, -1);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> ConsumerSupplier<T> create(Iterable<? extends EntityId> iterable, RuntimeUsageRegistry runtimeUsageRegistry, DataFabricFacade dataFabricFacade, QueueName queueName, ConsumerConfig consumerConfig, int i) {
        return new ConsumerSupplier<>(iterable, runtimeUsageRegistry, dataFabricFacade, queueName, consumerConfig, i);
    }

    private ConsumerSupplier(Iterable<? extends EntityId> iterable, RuntimeUsageRegistry runtimeUsageRegistry, DataFabricFacade dataFabricFacade, QueueName queueName, ConsumerConfig consumerConfig, int i) {
        this.owners = iterable;
        this.runtimeUsageRegistry = runtimeUsageRegistry;
        this.dataFabricFacade = dataFabricFacade;
        this.queueName = queueName;
        this.numGroups = i;
        this.consumerConfig = consumerConfig;
        open(consumerConfig.getGroupSize());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void open(int i) {
        try {
            close();
            ConsumerConfig consumerConfig = this.consumerConfig;
            if (i != consumerConfig.getGroupSize()) {
                consumerConfig = new ConsumerConfig(this.consumerConfig.getGroupId(), this.consumerConfig.getInstanceId(), i, this.consumerConfig.getDequeueStrategy(), this.consumerConfig.getHashKey());
            }
            if (this.queueName.isQueue()) {
                QueueConsumer createConsumer = this.dataFabricFacade.createConsumer(this.queueName, consumerConfig, this.numGroups);
                this.consumerConfig = createConsumer.getConfig();
                this.consumer = createConsumer;
            } else {
                StreamId streamId = this.queueName.toStreamId();
                for (EntityId entityId : this.owners) {
                    try {
                        this.runtimeUsageRegistry.register(entityId, streamId);
                    } catch (Exception e) {
                        LOG.warn("Failed to register usage of {} -> {}", new Object[]{entityId, streamId, e});
                    }
                }
                StreamConsumer createStreamConsumer = this.dataFabricFacade.createStreamConsumer(this.queueName.toStreamId(), consumerConfig);
                this.consumerConfig = createStreamConsumer.getConsumerConfig();
                this.consumer = createStreamConsumer;
            }
        } catch (Exception e2) {
            throw Throwables.propagate(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueueName getQueueName() {
        return this.queueName;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
        } catch (Exception e) {
            LOG.warn("Fail to close queue consumer.", e);
        }
        this.consumer = null;
    }

    public T get() {
        return (T) this.consumer;
    }
}
