package org.apache.samza.system.kinesis;

import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.kinesis.consumer.KinesisSystemConsumer;

/* loaded from: input_file:org/apache/samza/system/kinesis/KinesisSystemFactory.class */
public class KinesisSystemFactory implements SystemFactory {
    public SystemConsumer getConsumer(String str, Config config, MetricsRegistry metricsRegistry) {
        return new KinesisSystemConsumer(str, new KinesisConfig(config), metricsRegistry);
    }

    public SystemProducer getProducer(String str, Config config, MetricsRegistry metricsRegistry) {
        return null;
    }

    public SystemAdmin getAdmin(String str, Config config) {
        validateConfig(str, config);
        return new KinesisSystemAdmin(str, new KinesisConfig(config));
    }

    protected void validateConfig(String str, Config config) {
        JobConfig jobConfig = new JobConfig(config);
        if (!jobConfig.getSystemStreamPartitionGrouperFactory().equals(AllSspToSingleTaskGrouperFactory.class.getCanonicalName())) {
            throw new ConfigException(String.format("Incorrect Grouper %s used for KinesisSystemConsumer %s. Please set the %s config to %s.", jobConfig.getSystemStreamPartitionGrouperFactory(), str, "job.systemstreampartition.grouper.factory", AllSspToSingleTaskGrouperFactory.class.getCanonicalName()));
        }
        if (new TaskConfig(config).getBroadcastSystemStreams().stream().anyMatch(systemStream -> {
            return str.equals(systemStream.getSystem());
        })) {
            throw new ConfigException("Kinesis streams cannot be configured as broadcast streams.");
        }
        KinesisConfig kinesisConfig = new KinesisConfig(config);
        kinesisConfig.getKinesisStreams(str).forEach(str2 -> {
            if (new StreamConfig(kinesisConfig).getBootstrapEnabled(new SystemStream(str, str2))) {
                throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
            }
        });
    }
}
