/*
 * Decompiled with CFR 0.152.
 */
package io.specmesh.kafka;

import com.google.protobuf.MessageLiteOrBuilder;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.serialization.BooleanDeserializer;
import org.apache.kafka.common.serialization.BooleanSerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.FloatDeserializer;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.ShortSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
import org.apache.kafka.common.serialization.VoidSerializer;

public final class Clients {
    public static final String SASL_MECHANISM = "sasl.mechanism";
    public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
    public static final String CONFIG_PROPERTIES = "config.properties";
    public static final String NONE = "none";
    public static final String PLAIN = "PLAIN";
    public static final String SASL_PLAINTEXT = "SASL_PLAINTEXT";
    public static final int TIMEOUT = 30;
    private static final Map<Class<?>, SerdeTypes<?>> STD_SERDE = Map.ofEntries(Map.entry(Long.class, Clients.typeMetaData(LongSerializer.class, LongDeserializer.class, Serdes.LongSerde.class)), Map.entry(Long.TYPE, Clients.typeMetaData(LongSerializer.class, LongDeserializer.class, Serdes.LongSerde.class)), Map.entry(Integer.class, Clients.typeMetaData(IntegerSerializer.class, IntegerDeserializer.class, Serdes.IntegerSerde.class)), Map.entry(Integer.TYPE, Clients.typeMetaData(IntegerSerializer.class, IntegerDeserializer.class, Serdes.IntegerSerde.class)), Map.entry(Short.class, Clients.typeMetaData(ShortSerializer.class, ShortDeserializer.class, Serdes.ShortSerde.class)), Map.entry(Short.TYPE, Clients.typeMetaData(ShortSerializer.class, ShortDeserializer.class, Serdes.ShortSerde.class)), Map.entry(Float.class, Clients.typeMetaData(FloatSerializer.class, FloatDeserializer.class, Serdes.FloatSerde.class)), Map.entry(Float.TYPE, Clients.typeMetaData(FloatSerializer.class, FloatDeserializer.class, Serdes.FloatSerde.class)), Map.entry(Double.class, Clients.typeMetaData(DoubleSerializer.class, DoubleDeserializer.class, Serdes.DoubleSerde.class)), Map.entry(Double.TYPE, Clients.typeMetaData(DoubleSerializer.class, DoubleDeserializer.class, Serdes.DoubleSerde.class)), Map.entry(Boolean.class, Clients.typeMetaData(BooleanSerializer.class, BooleanDeserializer.class, Serdes.BooleanSerde.class)), Map.entry(Boolean.TYPE, Clients.typeMetaData(BooleanSerializer.class, BooleanDeserializer.class, Serdes.BooleanSerde.class)), Map.entry(String.class, Clients.typeMetaData(StringSerializer.class, StringDeserializer.class, Serdes.StringSerde.class)), Map.entry(UUID.class, Clients.typeMetaData(UUIDSerializer.class, UUIDDeserializer.class, Serdes.UUIDSerde.class)), Map.entry(Void.TYPE, Clients.typeMetaData(VoidSerializer.class, VoidDeserializer.class, Serdes.VoidSerde.class)), Map.entry(Void.class, Clients.typeMetaData(VoidSerializer.class, VoidDeserializer.class, Serdes.VoidSerde.class)));

    private Clients() {
    }

    public static Admin adminClient(String brokerUrl, String username, String secret) {
        try {
            HashMap<String, Object> properties = new HashMap<String, Object>();
            properties.put("client.id", UUID.randomUUID().toString());
            properties.put("bootstrap.servers", brokerUrl);
            if (username != null) {
                properties.putAll(Clients.clientSaslAuthProperties(username, secret));
            }
            if (!System.getProperty(CONFIG_PROPERTIES, NONE).equals(NONE)) {
                return Clients.validate(AdminClient.create(Clients.loadPropertiesFile(properties, System.getProperty(CONFIG_PROPERTIES))));
            }
            return Clients.validate(AdminClient.create(properties));
        }
        catch (Exception ex) {
            throw new ClientsException("cannot load:" + brokerUrl + " with username:" + username, ex);
        }
    }

