package io.axual.streams.proxy.axual;

import io.axual.client.proxy.generic.client.StaticClientProxy;
import io.axual.client.proxy.generic.registry.ProxyChain;
import io.axual.common.annotation.InterfaceStability;
import io.axual.common.tools.MapUtil;
import io.axual.streams.proxy.generic.proxy.StreamsProxy;
import io.axual.streams.proxy.generic.streams.StaticStreamsProxy;
import io.axual.streams.proxy.switching.SwitchingStreamsConfig;
import io.axual.streams.proxy.switching.SwitchingStreamsFactory;
import io.axual.streams.proxy.wrapped.WrappedStreamsConfig;
import io.axual.streams.proxy.wrapped.WrappedStreamsFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:io/axual/streams/proxy/axual/AxualStreams.class */
public class AxualStreams extends StaticStreamsProxy<AxualStreamsConfig> implements StreamsProxy {
    private static final Logger LOG = LoggerFactory.getLogger(AxualStreams.class);

    public AxualStreams(Map<String, Object> map) {
        super(createChain(map));
    }

    public AxualStreams(Properties properties) {
        this((Map<String, Object>) MapUtil.objectToStringMap(properties));
    }

    private static StaticClientProxy.ClientProxyInitializer<StreamsProxy, AxualStreamsConfig> createChain(Map<String, Object> map) {
        AxualStreamsConfig axualStreamsConfig = new AxualStreamsConfig(map);
        boolean containsElement = axualStreamsConfig.getProxyChain().containsElement("SWITCHING");
        String proxyChain = ProxyChain.newBuilder(axualStreamsConfig.getProxyChain()).remove("SWITCHING").build().toString();
        HashMap hashMap = new HashMap(axualStreamsConfig.getDownstreamConfigs());
        hashMap.put("application.id", axualStreamsConfig.getApplicationId());
        hashMap.put(AxualSerdeConfig.BACKING_KEY_SERDE_CONFIG, axualStreamsConfig.getDefaultKeySerde());
        hashMap.put(AxualSerdeConfig.BACKING_VALUE_SERDE_CONFIG, axualStreamsConfig.getDefaultValueSerde());
        hashMap.put(AxualSerdeConfig.KEY_SERDE_CHAIN_CONFIG, proxyChain);
        hashMap.put(AxualSerdeConfig.VALUE_SERDE_CHAIN_CONFIG, proxyChain);
        hashMap.put("default.key.serde", AxualSerde.class);
        hashMap.put("default.value.serde", AxualSerde.class);
        hashMap.put(AxualClientSupplier.CHAIN_CONFIG, proxyChain);
        hashMap.put(WrappedStreamsConfig.KAFKA_CLIENT_SUPPLIER_CONFIG, new AxualClientSupplier(new DefaultKafkaClientSupplier()));
        hashMap.put(WrappedStreamsConfig.UNCAUGHT_EXCEPTION_HANDLER_FACTORY_CONFIG, streams -> {
            return (thread, th) -> {
                LOG.error(String.format("Uncaught exception in AxualStreams thread (%s)", thread.getName()), th);
            };
        });
        if (!containsElement) {
            return new StaticClientProxy.ClientProxyInitializer<>(axualStreamsConfig, new WrappedStreamsFactory().create((Map<String, Object>) hashMap));
        }
        hashMap.put(SwitchingStreamsConfig.BACKING_FACTORY_CONFIG, WrappedStreamsFactory.class.getName());
        return new StaticClientProxy.ClientProxyInitializer<>(axualStreamsConfig, new SwitchingStreamsFactory().create((Map<String, Object>) hashMap));
    }
}
