package dev.responsive.api;

import com.datastax.oss.driver.api.core.CqlSession;
import dev.responsive.api.config.ResponsiveConfig;
import dev.responsive.internal.clients.ResponsiveKafkaClientSupplier;
import dev.responsive.internal.config.InternalConfigs;
import dev.responsive.internal.config.ResponsiveStreamsConfig;
import dev.responsive.internal.db.CassandraClient;
import dev.responsive.internal.db.CassandraClientFactory;
import dev.responsive.internal.db.DefaultCassandraClientFactory;
import dev.responsive.internal.metrics.ResponsiveStateListener;
import dev.responsive.internal.stores.ResponsiveRestoreListener;
import dev.responsive.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.internal.utils.SharedClients;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/api/ResponsiveKafkaStreams.class */
/**
 * A {@link KafkaStreams} subclass that wires Responsive-specific clients, metrics and
 * listeners into a Streams application.
 *
 * <p>On construction it builds a {@link SharedClients} holder (a Cassandra client plus a Kafka
 * admin client), injects it into the Streams properties together with the store registry and
 * topology description, and installs its own state listener and restore listener on the
 * underlying {@link KafkaStreams}. Listeners supplied by the user through
 * {@link #setStateListener} and {@link #setGlobalStateRestoreListener} are registered with
 * those internal listeners instead of being handed to the superclass.
 *
 * <p>Every {@code close} variant first closes the underlying {@link KafkaStreams} and then
 * closes the internal state listener and the shared clients.
 */