    private static Admin validate(AdminClient adminClient) {
        try {
            adminClient.describeCluster().clusterId().get(30L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new RuntimeException("AdminClient cannot access the cluster (client.describeCluster), check connection-url/credentials/broker logs", e);
        }
        return adminClient;
    }

    private static Map<String, Object> loadPropertiesFile(Map<String, Object> defaultProperties, String propertyFilename) {
        Map<String, Object> map;
        FileInputStream input = new FileInputStream(propertyFilename);
        try {
            Properties prop = new Properties();
            prop.putAll(defaultProperties);
            prop.load(input);
            map = prop.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue));
        }
        catch (Throwable throwable) {
            try {
                try {
                    ((InputStream)input).close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to load properties file:" + propertyFilename, e);
            }
        }
        ((InputStream)input).close();
        return map;
    }

    public static Map<String, Object> clientSaslAuthProperties(String principal, String secret) {
        if (Clients.isPrincipalSpecified(principal)) {
            return Map.of(SASL_MECHANISM, System.getProperty(SASL_MECHANISM, PLAIN), "security.protocol", System.getProperty("security.protocol", SASL_PLAINTEXT), SASL_JAAS_CONFIG, System.getProperty(SASL_JAAS_CONFIG, Clients.buildJaasConfig(principal, secret)));
        }
        return Map.of();
    }

    private static boolean isPrincipalSpecified(String principal) {
        return principal != null && !principal.isBlank();
    }

    private static String buildJaasConfig(String userName, String password) {
        return PlainLoginModule.class.getCanonicalName() + " required username=\"" + userName + "\" password=\"" + password + "\";";
    }

    @Deprecated(forRemoval=true, since="0.10.1")
    public static Optional<SchemaRegistryClient> schemaRegistryClient(boolean srEnabled, String schemaRegistryUrl, String srApiKey, String srApiSecret) {
        if (srEnabled && schemaRegistryUrl != null) {
            return Optional.of(Clients.schemaRegistryClient(schemaRegistryUrl, srApiKey, srApiSecret));
        }
        return Optional.empty();
    }

    public static SchemaRegistryClient schemaRegistryClient(String schemaRegistryUrl, String srApiKey, String srApiSecret) {
        HashMap<String, Object> properties = new HashMap<String, Object>();
        if (srApiKey != null && !srApiKey.isBlank()) {
            properties.put("basic.auth.credentials.source", "USER_INFO");
            properties.put("basic.auth.user.info", srApiKey + ":" + srApiSecret);
        }
        return new CachedSchemaRegistryClient(Objects.requireNonNull(schemaRegistryUrl, "schemaRegistryUrl"), 5, properties);
    }

    public static ClientBuilder builder(String domainId, String serviceId, String bootstrapServers, String schemaRegistryUrl) {
        return new ClientBuilder(domainId, serviceId, bootstrapServers, schemaRegistryUrl);
    }

    public static <K, V> KafkaProducer<K, V> producer(ProducerProperties<K, V> properties) {
        return new KafkaProducer(properties.asMap());
    }

    public static <K, V> KafkaConsumer<K, V> consumer(ConsumerProperties<K, V> properties) {
        return new KafkaConsumer(properties.asMap());
    }

    @Deprecated(forRemoval=true, since="0.15.1")
    public static <K, V> KafkaProducer<K, V> producer(Class<K> keyClass, Class<V> valueClass, Map<String, Object> producerProperties) {
        return new KafkaProducer(producerProperties);
    }

    @SafeVarargs
    @Deprecated(forRemoval=true, since="0.15.1")
    public static Map<String, Object> producerProperties(String domainId, String serviceId, String bootstrapServers, String schemaRegistryUrl, Class<?> keySerializerClass, Class<?> valueSerializerClass, boolean acksAll, Map<String, Object> ... additionalProperties) {
        ClientBuilder builder = Clients.builder(domainId, serviceId, bootstrapServers, schemaRegistryUrl).withProp("use.latest.version", true).withProp("schema.reflection", true);
        for (Map<String, Object> additional : additionalProperties) {
            builder = builder.withProps(additional);
        }
        ProducerProperties<Void, Void> producerProps = builder.producer().withKeyType(Void.class).withValueType(Void.class).withAcks(acksAll).buildProperties();
        HashMap<String, Object> props = new HashMap<String, Object>(producerProps.asMap());
        props.put("key.serializer", keySerializerClass);
        props.put("value.serializer", valueSerializerClass);
        props.remove("avro.remove.java.properties");
        return props;
    }

