package dev.responsive.kafka.api;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.config.ResponsiveStreamsConfig;
import dev.responsive.kafka.internal.db.CassandraClientFactory;
import dev.responsive.kafka.internal.db.DefaultCassandraClientFactory;
import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.metrics.ResponsiveRestoreListener;
import dev.responsive.kafka.internal.metrics.ResponsiveStateListener;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.SessionUtil;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/api/ResponsiveKafkaStreams.class */
public class ResponsiveKafkaStreams extends KafkaStreams {
    private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class);
    private final ResponsiveMetrics responsiveMetrics;
    private final ResponsiveStateListener responsiveStateListener;
    private final ResponsiveRestoreListener responsiveRestoreListener;
    private final SessionClients sessionClients;

    /* loaded from: input_file:dev/responsive/kafka/api/ResponsiveKafkaStreams$Params.class */
    protected static class Params {
        final Topology topology;
        final ResponsiveConfig responsiveConfig;
        final StreamsConfig streamsConfig;
        final ResponsiveMetrics metrics;
        private SessionClients sessionClients;
        private ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier;
        private Time time = Time.SYSTEM;
        private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
        private CassandraClientFactory cassandraFactory = new DefaultCassandraClientFactory();
        final ResponsiveStoreRegistry storeRegistry = new ResponsiveStoreRegistry();

        public Params(Topology topology, Map<?, ?> map) {
            this.topology = topology;
            this.responsiveConfig = ResponsiveConfig.loggedConfig(map);
            this.streamsConfig = ResponsiveStreamsConfig.streamsConfig(map);
            this.metrics = ResponsiveKafkaStreams.createMetrics(this.streamsConfig);
        }

        public Params withClientSupplier(KafkaClientSupplier kafkaClientSupplier) {
            this.clientSupplier = kafkaClientSupplier;
            return this;
        }

        public Params withCassandraClientFactory(CassandraClientFactory cassandraClientFactory) {
            this.cassandraFactory = cassandraClientFactory;
            return this;
        }

        public Params withTime(Time time) {
            this.time = time;
            return this;
        }

        public Params build() {
            this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier(this.clientSupplier, this.streamsConfig, this.storeRegistry, this.metrics);
            Admin admin = this.responsiveKafkaClientSupplier.getAdmin(this.responsiveConfig.originals());
            StorageBackend valueOf = StorageBackend.valueOf(this.responsiveConfig.getString(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG).toUpperCase(Locale.ROOT));
            switch (valueOf) {
                case CASSANDRA:
                    this.sessionClients = new SessionClients(Optional.empty(), Optional.of(this.cassandraFactory.createClient(this.cassandraFactory.createCqlSession(this.responsiveConfig), this.responsiveConfig)), admin);
                    break;
                case MONGO_DB:
                    String string = this.responsiveConfig.getString(ResponsiveConfig.STORAGE_HOSTNAME_CONFIG);
                    String string2 = this.responsiveConfig.getString(ResponsiveConfig.CLIENT_ID_CONFIG);
                    Password password = this.responsiveConfig.getPassword(ResponsiveConfig.CLIENT_SECRET_CONFIG);
                    this.sessionClients = new SessionClients(Optional.of(new ResponsiveMongoClient(SessionUtil.connect(string, string2, password == null ? null : password.value()))), Optional.empty(), admin);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + valueOf);
            }
            return this;
        }
    }

    public ResponsiveKafkaStreams(Topology topology, Map<?, ?> map) {
        this(topology, map, Time.SYSTEM);
    }

    public ResponsiveKafkaStreams(Topology topology, Map<?, ?> map, KafkaClientSupplier kafkaClientSupplier) {
        this(topology, map, kafkaClientSupplier, Time.SYSTEM);
    }

    public ResponsiveKafkaStreams(Topology topology, Map<?, ?> map, Time time) {
        this(topology, map, new DefaultKafkaClientSupplier(), time);
    }

    public ResponsiveKafkaStreams(Topology topology, Map<?, ?> map, KafkaClientSupplier kafkaClientSupplier, Time time) {
        this(new Params(topology, map).withClientSupplier(kafkaClientSupplier).withTime(time).build());
    }

    protected ResponsiveKafkaStreams(Params params) {
        super(params.topology, propsWithOverrides(params.responsiveConfig.originals(), params.sessionClients, params.storeRegistry, params.topology.describe()), params.responsiveKafkaClientSupplier, params.time);
        try {
            ResponsiveStreamsConfig.validateStreamsConfig(this.applicationConfigs);
            this.responsiveMetrics = params.metrics;
            this.sessionClients = params.sessionClients;
            ClientVersionMetadata loadVersionMetadata = ClientVersionMetadata.loadVersionMetadata();
            LOG.info("Responsive Client version: {}", loadVersionMetadata.responsiveClientVersion);
            LOG.info("Responsive Client commit ID: {}", loadVersionMetadata.responsiveClientCommitId);
            this.responsiveMetrics.initializeTags(this.applicationConfigs.getString("application.id"), this.clientId, loadVersionMetadata, this.applicationConfigs.originalsWithPrefix("metrics.context."));
            this.responsiveRestoreListener = new ResponsiveRestoreListener(this.responsiveMetrics);
            this.responsiveStateListener = new ResponsiveStateListener(this.responsiveMetrics);
            this.sessionClients.initialize(this.responsiveMetrics, this.responsiveRestoreListener);
            super.setGlobalStateRestoreListener(this.responsiveRestoreListener);
            super.setStateListener(this.responsiveStateListener);
        } catch (ConfigException e) {
            throw new StreamsException("Configuration error, please check your properties");
        }
    }

    private static ResponsiveMetrics createMetrics(StreamsConfig streamsConfig) {
        MetricConfig timeWindow = new MetricConfig().samples(streamsConfig.getInt("metrics.num.samples").intValue()).recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString("metrics.recording.level"))).timeWindow(streamsConfig.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS);
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(streamsConfig.originals());
        return new ResponsiveMetrics(new Metrics(timeWindow, Collections.singletonList(jmxReporter), Time.SYSTEM, new KafkaMetricsContext(ResponsiveMetrics.RESPONSIVE_METRICS_NAMESPACE, streamsConfig.originalsWithPrefix("metrics.context."))));
    }

    private static Properties propsWithOverrides(Map<?, ?> map, SessionClients sessionClients, ResponsiveStoreRegistry responsiveStoreRegistry, TopologyDescription topologyDescription) {
        Properties properties = new Properties();
        properties.putAll(map);
        properties.putAll(new InternalSessionConfigs.Builder().withSessionClients(sessionClients).withStoreRegistry(responsiveStoreRegistry).withTopologyDescription(topologyDescription).build());
        Object obj = map.get("internal.task.assignor.class");
        if (obj == null) {
            properties.put("internal.task.assignor.class", ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE);
        } else if (!ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE.equals(obj.toString())) {
            String format = String.format("Invalid Streams configuration value for '%s': got %s, expected '%s'", "internal.task.assignor.class", obj, ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE);
            LOG.error(format);
            throw new ConfigException(format);
        }
        return properties;
    }

    public void setStateListener(KafkaStreams.StateListener stateListener) {
        this.responsiveStateListener.registerUserStateListener(stateListener);
    }

    public KafkaStreams.StateListener stateListener() {
        return this.responsiveStateListener.userStateListener();
    }

    public void setGlobalStateRestoreListener(StateRestoreListener stateRestoreListener) {
        this.responsiveRestoreListener.registerUserRestoreListener(stateRestoreListener);
    }

    public StateRestoreListener stateRestoreListener() {
        return this.responsiveRestoreListener.userRestoreListener();
    }

    public Map<MetricName, ? extends Metric> metrics() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.putAll(streamsMetrics());
        linkedHashMap.putAll(responsiveMetrics());
        return Collections.unmodifiableMap(linkedHashMap);
    }

    public Map<MetricName, ? extends Metric> streamsMetrics() {
        return super.metrics();
    }

    public Map<MetricName, ? extends Metric> responsiveMetrics() {
        return this.responsiveMetrics.metrics();
    }

    private void closeInternal() {
        this.responsiveStateListener.close();
        this.sessionClients.closeAll();
    }

    public void close() {
        super.close();
        closeInternal();
    }

    public boolean close(Duration duration) {
        boolean close = super.close(duration);
        closeInternal();
        return close;
    }

    public boolean close(KafkaStreams.CloseOptions closeOptions) {
        boolean close = super.close(closeOptions);
        closeInternal();
        return close;
    }
}