public class ResponsiveKafkaStreams extends KafkaStreams {
    private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class);

    /** Streams property that must be absent or equal to the Responsive task assignor override. */
    private static final String TASK_ASSIGNOR_CLASS_CONFIG = "internal.task.assignor.class";

    private final ResponsiveStateListener responsiveStateListener;
    private final ResponsiveRestoreListener responsiveRestoreListener;
    private final SharedClients sharedClients;

    /**
     * Creates an instance using the system clock and a {@link DefaultKafkaClientSupplier}.
     *
     * @param topology the topology to run
     * @param map the application configuration
     */
    public ResponsiveKafkaStreams(Topology topology, Map<?, ?> map) {
        this(topology, map, Time.SYSTEM);
    }

    /**
     * Creates an instance using the system clock and the given client supplier.
     *
     * @param topology the topology to run
     * @param map the application configuration
     * @param kafkaClientSupplier the supplier that will be wrapped to create Kafka clients
     */
    public ResponsiveKafkaStreams(Topology topology, Map<?, ?> map, KafkaClientSupplier kafkaClientSupplier) {
        this(topology, map, kafkaClientSupplier, Time.SYSTEM);
    }

    /**
     * Creates an instance using the given clock and a {@link DefaultKafkaClientSupplier}.
     *
     * @param topology the topology to run
     * @param map the application configuration
     * @param time the clock passed to the underlying {@link KafkaStreams}
     */
    public ResponsiveKafkaStreams(Topology topology, Map<?, ?> map, Time time) {
        this(topology, map, new DefaultKafkaClientSupplier(), time);
    }

    /**
     * Creates an instance from raw configuration, deriving both the {@link ResponsiveConfig}
     * and the {@link StreamsConfig} from {@code map} and using a
     * {@link DefaultCassandraClientFactory}.
     *
     * @param topology the topology to run
     * @param map the application configuration
     * @param kafkaClientSupplier the supplier that will be wrapped to create Kafka clients
     * @param time the clock passed to the underlying {@link KafkaStreams}
     */
    public ResponsiveKafkaStreams(Topology topology, Map<?, ?> map, KafkaClientSupplier kafkaClientSupplier, Time time) {
        this(
            topology,
            ResponsiveConfig.loggedConfig(map),
            ResponsiveStreamsConfig.streamsConfig(map),
            kafkaClientSupplier,
            time,
            new DefaultCassandraClientFactory());
    }

    /**
     * Creates an instance with an explicit Cassandra client factory. A fresh
     * {@link ResponsiveStoreRegistry}, a new {@link Metrics} instance and a CQL session obtained
     * from {@code cassandraClientFactory} are created here and passed down the constructor chain.
     *
     * @param topology the topology to run
     * @param responsiveConfig the Responsive configuration
     * @param streamsConfig the Streams configuration used for metrics and the client supplier
     * @param kafkaClientSupplier the supplier that will be wrapped to create Kafka clients
     * @param time the clock passed to the underlying {@link KafkaStreams}
     * @param cassandraClientFactory factory for the CQL session and the Cassandra client
     */
    protected ResponsiveKafkaStreams(Topology topology, ResponsiveConfig responsiveConfig, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, Time time, CassandraClientFactory cassandraClientFactory) {
        this(
            topology,
            responsiveConfig,
            streamsConfig,
            kafkaClientSupplier,
            time,
            new ResponsiveStoreRegistry(),
            createMetrics(streamsConfig),
            cassandraClientFactory,
            cassandraClientFactory.createCqlSession(responsiveConfig));
    }

    /**
     * Chain step: wraps the user's client supplier in a {@link ResponsiveKafkaClientSupplier}
     * and turns the CQL session into a {@link CassandraClient}.
     */
    private ResponsiveKafkaStreams(Topology topology, ResponsiveConfig responsiveConfig, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, Time time, ResponsiveStoreRegistry responsiveStoreRegistry, Metrics metrics, CassandraClientFactory cassandraClientFactory, CqlSession cqlSession) {
        this(
            topology,
            responsiveConfig,
            new ResponsiveKafkaClientSupplier(kafkaClientSupplier, streamsConfig, responsiveStoreRegistry, metrics),
            time,
            responsiveStoreRegistry,
            metrics,
            cassandraClientFactory.createCassandraClient(cqlSession, responsiveConfig));
    }

    /**
     * Chain step: obtains an admin client from the wrapped supplier and bundles it with the
     * Cassandra client into a {@link SharedClients} holder.
     */
    private ResponsiveKafkaStreams(Topology topology, ResponsiveConfig responsiveConfig, ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier, Time time, ResponsiveStoreRegistry responsiveStoreRegistry, Metrics metrics, CassandraClient cassandraClient) {
        this(
            topology,
            responsiveConfig,
            responsiveKafkaClientSupplier,
            time,
            responsiveStoreRegistry,
            metrics,
            new SharedClients(cassandraClient, responsiveKafkaClientSupplier.getAdmin(responsiveConfig.originals())));
    }

    /**
     * Final chain step: constructs the underlying {@link KafkaStreams} with the overridden
     * properties, validates the resulting configuration and installs the internal listeners.
     *
     * @throws StreamsException if a {@link ConfigException} is raised while validating the
     *     configuration or setting up the listeners; the original exception is attached as the
     *     cause
     */
    private ResponsiveKafkaStreams(Topology topology, ResponsiveConfig responsiveConfig, ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier, Time time, ResponsiveStoreRegistry responsiveStoreRegistry, Metrics metrics, SharedClients sharedClients) {
        super(
            topology,
            propsWithOverrides(responsiveConfig.originals(), sharedClients, responsiveStoreRegistry, topology.describe()),
            responsiveKafkaClientSupplier,
            time);
        try {
            ResponsiveStreamsConfig.validateStreamsConfig(this.applicationConfigs);
            this.sharedClients = sharedClients;
            final String applicationId = this.applicationConfigs.getString("application.id");
            this.responsiveRestoreListener = new ResponsiveRestoreListener(metrics);
            this.responsiveStateListener = new ResponsiveStateListener(metrics, applicationId, this.clientId);
            // Install the internal listeners on the superclass directly; the overrides in this
            // class route user-supplied listeners to these wrappers instead.
            super.setGlobalStateRestoreListener(this.responsiveRestoreListener);
            super.setStateListener(this.responsiveStateListener);
        } catch (ConfigException e) {
            // Keep the original exception as the cause so the failing property and its
            // message remain visible to whoever handles the StreamsException.
            // NOTE(review): sharedClients is not closed on this failure path; confirm whether
            // cleanup is expected here.
            throw new StreamsException("Configuration error, please check your properties", e);
        }
    }

    /**
     * Builds a {@link Metrics} instance configured from the Streams metrics settings, with a
     * single {@link JmxReporter} and a {@code "dev.responsive"} metrics context.
     *
     * @param streamsConfig source of the sample count, recording level and sample window
     * @return the newly created metrics registry
     */
    private static Metrics createMetrics(StreamsConfig streamsConfig) {
        final int numSamples = streamsConfig.getInt("metrics.num.samples");
        final Sensor.RecordingLevel recordingLevel =
            Sensor.RecordingLevel.forName(streamsConfig.getString("metrics.recording.level"));
        final long sampleWindowMs = streamsConfig.getLong("metrics.sample.window.ms");

        final MetricConfig metricConfig = new MetricConfig()
            .samples(numSamples)
            .recordLevel(recordingLevel)
            .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS);

        final JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(streamsConfig.originals());

        return new Metrics(
            metricConfig,
            Collections.singletonList(jmxReporter),
            Time.SYSTEM,
            new KafkaMetricsContext("dev.responsive", new HashMap<String, Object>()));
    }

    /**
     * Copies the user's configuration into a {@link Properties} object, adds the internal
     * configs (Cassandra client, Kafka admin, store registry, topology description) and
     * enforces the task assignor override.
     *
     * @param map the original user configuration
     * @param sharedClients holder of the Cassandra client and Kafka admin to inject
     * @param responsiveStoreRegistry the store registry to inject
     * @param topologyDescription the topology description to inject
     * @return the properties to pass to the {@link KafkaStreams} constructor
     * @throws ConfigException if the user set the task assignor class to anything other than
     *     {@link ResponsiveConfig#TASK_ASSIGNOR_CLASS_OVERRIDE}
     */
    private static Properties propsWithOverrides(Map<?, ?> map, SharedClients sharedClients, ResponsiveStoreRegistry responsiveStoreRegistry, TopologyDescription topologyDescription) {
        final Properties properties = new Properties();
        properties.putAll(map);
        properties.putAll(
            new InternalConfigs.Builder()
                .withCassandraClient(sharedClients.cassandraClient)
                .withKafkaAdmin(sharedClients.admin)
                .withStoreRegistry(responsiveStoreRegistry)
                .withTopologyDescription(topologyDescription)
                .build());

        final Object userAssignor = map.get(TASK_ASSIGNOR_CLASS_CONFIG);
        if (userAssignor == null) {
            // Not set by the user: apply the required override.
            properties.put(TASK_ASSIGNOR_CLASS_CONFIG, ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE);
            return properties;
        }

        if (!ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE.equals(userAssignor.toString())) {
            final String message = String.format(
                "Invalid Streams configuration value for '%s': got %s, expected '%s'",
                TASK_ASSIGNOR_CLASS_CONFIG,
                userAssignor,
                ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE);
            LOG.error(message);
            throw new ConfigException(message);
        }
        return properties;
    }

    /**
     * Registers a user state listener with the internal state listener rather than replacing
     * the listener installed on the underlying {@link KafkaStreams}.
     *
     * @param stateListener the user's listener
     */
    public void setStateListener(KafkaStreams.StateListener stateListener) {
        this.responsiveStateListener.registerUserStateListener(stateListener);
    }

    /**
     * @return the user state listener currently held by the internal state listener
     */
    public KafkaStreams.StateListener stateListener() {
        return this.responsiveStateListener.userStateListener();
    }

    /**
     * Registers a user restore listener with the internal restore listener rather than
     * replacing the listener installed on the underlying {@link KafkaStreams}.
     *
     * @param stateRestoreListener the user's listener
     */
    public void setGlobalStateRestoreListener(StateRestoreListener stateRestoreListener) {
        this.responsiveRestoreListener.registerUserRestoreListener(stateRestoreListener);
    }

    /**
     * @return the user restore listener currently held by the internal restore listener
     */
    public StateRestoreListener stateRestoreListener() {
        return this.responsiveRestoreListener.userRestoreListener();
    }

    /** Closes the internal state listener and then all shared clients. */
    private void closeInternal() {
        this.responsiveStateListener.close();
        this.sharedClients.closeAll();
    }

    /** Closes the underlying {@link KafkaStreams}, then the Responsive-owned resources. */
    public void close() {
        super.close();
        closeInternal();
    }

    /**
     * Closes the underlying {@link KafkaStreams} with a timeout, then the Responsive-owned
     * resources. The internal resources are closed regardless of the superclass result.
     *
     * @param duration the timeout passed to {@code KafkaStreams.close(Duration)}
     * @return the value returned by the superclass close
     */
    public boolean close(Duration duration) {
        final boolean closed = super.close(duration);
        closeInternal();
        return closed;
    }

    /**
     * Closes the underlying {@link KafkaStreams} with the given options, then the
     * Responsive-owned resources. The internal resources are closed regardless of the
     * superclass result.
     *
     * @param closeOptions the options passed to {@code KafkaStreams.close(CloseOptions)}
     * @return the value returned by the superclass close
     */
    public boolean close(KafkaStreams.CloseOptions closeOptions) {
        final boolean closed = super.close(closeOptions);
        closeInternal();
        return closed;
    }
}