    @SafeVarargs
    @Deprecated(forRemoval=true, since="0.15.1")
    public static Map<String, Object> kstreamsProperties(String domainId, String serviceId, String bootstrapServers, String schemaRegistryUrl, Class<?> keySerdeClass, Class<?> valueSerdeClass, boolean acksAll, Map<String, Object> ... additionalProperties) {
        ClientBuilder builder = Clients.builder(domainId, serviceId, bootstrapServers, schemaRegistryUrl).withProp("use.latest.version", true).withProp("schema.reflection", true);
        for (Map<String, Object> additional : additionalProperties) {
            builder = builder.withProps(additional);
        }
        KStreamsProperties<Void, Void> kstreamProps = builder.kstreams().withKeyType(Void.class).withValueType(Void.class).withAcks(acksAll).buildProperties();
        HashMap<String, Object> props = new HashMap<String, Object>(kstreamProps.asMap());
        props.put("default.key.serde", keySerdeClass);
        props.put("default.value.serde", valueSerdeClass);
        props.remove("avro.remove.java.properties");
        return props;
    }

    @Deprecated(forRemoval=true, since="0.15.1")
    public static <K, V> KafkaConsumer<K, V> consumer(Class<K> keyClass, Class<V> valueClass, Map<String, Object> consumerProperties) {
        return new KafkaConsumer(consumerProperties);
    }

    @Deprecated(forRemoval=true, since="0.15.1")
    @SafeVarargs
    public static Map<String, Object> consumerProperties(String domainId, String serviceId, String bootstrapServers, String schemaRegistryUrl, Class<?> keyDeserializerClass, Class<?> valueDeserializerClass, boolean autoOffsetResetEarliest, Map<String, Object> ... additionalProperties) {
        ClientBuilder builder = Clients.builder(domainId, serviceId, bootstrapServers, schemaRegistryUrl).withProp("schema.reflection", true);
        for (Map<String, Object> additional : additionalProperties) {
            builder = builder.withProps(additional);
        }
        ConsumerProperties<Void, Void> consumerProps = builder.consumer().withKeyType(Void.class).withValueType(Void.class).withAutoOffsetReset(autoOffsetResetEarliest).buildProperties();
        HashMap<String, Object> props = new HashMap<String, Object>(consumerProps.asMap());
        props.put("key.deserializer", keyDeserializerClass);
        props.put("value.deserializer", valueDeserializerClass);
        return props;
    }

    private static <T> SerdeTypes<T> stdSerdeType(Class<T> type) {
        SerdeTypes<?> found = STD_SERDE.get(type);
        if (found != null) {
            return found;
        }
        throw new ClientsException("Could not determine serde for type: " + type.getName());
    }

    private static <T> SerdeTypes<T> typeMetaData(Class<? extends Serializer<T>> serializer, Class<? extends Deserializer<T>> deserializer, Class<? extends Serde<T>> serde) {
        return new SerdeTypes(serializer, deserializer, serde);
    }

    private static class ClientsException
    extends RuntimeException {
        ClientsException(String message) {
            super(message);
        }

        ClientsException(String message, Exception cause) {
            super(message, cause);
        }
    }

    public static final class ClientBuilder {
        private final String domainId;
        private final String serviceId;
        private final Map<String, ?> commonProps;
        private final Map<String, Object> additional;

        private ClientBuilder(String domainId, String serviceId, String bootstrapServers, String schemaRegistryUrl) {
            this.domainId = Objects.requireNonNull(domainId, "domainId");
            this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
            this.commonProps = Map.of("bootstrap.servers", bootstrapServers, "schema.registry.url", schemaRegistryUrl);
            this.additional = new HashMap<String, Object>();
        }

        private ClientBuilder(String domainId, String serviceId, Map<String, ?> commonProps, Map<String, Object> additional) {
            this.domainId = Objects.requireNonNull(domainId, "domainId");
            this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
            this.commonProps = Map.copyOf(Objects.requireNonNull(commonProps, "commonProps"));
            this.additional = new HashMap<String, Object>(additional);
        }

        public ClientBuilder withProps(Map<String, ?> additional) {
            HashMap<String, Object> newAdditional = new HashMap<String, Object>(this.additional);
            newAdditional.putAll(additional);
            return new ClientBuilder(this.domainId, this.serviceId, this.commonProps, newAdditional);
        }

        public ClientBuilder withProp(String key, Object value) {
            return this.withProps(Map.of(key, value));
        }

        public ProducerBuilder<Void, Void> producer() {
            return new ProducerBuilder<Void, Void>(this);
        }

        public ConsumerBuilder<Void, Void> consumer() {
            return new ConsumerBuilder<Void, Void>(this);
        }

        public KStreamsBuilder<Void, Void> kstreams() {
            return new KStreamsBuilder<Void, Void>(this);
        }

