package co.cask.cdap.logging.framework.distributed;

import co.cask.cdap.api.logging.AppenderContext;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.resource.ResourceBalancerService;
import co.cask.cdap.common.service.RetryOnStartFailureService;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.logging.framework.LogPipelineLoader;
import co.cask.cdap.logging.framework.LogPipelineSpecification;
import co.cask.cdap.logging.meta.CheckpointManagerFactory;
import co.cask.cdap.logging.pipeline.LogProcessorPipelineContext;
import co.cask.cdap.logging.pipeline.kafka.KafkaLogProcessorPipeline;
import co.cask.cdap.logging.pipeline.kafka.KafkaPipelineConfig;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.zookeeper.ZKClient;

/* loaded from: input_file:co/cask/cdap/logging/framework/distributed/DistributedLogFramework.class */
public class DistributedLogFramework extends ResourceBalancerService {
    private static final String SERVICE_NAME = "log.framework";
    private final CConfiguration cConf;
    private final Provider<AppenderContext> contextProvider;
    private final CheckpointManagerFactory checkpointManagerFactory;
    private final BrokerService brokerService;

    @Inject
    DistributedLogFramework(CConfiguration cConfiguration, ZKClient zKClient, DiscoveryService discoveryService, DiscoveryServiceClient discoveryServiceClient, Provider<AppenderContext> provider, CheckpointManagerFactory checkpointManagerFactory, BrokerService brokerService) {
        super(SERVICE_NAME, cConfiguration.getInt("log.publish.num.partitions"), zKClient, discoveryService, discoveryServiceClient);
        this.cConf = cConfiguration;
        this.contextProvider = provider;
        this.checkpointManagerFactory = checkpointManagerFactory;
        this.brokerService = brokerService;
    }

    protected Service createService(Set<Integer> set) {
        Map load = new LogPipelineLoader(this.cConf).load(this.contextProvider);
        int size = load.size();
        final ArrayList arrayList = new ArrayList();
        for (final LogPipelineSpecification logPipelineSpecification : load.values()) {
            final CConfiguration conf = logPipelineSpecification.getConf();
            final AppenderContext context = logPipelineSpecification.getContext();
            long bufferSize = getBufferSize(size, conf, set.size());
            final String str = conf.get("log.kafka.topic");
            final KafkaPipelineConfig kafkaPipelineConfig = new KafkaPipelineConfig(str, set, bufferSize, conf.getLong("log.process.pipeline.event.delay.ms"), conf.getInt("log.process.pipeline.kafka.fetch.size"), conf.getLong("log.process.pipeline.checkpoint.interval.ms"));
            arrayList.add(new RetryOnStartFailureService(new Supplier<Service>() { // from class: co.cask.cdap.logging.framework.distributed.DistributedLogFramework.1
                /* renamed from: get, reason: merged with bridge method [inline-methods] */
                public Service m19get() {
                    return new KafkaLogProcessorPipeline(new LogProcessorPipelineContext(conf, context.getName(), context, context.getMetricsContext(), context.getInstanceId()), DistributedLogFramework.this.checkpointManagerFactory.create(str, logPipelineSpecification.getCheckpointPrefix()), DistributedLogFramework.this.brokerService, kafkaPipelineConfig);
                }
            }, RetryStrategies.fromConfiguration(conf, "system.log.process.")));
        }
        return new AbstractIdleService() { // from class: co.cask.cdap.logging.framework.distributed.DistributedLogFramework.2
            protected void startUp() throws Exception {
                DistributedLogFramework.this.validateAllFutures(Iterables.transform(arrayList, (v0) -> {
                    return v0.start();
                }));
            }

            protected void shutDown() throws Exception {
                DistributedLogFramework.this.validateAllFutures(Iterables.transform(arrayList, (v0) -> {
                    return v0.stop();
                }));
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void validateAllFutures(Iterable<? extends ListenableFuture<?>> iterable) throws Exception {
        Futures.successfulAsList(iterable).get();
        Throwable th = null;
        Iterator<? extends ListenableFuture<?>> it = iterable.iterator();
        while (it.hasNext()) {
            try {
                it.next().get();
            } catch (ExecutionException e) {
                if (th == null) {
                    th = e.getCause();
                } else {
                    th.addSuppressed(e.getCause());
                }
            }
        }
        if (th != null) {
            if (!(th instanceof Exception)) {
                throw new RuntimeException(th);
            }
            throw ((Exception) th);
        }
    }

    private long getBufferSize(int i, CConfiguration cConfiguration, int i2) {
        long j = cConfiguration.getLong("log.process.pipeline.buffer.size");
        if (j > 0) {
            return j;
        }
        double d = cConfiguration.getDouble("log.process.pipeline.auto.buffer.ratio");
        Preconditions.checkArgument(d > 0.0d && d < 1.0d, "Config %s must be between 0 and 1", new Object[]{"log.process.pipeline.auto.buffer.ratio"});
        long maxMemory = (long) (((Runtime.getRuntime().maxMemory() * d) - (cConfiguration.getInt("log.process.pipeline.kafka.fetch.size") * i2)) / i);
        if (maxMemory > 0) {
            return maxMemory;
        }
        return 1L;
    }
}
