package org.apache.kafka.streams;

import ch.qos.logback.classic.Level;
import ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/StreamsConfig.class */
public class StreamsConfig extends AbstractConfig {
    private final boolean eosEnabled;
    public static final String TOPIC_PREFIX = "topic.";
    public static final String CONSUMER_PREFIX = "consumer.";
    public static final String PRODUCER_PREFIX = "producer.";
    public static final String ADMIN_CLIENT_PREFIX = "admin.";
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String CLIENT_ID_CONFIG = "client.id";
    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
    public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
    public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
    public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
    public static final String RETRIES_CONFIG = "retries";
    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
    public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
    private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) StreamsConfig.class);
    private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
    private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
    private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = {ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION};
    public static final String APPLICATION_ID_CONFIG = "application.id";
    private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
    private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
    public static final String STATE_DIR_CONFIG = "state.dir";
    private static final String STATE_DIR_DOC = "Directory location for state store.";
    public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
    private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
    private static final String CLIENT_ID_DOC = "An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.";
    public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
    private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
    public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
    private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
    private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
    private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
    public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
    private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
    public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
    private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
    public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    public static final String AT_LEAST_ONCE = "at_least_once";
    public static final String EXACTLY_ONCE = "exactly_once";
    private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once</code>. Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting `transaction.state.log.replication.factor`.";
    public static final String APPLICATION_SERVER_CONFIG = "application.server";
    private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application";
    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
    private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000;
    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor. (Note, if 'processing.guarantee' is set to 'exactly_once', the default value is 100, otherwise the default value is 30000.";
    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>org.apache.kafka.streams.processor.PartitionGrouper</code> interface.";
    public static final String POLL_MS_CONFIG = "poll.ms";
    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
    private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100;
    public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
    private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class or class name that implements the <code>org.apache.kafka.streams.state.RocksDBConfigSetter</code> interface";
    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed";
    public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
    private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";

    @Deprecated
    public static final String KEY_SERDE_CLASS_CONFIG = "key.serde";
    private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. This config is deprecated, use <code>default.key.serde</code> instead";

    @Deprecated
    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface. This config is deprecated, use <code>default.timestamp.extractor</code> instead";

    @Deprecated
    public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
    private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. This config is deprecated, use <code>default.value.serde</code> instead";

    @Deprecated
    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management. This config is deprecated and will be ignored as Streams API does not use Zookeeper anymore.";
    private static final ConfigDef CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, APPLICATION_ID_DOC).define("bootstrap.servers", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC).define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.HIGH, REPLICATION_FACTOR_DOC).define(STATE_DIR_CONFIG, ConfigDef.Type.STRING, "/tmp/kafka-streams", ConfigDef.Importance.HIGH, STATE_DIR_DOC).define(CACHE_MAX_BYTES_BUFFERING_CONFIG, ConfigDef.Type.LONG, Long.valueOf(SizeBasedTriggeringPolicy.DEFAULT_MAX_FILE_SIZE), ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, CACHE_MAX_BYTES_BUFFERING_DOC).define("client.id", ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, CLIENT_ID_DOC).define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, ConfigDef.Type.CLASS, LogAndFailExceptionHandler.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC).define(DEFAULT_KEY_SERDE_CLASS_CONFIG, ConfigDef.Type.CLASS, Serdes.ByteArraySerde.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_KEY_SERDE_CLASS_DOC).define(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ConfigDef.Type.CLASS, DefaultProductionExceptionHandler.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC).define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConfigDef.Type.CLASS, FailOnInvalidTimestamp.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC).define(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ConfigDef.Type.CLASS, Serdes.ByteArraySerde.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_VALUE_SERDE_CLASS_DOC).define(NUM_STANDBY_REPLICAS_CONFIG, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM, NUM_STANDBY_REPLICAS_DOC).define(NUM_STREAM_THREADS_CONFIG, ConfigDef.Type.INT, 1, ConfigDef.Importance.MEDIUM, NUM_STREAM_THREADS_DOC).define(PROCESSING_GUARANTEE_CONFIG, ConfigDef.Type.STRING, AT_LEAST_ONCE, ConfigDef.ValidString.in(AT_LEAST_ONCE, EXACTLY_ONCE), ConfigDef.Importance.MEDIUM, PROCESSING_GUARANTEE_DOC).define("security.protocol", ConfigDef.Type.STRING, "PLAINTEXT", ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC).define(APPLICATION_SERVER_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, APPLICATION_SERVER_DOC).define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, ConfigDef.Type.INT, 1000, ConfigDef.Importance.LOW, BUFFERED_RECORDS_PER_PARTITION_DOC).define(COMMIT_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, Long.valueOf(DEFAULT_COMMIT_INTERVAL_MS), ConfigDef.Importance.LOW, COMMIT_INTERVAL_MS_DOC).define("connections.max.idle.ms", ConfigDef.Type.LONG, 540000, ConfigDef.Importance.LOW, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC).define("metadata.max.age.ms", ConfigDef.Type.LONG, 300000, ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, CommonClientConfigs.METADATA_MAX_AGE_DOC).define("metrics.num.samples", ConfigDef.Type.INT, 2, ConfigDef.Range.atLeast(1), ConfigDef.Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC).define("metric.reporters", ConfigDef.Type.LIST, "", ConfigDef.Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC).define("metrics.recording.level", ConfigDef.Type.STRING, Sensor.RecordingLevel.INFO.toString(), ConfigDef.ValidString.in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), ConfigDef.Importance.LOW, CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC).define("metrics.sample.window.ms", ConfigDef.Type.LONG, 30000, ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC).define(PARTITION_GROUPER_CLASS_CONFIG, ConfigDef.Type.CLASS, DefaultPartitionGrouper.class.getName(), ConfigDef.Importance.LOW, PARTITION_GROUPER_CLASS_DOC).define(POLL_MS_CONFIG, ConfigDef.Type.LONG, 100, ConfigDef.Importance.LOW, POLL_MS_DOC).define("receive.buffer.bytes", ConfigDef.Type.INT, 32768, ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, CommonClientConfigs.RECEIVE_BUFFER_DOC).define("reconnect.backoff.ms", ConfigDef.Type.LONG, 50L, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC).define("reconnect.backoff.max.ms", ConfigDef.Type.LONG, 1000L, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC).define("retries", ConfigDef.Type.INT, 0, ConfigDef.Range.between(0, Integer.MAX_VALUE), ConfigDef.Importance.LOW, CommonClientConfigs.RETRIES_DOC).define("retry.backoff.ms", ConfigDef.Type.LONG, Long.valueOf(EOS_DEFAULT_COMMIT_INTERVAL_MS), ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC).define("request.timeout.ms", ConfigDef.Type.INT, Integer.valueOf(Level.ERROR_INT), ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC).define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, ROCKSDB_CONFIG_SETTER_CLASS_DOC).define("send.buffer.bytes", ConfigDef.Type.INT, 131072, ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, CommonClientConfigs.SEND_BUFFER_DOC).define(STATE_CLEANUP_DELAY_MS_CONFIG, ConfigDef.Type.LONG, 600000, ConfigDef.Importance.LOW, STATE_CLEANUP_DELAY_MS_DOC).define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, ConfigDef.Type.LONG, 86400000, ConfigDef.Importance.LOW, WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC).define(KEY_SERDE_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, KEY_SERDE_CLASS_DOC).define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, TIMESTAMP_EXTRACTOR_CLASS_DOC).define(VALUE_SERDE_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, VALUE_SERDE_CLASS_DOC).define(ZOOKEEPER_CONNECT_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, ZOOKEEPER_CONNECT_DOC);

    /* loaded from: input_file:org/apache/kafka/streams/StreamsConfig$InternalConfig.class */
    public static class InternalConfig {
        public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
    }

    public static String consumerPrefix(String str) {
        return CONSUMER_PREFIX + str;
    }

    public static String producerPrefix(String str) {
        return PRODUCER_PREFIX + str;
    }

    public static String adminClientPrefix(String str) {
        return ADMIN_CLIENT_PREFIX + str;
    }

    public static String topicPrefix(String str) {
        return TOPIC_PREFIX + str;
    }

    public static ConfigDef configDef() {
        return new ConfigDef(CONFIG);
    }

    public StreamsConfig(Map<?, ?> map) {
        super(CONFIG, map);
        this.eosEnabled = EXACTLY_ONCE.equals(getString(PROCESSING_GUARANTEE_CONFIG));
    }

    @Override // org.apache.kafka.common.config.AbstractConfig
    protected Map<String, Object> postProcessParsedConfig(Map<String, Object> map) {
        Map<String, Object> postProcessReconnectBackoffConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, map);
        if (EXACTLY_ONCE.equals(map.get(PROCESSING_GUARANTEE_CONFIG)) && !originals().containsKey(COMMIT_INTERVAL_MS_CONFIG)) {
            log.debug("Using {} default value of {} as exactly once is enabled.", COMMIT_INTERVAL_MS_CONFIG, Long.valueOf(EOS_DEFAULT_COMMIT_INTERVAL_MS));
            postProcessReconnectBackoffConfigs.put(COMMIT_INTERVAL_MS_CONFIG, Long.valueOf(EOS_DEFAULT_COMMIT_INTERVAL_MS));
        }
        return postProcessReconnectBackoffConfigs;
    }

    private Map<String, Object> getCommonConsumerConfigs() {
        Map<String, Object> clientPropsWithPrefix = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
        checkIfUnexpectedUserSpecifiedConsumerConfig(clientPropsWithPrefix, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
        checkIfUnexpectedUserSpecifiedConsumerConfig(clientPropsWithPrefix, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
        HashMap hashMap = new HashMap(this.eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
        hashMap.putAll(getClientCustomProps());
        hashMap.putAll(clientPropsWithPrefix);
        hashMap.put("bootstrap.servers", originals().get("bootstrap.servers"));
        hashMap.remove(ZOOKEEPER_CONNECT_CONFIG);
        return hashMap;
    }

    private void checkIfUnexpectedUserSpecifiedConsumerConfig(Map<String, Object> map, String[] strArr) {
        for (String str : strArr) {
            if (map.containsKey(str)) {
                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(str)) {
                    if (!map.get(str).equals(CONSUMER_DEFAULT_OVERRIDES.get(str))) {
                        log.warn(String.format("Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ", ConsumerProtocol.PROTOCOL_TYPE, str, "", map.get(str), CONSUMER_DEFAULT_OVERRIDES.get(str)));
                        map.remove(str);
                    }
                } else if (this.eosEnabled) {
                    if (CONSUMER_EOS_OVERRIDES.containsKey(str)) {
                        if (!map.get(str).equals(CONSUMER_EOS_OVERRIDES.get(str))) {
                            log.warn(String.format("Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ", ConsumerProtocol.PROTOCOL_TYPE, str, "processing.guarantee is set to exactly_once. Hence, ", map.get(str), CONSUMER_EOS_OVERRIDES.get(str)));
                            map.remove(str);
                        }
                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(str) && !map.get(str).equals(PRODUCER_EOS_OVERRIDES.get(str))) {
                        log.warn(String.format("Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ", "producer", str, "processing.guarantee is set to exactly_once. Hence, ", map.get(str), PRODUCER_EOS_OVERRIDES.get(str)));
                        map.remove(str);
                    }
                }
            }
        }
    }

    public Map<String, Object> getConsumerConfigs(String str, String str2) {
        Map<String, Object> commonConsumerConfigs = getCommonConsumerConfigs();
        commonConsumerConfigs.put(APPLICATION_ID_CONFIG, str);
        commonConsumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, str);
        commonConsumerConfigs.put("client.id", str2 + "-consumer");
        commonConsumerConfigs.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
        commonConsumerConfigs.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
        commonConsumerConfigs.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
        commonConsumerConfigs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
        commonConsumerConfigs.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
        commonConsumerConfigs.put(adminClientPrefix("retries"), new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames())).getInt("retries"));
        Map<String, Object> originalsWithPrefix = originalsWithPrefix(TOPIC_PREFIX, false);
        if (originalsWithPrefix.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
            int parseInt = Integer.parseInt(originalsWithPrefix.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
            Map<String, Object> clientPropsWithPrefix = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
            int parseInt2 = clientPropsWithPrefix.containsKey(ProducerConfig.BATCH_SIZE_CONFIG) ? Integer.parseInt(clientPropsWithPrefix.get(ProducerConfig.BATCH_SIZE_CONFIG).toString()) : new ProducerConfig(new Properties()).getInt(ProducerConfig.BATCH_SIZE_CONFIG).intValue();
            if (parseInt < parseInt2) {
                throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic", Integer.valueOf(parseInt), Integer.valueOf(parseInt2)));
            }
        }
        commonConsumerConfigs.putAll(originalsWithPrefix);
        return commonConsumerConfigs;
    }

    public Map<String, Object> getRestoreConsumerConfigs(String str) {
        Map<String, Object> commonConsumerConfigs = getCommonConsumerConfigs();
        commonConsumerConfigs.remove(ConsumerConfig.GROUP_ID_CONFIG);
        commonConsumerConfigs.put("client.id", str + "-restore-consumer");
        commonConsumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        return commonConsumerConfigs;
    }

    public Map<String, Object> getProducerConfigs(String str) {
        Map<String, Object> clientPropsWithPrefix = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
        checkIfUnexpectedUserSpecifiedConsumerConfig(clientPropsWithPrefix, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
        HashMap hashMap = new HashMap(this.eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
        hashMap.putAll(getClientCustomProps());
        hashMap.putAll(clientPropsWithPrefix);
        hashMap.put("bootstrap.servers", originals().get("bootstrap.servers"));
        hashMap.put("client.id", str + "-producer");
        return hashMap;
    }

    public Map<String, Object> getAdminConfigs(String str) {
        Map<String, Object> clientPropsWithPrefix = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
        HashMap hashMap = new HashMap();
        hashMap.putAll(getClientCustomProps());
        hashMap.putAll(clientPropsWithPrefix);
        hashMap.put("client.id", str + "-admin");
        return hashMap;
    }

    private Map<String, Object> getClientPropsWithPrefix(String str, Set<String> set) {
        Map<String, Object> clientProps = clientProps(set, originals());
        clientProps.putAll(originalsWithPrefix(str));
        return clientProps;
    }

    private Map<String, Object> getClientCustomProps() {
        Map<String, Object> originals = originals();
        originals.keySet().removeAll(CONFIG.names());
        originals.keySet().removeAll(ConsumerConfig.configNames());
        originals.keySet().removeAll(ProducerConfig.configNames());
        originals.keySet().removeAll(AdminClientConfig.configNames());
        originals.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX, false).keySet());
        originals.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX, false).keySet());
        originals.keySet().removeAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX, false).keySet());
        return originals;
    }

    @Deprecated
    public Serde keySerde() {
        return defaultKeySerde();
    }

    public Serde defaultKeySerde() {
        Object obj = get(KEY_SERDE_CLASS_CONFIG);
        try {
            Serde serde = (Serde) getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
            if (serde == null) {
                obj = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
                serde = (Serde) getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
            }
            serde.configure(originals(), true);
            return serde;
        } catch (Exception e) {
            throw new StreamsException(String.format("Failed to configure key serde %s", obj), e);
        }
    }

    @Deprecated
    public Serde valueSerde() {
        return defaultValueSerde();
    }

    public Serde defaultValueSerde() {
        Object obj = get(VALUE_SERDE_CLASS_CONFIG);
        try {
            Serde serde = (Serde) getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
            if (serde == null) {
                obj = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
                serde = (Serde) getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
            }
            serde.configure(originals(), false);
            return serde;
        } catch (Exception e) {
            throw new StreamsException(String.format("Failed to configure value serde %s", obj), e);
        }
    }

    public TimestampExtractor defaultTimestampExtractor() {
        TimestampExtractor timestampExtractor = (TimestampExtractor) getConfiguredInstance(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
        return timestampExtractor == null ? (TimestampExtractor) getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class) : timestampExtractor;
    }

    public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
        return (DeserializationExceptionHandler) getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
    }

    public ProductionExceptionHandler defaultProductionExceptionHandler() {
        return (ProductionExceptionHandler) getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
    }

    private Map<String, Object> clientProps(Set<String> set, Map<String, Object> map) {
        HashMap hashMap = new HashMap();
        for (String str : set) {
            if (map.containsKey(str)) {
                hashMap.put(str, map.get(str));
            }
        }
        return hashMap;
    }

    public static void main(String[] strArr) {
        System.out.println(CONFIG.toHtmlTable());
    }

    static {
        HashMap hashMap = new HashMap();
        hashMap.put(ProducerConfig.LINGER_MS_CONFIG, "100");
        hashMap.put("retries", 10);
        PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(hashMap);
        HashMap hashMap2 = new HashMap(PRODUCER_DEFAULT_OVERRIDES);
        hashMap2.put("retries", Integer.MAX_VALUE);
        hashMap2.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        hashMap2.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(hashMap2);
        HashMap hashMap3 = new HashMap();
        hashMap3.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        hashMap3.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        hashMap3.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        hashMap3.put("internal.leave.group.on.close", false);
        hashMap3.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
        CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(hashMap3);
        HashMap hashMap4 = new HashMap(CONSUMER_DEFAULT_OVERRIDES);
        hashMap4.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(hashMap4);
    }
}