        private String clientIdentifier(String lastPart) {
            return this.domainId + "." + this.serviceId + "." + lastPart;
        }

        private Map<String, ?> overrides(String ... nonOverridableKeys) {
            HashMap<String, Object> allowed = new HashMap<String, Object>(this.additional);
            for (String nonOverridableKey : nonOverridableKeys) {
                allowed.keySet().remove(nonOverridableKey);
            }
            return allowed;
        }

        private Map<String, Object> baseProps() {
            return new HashMap<String, Object>(this.commonProps);
        }
    }

    public static final class ProducerProperties<K, V>
    extends ClientProperties<K, V> {
        private ProducerProperties(Map<String, ?> properties) {
            super(properties);
        }
    }

    public static final class ConsumerProperties<K, V>
    extends ClientProperties<K, V> {
        private ConsumerProperties(Map<String, ?> properties) {
            super(properties);
        }
    }

    public static final class ProducerBuilder<K, V> {
        private static final Map<String, ?> DEFAULT_PRODUCER_PROPS = Map.of("auto.register.schemas", false, "avro.remove.java.properties", true);
        private final ClientBuilder clientBuilder;
        private Optional<Class<? extends Serializer<K>>> keySerializer = Optional.empty();
        private Optional<Class<? extends Serializer<V>>> valSerializer = Optional.empty();
        private boolean acksAll = true;

        private ProducerBuilder(ClientBuilder clientBuilder) {
            this.clientBuilder = Objects.requireNonNull(clientBuilder, "clientBuilder");
        }

        public <T> ProducerBuilder<T, V> withKeyType(Class<T> type) {
            return this.withKeySerializerType(ProducerBuilder.serializerFor(type));
        }

        public <T> ProducerBuilder<T, V> withKeySerializerType(Class<? extends Serializer<T>> type) {
            ProducerBuilder adjusted = this;
            adjusted.keySerializer = Optional.of(type);
            return this;
        }

        public <T> ProducerBuilder<T, V> withKeySerializerType(Class<? extends Serializer> serializerType, Class<T> type) {
            return this.withKeySerializerType(ProducerBuilder.castNonGeneric(serializerType, type));
        }

        public <T> ProducerBuilder<K, T> withValueType(Class<T> type) {
            return this.withValueSerializerType(ProducerBuilder.serializerFor(type));
        }

        public <T> ProducerBuilder<K, T> withValueSerializerType(Class<? extends Serializer<T>> type) {
            ProducerBuilder adjusted = this;
            adjusted.valSerializer = Optional.of(type);
            return this;
        }

        public <T> ProducerBuilder<K, T> withValueSerializerType(Class<? extends Serializer> serializerType, Class<T> type) {
            return this.withValueSerializerType(ProducerBuilder.castNonGeneric(serializerType, type));
        }

        public ProducerBuilder<K, V> withAcks(boolean acksAll) {
            this.acksAll = acksAll;
            return this;
        }

        public ProducerProperties<K, V> buildProperties() {
            Class<? extends Serializer<K>> keySer = this.keySerializer.orElseThrow(() -> new ClientsException("key serializer not set. Call either withKeyType or withKeySerializerType."));
            Class<? extends Serializer<V>> valSer = this.valSerializer.orElseThrow(() -> new ClientsException("value serializer not set. Call either withValueType or withValueSerializerType."));
            Map<String, Object> props = this.clientBuilder.baseProps();
            props.putAll(DEFAULT_PRODUCER_PROPS);
            props.putAll(Map.of("client.id", this.clientBuilder.clientIdentifier("producer"), "key.serializer", keySer, "value.serializer", valSer, "acks", this.acksAll ? "all" : "1"));
            props.putAll(this.clientBuilder.overrides("key.serializer", "value.serializer"));
            return new ProducerProperties(props);
        }

        public Producer<K, V> build() {
            return Clients.producer(this.buildProperties());
        }

        private static <T> Class<? extends Serializer<T>> serializerFor(Class<T> type) {
            if (MessageLiteOrBuilder.class.isAssignableFrom(type)) {
                return ProducerBuilder.castNonGeneric(KafkaProtobufSerializer.class, type);
            }
            if (IndexedRecord.class.isAssignableFrom(type)) {
                return ProducerBuilder.castNonGeneric(KafkaAvroSerializer.class, type);
            }
            return Clients.stdSerdeType(type).serializer;
        }

        private static <T> Class<? extends Serializer<T>> castNonGeneric(Class<? extends Serializer> serializerType, Class<T> type) {
            return serializerType;
        }
    }

    public static final class KStreamsBuilder<K, V> {
        private static final Map<String, ?> DEFAULT_STREAM_PROPS = Map.of();
        private final ClientBuilder clientBuilder;
        private Optional<Class<? extends Serde<K>>> keySerde = Optional.empty();
        private Optional<Class<? extends Serde<V>>> valSerde = Optional.empty();
        private boolean acksAll = true;

        private KStreamsBuilder(ClientBuilder clientBuilder) {
            this.clientBuilder = Objects.requireNonNull(clientBuilder, "clientBuilder");
        }

        public <T> KStreamsBuilder<T, V> withKeyType(Class<T> type) {
            return this.withKeySerdeType(KStreamsBuilder.serdeFor(type));
        }

        public <T> KStreamsBuilder<T, V> withKeySerdeType(Class<? extends Serde<T>> type) {
            KStreamsBuilder adjusted = this;
            adjusted.keySerde = Optional.of(type);
            return this;
        }

        public <T> KStreamsBuilder<T, V> withKeySerdeType(Class<? extends Serde> serdeType, Class<T> type) {
            return this.withKeySerdeType(KStreamsBuilder.castNonGeneric(serdeType, type));
        }

        public <T> KStreamsBuilder<K, T> withValueType(Class<T> type) {
            return this.withValueSerdeType(KStreamsBuilder.serdeFor(type));
        }

        public <T> KStreamsBuilder<K, T> withValueSerdeType(Class<? extends Serde<T>> type) {
            KStreamsBuilder adjusted = this;
            adjusted.valSerde = Optional.of(type);
            return this;
        }

        public <T> KStreamsBuilder<K, T> withValueSerdeType(Class<? extends Serde> serdeType, Class<T> type) {
            return this.withValueSerdeType(KStreamsBuilder.castNonGeneric(serdeType, type));
        }

        public KStreamsBuilder<K, V> withAcks(boolean acksAll) {
            this.acksAll = acksAll;
            return this;
        }

        public KStreamsProperties<K, V> buildProperties() {
            Class<? extends Serde<K>> keySerdeType = this.keySerde.orElseThrow(() -> new ClientsException("key serde not set. Call either withKeyType or withKeySerdeType."));
            Class<? extends Serde<V>> valSerdeType = this.valSerde.orElseThrow(() -> new ClientsException("value serde not set. Call either withSerdeType or withValueSerdeType."));
            Map<String, Object> props = this.clientBuilder.baseProps();
            props.putAll(DEFAULT_STREAM_PROPS);
            props.putAll(ProducerBuilder.DEFAULT_PRODUCER_PROPS);
            props.putAll(Map.of("application.id", this.clientBuilder.domainId + "._private." + this.clientBuilder.serviceId, "client.id", this.clientBuilder.clientIdentifier("client"), "default.key.serde", keySerdeType, "default.value.serde", valSerdeType, "commit.interval.ms", Duration.ofSeconds(10L).toMillis(), "acks", this.acksAll ? "all" : "1"));
            props.putAll(this.clientBuilder.overrides("default.key.serde", "default.value.serde"));
            return new KStreamsProperties(props);
        }

        private static <T> Class<? extends Serde<T>> serdeFor(Class<T> type) {
            if (MessageLiteOrBuilder.class.isAssignableFrom(type)) {
                return KStreamsBuilder.castNonGeneric(KafkaProtobufSerde.class, type);
            }
            if (IndexedRecord.class.isAssignableFrom(type)) {
                return KStreamsBuilder.castNonGeneric(GenericAvroSerde.class, type);
            }
            return Clients.stdSerdeType(type).serde;
        }

        private static <T> Class<? extends Serde<T>> castNonGeneric(Class<? extends Serde> serdeType, Class<T> type) {
            return serdeType;
        }
    }

    public static final class KStreamsProperties<K, V>
    extends ClientProperties<K, V> {
        private KStreamsProperties(Map<String, ?> properties) {
            super(properties);
        }
    }

    public static final class ConsumerBuilder<K, V> {
        private final ClientBuilder clientBuilder;
        private Optional<Class<? extends Deserializer<K>>> keyDeserializer = Optional.empty();
        private Optional<Class<? extends Deserializer<V>>> valDeserializer = Optional.empty();
        private boolean autoOffsetResetEarliest = true;

        private ConsumerBuilder(ClientBuilder clientBuilder) {
            this.clientBuilder = Objects.requireNonNull(clientBuilder, "clientBuilder");
        }

        public <T> ConsumerBuilder<T, V> withKeyType(Class<T> type) {
            return this.withKeyDeserializerType(ConsumerBuilder.deserializerFor(type));
        }

        public <T> ConsumerBuilder<T, V> withKeyDeserializerType(Class<? extends Deserializer<T>> type) {
            ConsumerBuilder adjusted = this;
            adjusted.keyDeserializer = Optional.of(type);
            return this;
        }

        public <T> ConsumerBuilder<T, V> withKeyDeserializerType(Class<? extends Deserializer> deserializerType, Class<T> type) {
            return this.withKeyDeserializerType(ConsumerBuilder.castNonGeneric(deserializerType, type));
        }

        public <T> ConsumerBuilder<K, T> withValueType(Class<T> type) {
            return this.withValueDeserializerType(ConsumerBuilder.deserializerFor(type));
        }

        public <T> ConsumerBuilder<K, T> withValueDeserializerType(Class<? extends Deserializer<T>> type) {
            ConsumerBuilder adjusted = this;
            adjusted.valDeserializer = Optional.of(type);
            return this;
        }

        public <T> ConsumerBuilder<K, T> withValueDeserializerType(Class<? extends Deserializer> deserializerType, Class<T> type) {
            return this.withValueDeserializerType(ConsumerBuilder.castNonGeneric(deserializerType, type));
        }

        public ConsumerBuilder<K, V> withAutoOffsetReset(boolean earliest) {
            this.autoOffsetResetEarliest = earliest;
            return this;
        }

        public ConsumerProperties<K, V> buildProperties() {
            Class<? extends Deserializer<K>> keyDeser = this.keyDeserializer.orElseThrow(() -> new ClientsException("key deserializer not set. Call either withKeyType or withKeyDeserializerType."));
            Class<? extends Deserializer<V>> valDeser = this.valDeserializer.orElseThrow(() -> new ClientsException("value deserializer not set. Call either withValueType or withValueDeserializerType."));
            Map<String, Object> props = this.clientBuilder.baseProps();
            props.putAll(Map.of("client.id", this.clientBuilder.clientIdentifier("consumer"), "group.id", this.clientBuilder.clientIdentifier("consumer-group"), "auto.offset.reset", this.autoOffsetResetEarliest ? "earliest" : "latest", "key.deserializer", keyDeser, "value.deserializer", valDeser));
            props.putAll(this.clientBuilder.overrides("key.deserializer", "value.deserializer"));
            return new ConsumerProperties(props);
        }

        public Consumer<K, V> build() {
            return Clients.consumer(this.buildProperties());
        }

        private static <T> Class<? extends Deserializer<T>> deserializerFor(Class<T> type) {
            if (MessageLiteOrBuilder.class.isAssignableFrom(type)) {
                return ConsumerBuilder.castNonGeneric(KafkaProtobufDeserializer.class, type);
            }
            if (IndexedRecord.class.isAssignableFrom(type)) {
                return ConsumerBuilder.castNonGeneric(KafkaAvroDeserializer.class, type);
            }
            return Clients.stdSerdeType(type).deserializer;
        }

        private static <T> Class<? extends Deserializer<T>> castNonGeneric(Class<? extends Deserializer> deserializerType, Class<T> type) {
            return deserializerType;
        }
    }

    private static final class SerdeTypes<T> {
        private final Class<? extends Serializer<T>> serializer;
        private final Class<? extends Deserializer<T>> deserializer;
        private final Class<? extends Serde<T>> serde;

        SerdeTypes(Class<? extends Serializer<T>> serializer, Class<? extends Deserializer<T>> deserializer, Class<? extends Serde<T>> serde) {
            this.serializer = Objects.requireNonNull(serializer, "serializer");
            this.deserializer = Objects.requireNonNull(deserializer, "deserializer");
            this.serde = Objects.requireNonNull(serde, "serde");
        }
    }

    public static abstract class ClientProperties<K, V> {
        private final Map<String, ?> properties;

        private ClientProperties(Map<String, ?> properties) {
            this.properties = Map.copyOf(Objects.requireNonNull(properties, "properties"));
        }

        public Map<String, Object> asMap() {
            return Map.copyOf(this.properties);
        }

        public Properties asProperties() {
            Properties props = new Properties();
            props.putAll(this.properties);
            return props;
        }
    }
}

