package dev.responsive.kafka.api;

import com.datastax.oss.driver.api.core.CqlSession;
import dev.responsive.db.CassandraClient;
import dev.responsive.kafka.api.InternalConfigs;
import dev.responsive.kafka.clients.ResponsiveKafkaClientSupplier;
import dev.responsive.kafka.config.ResponsiveConfig;
import dev.responsive.kafka.store.ResponsiveStoreRegistry;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/api/ResponsiveKafkaStreams.class */
/**
 * A {@link KafkaStreams} subclass that additionally owns a {@link CqlSession}, a Kafka
 * {@link Admin} client and a {@link ScheduledExecutorService}, and releases all three whenever
 * the streams instance is closed.
 *
 * <p>Instances are obtained through the static {@code create} factories. The supplied Streams
 * configuration is validated before the instance is built: {@code num.standby.replicas} must be
 * absent or {@code 0}, and {@code internal.task.assignor.class} must be absent or equal to
 * {@link ResponsiveConfig#TASK_ASSIGNOR_CLASS_OVERRIDE}.
 */
public class ResponsiveKafkaStreams extends KafkaStreams {
    private static final Logger LOG = LoggerFactory.getLogger(ResponsiveKafkaStreams.class);

    /** Streams config key whose value must be unset or zero. */
    private static final String NUM_STANDBY_REPLICAS_KEY = "num.standby.replicas";

    /** Streams config key that is forced to {@link ResponsiveConfig#TASK_ASSIGNOR_CLASS_OVERRIDE}. */
    private static final String TASK_ASSIGNOR_CLASS_KEY = "internal.task.assignor.class";

    private KafkaStreams.StateListener stateListener;
    private final CqlSession session;
    private final ScheduledExecutorService executor;
    private final Admin admin;

    /**
     * Creates an instance using a {@link ResponsiveKafkaClientSupplier} built from the configs
     * and a {@link DefaultCassandraClientFactory}.
     *
     * @param topology the topology to run
     * @param map the Streams/Responsive configuration
     * @return a new, not yet started, streams instance
     * @throws ConfigException if a validated Streams config has an unsupported value
     */
    public static ResponsiveKafkaStreams create(Topology topology, Map<String, Object> map) {
        ResponsiveStoreRegistry registry = new ResponsiveStoreRegistry();
        return connect(
                topology,
                map,
                new ResponsiveKafkaClientSupplier(map, registry),
                registry,
                new DefaultCassandraClientFactory());
    }

    /**
     * Creates an instance whose Kafka clients come from the given supplier, wrapped in a
     * {@link ResponsiveKafkaClientSupplier}, and a {@link DefaultCassandraClientFactory}.
     *
     * @param topology the topology to run
     * @param map the Streams/Responsive configuration
     * @param kafkaClientSupplier the client supplier to wrap
     * @return a new, not yet started, streams instance
     * @throws ConfigException if a validated Streams config has an unsupported value
     */
    public static ResponsiveKafkaStreams create(
            Topology topology, Map<String, Object> map, KafkaClientSupplier kafkaClientSupplier) {
        ResponsiveStoreRegistry registry = new ResponsiveStoreRegistry();
        return connect(
                topology,
                map,
                new ResponsiveKafkaClientSupplier(kafkaClientSupplier, map, registry),
                registry,
                new DefaultCassandraClientFactory());
    }

    /**
     * Creates an instance whose Kafka clients come from the given supplier, wrapped in a
     * {@link ResponsiveKafkaClientSupplier}, and whose Cassandra session/client come from the
     * given factory.
     *
     * @param topology the topology to run
     * @param map the Streams/Responsive configuration
     * @param kafkaClientSupplier the client supplier to wrap
     * @param cassandraClientFactory factory for the CQL session and Cassandra client
     * @return a new, not yet started, streams instance
     * @throws ConfigException if a validated Streams config has an unsupported value
     */
    public static ResponsiveKafkaStreams create(
            Topology topology,
            Map<String, Object> map,
            KafkaClientSupplier kafkaClientSupplier,
            CassandraClientFactory cassandraClientFactory) {
        ResponsiveStoreRegistry registry = new ResponsiveStoreRegistry();
        return connect(
                topology,
                map,
                new ResponsiveKafkaClientSupplier(kafkaClientSupplier, map, registry),
                registry,
                cassandraClientFactory);
    }

    /**
     * Opens the CQL session, Admin client and executor, validates the configs and builds the
     * streams instance. If any step fails, every resource opened so far is released before the
     * failure is rethrown, so a rejected configuration does not leak connections or threads.
     */
    private static ResponsiveKafkaStreams connect(
            Topology topology,
            Map<String, Object> map,
            ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier,
            ResponsiveStoreRegistry responsiveStoreRegistry,
            CassandraClientFactory cassandraClientFactory) {
        CqlSession cqlSession = null;
        Admin adminClient = null;
        ScheduledThreadPoolExecutor scheduler = null;
        try {
            cqlSession = cassandraClientFactory.createCqlSession(new ResponsiveConfig(map));
            adminClient = responsiveKafkaClientSupplier.getAdmin(map);
            scheduler = new ScheduledThreadPoolExecutor(2);
            StreamsConfig streamsConfig = verifiedStreamsConfigs(
                    map,
                    cassandraClientFactory.createCassandraClient(cqlSession),
                    adminClient,
                    scheduler,
                    responsiveStoreRegistry,
                    topology.describe());
            return new ResponsiveKafkaStreams(
                    topology,
                    streamsConfig,
                    responsiveKafkaClientSupplier,
                    cqlSession,
                    adminClient,
                    scheduler);
        } catch (RuntimeException | Error failure) {
            // Nobody else holds these resources yet; release them here or they leak.
            RuntimeException cleanupFailure = releaseClients(cqlSession, adminClient, scheduler);
            if (cleanupFailure != null) {
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
    }

    private ResponsiveKafkaStreams(
            Topology topology,
            StreamsConfig streamsConfig,
            ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier,
            CqlSession cqlSession,
            Admin admin,
            ScheduledExecutorService scheduledExecutorService) {
        super(topology, streamsConfig, responsiveKafkaClientSupplier);
        this.session = cqlSession;
        this.admin = admin;
        this.executor = scheduledExecutorService;
    }

    /**
     * Combines the user configs with the internal configs (Cassandra client, Admin, executor,
     * store registry, topology description) and validates the restricted Streams settings.
     *
     * @return the {@link StreamsConfig} to hand to {@link KafkaStreams}
     * @throws ConfigException if {@code num.standby.replicas} is set to anything other than 0 or
     *     is not an integer, or if {@code internal.task.assignor.class} is set to anything other
     *     than {@link ResponsiveConfig#TASK_ASSIGNOR_CLASS_OVERRIDE}
     */
    private static StreamsConfig verifiedStreamsConfigs(
            Map<String, Object> map,
            CassandraClient cassandraClient,
            Admin admin,
            ScheduledExecutorService scheduledExecutorService,
            ResponsiveStoreRegistry responsiveStoreRegistry,
            TopologyDescription topologyDescription) {
        Properties properties = new Properties();
        properties.putAll(map);
        properties.putAll(new InternalConfigs.Builder()
                .withCassandraClient(cassandraClient)
                .withKafkaAdmin(admin)
                .withExecutorService(scheduledExecutorService)
                .withStoreRegistry(responsiveStoreRegistry)
                .withTopologyDescription(topologyDescription)
                .build());

        Object standbyValue = map.get(NUM_STANDBY_REPLICAS_KEY);
        if (standbyValue != null) {
            int standbys = parseStandbyReplicas(standbyValue);
            if (standbys != 0) {
                String message = String.format(
                        "Invalid Streams configuration value for '%s': got %d, expected '%d'",
                        NUM_STANDBY_REPLICAS_KEY, standbys, 0);
                LOG.error(message);
                throw new ConfigException(message);
            }
        }

        Object assignor = map.get(TASK_ASSIGNOR_CLASS_KEY);
        if (assignor == null) {
            properties.put(TASK_ASSIGNOR_CLASS_KEY, ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE);
        } else if (!ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE.equals(assignor.toString())) {
            String message = String.format(
                    "Invalid Streams configuration value for '%s': got %s, expected '%s'",
                    TASK_ASSIGNOR_CLASS_KEY, assignor, ResponsiveConfig.TASK_ASSIGNOR_CLASS_OVERRIDE);
            LOG.error(message);
            throw new ConfigException(message);
        }
        return new StreamsConfig(properties);
    }

    /**
     * Reads the standby-replica setting as an int. Accepts an {@link Integer} or a numeric
     * {@link String} (configs loaded from a properties file arrive as strings) instead of
     * failing with a {@link ClassCastException}.
     *
     * @throws ConfigException if the value is neither an Integer nor a parseable integer string
     */
    private static int parseStandbyReplicas(Object value) {
        if (value instanceof Integer) {
            return (Integer) value;
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt(((String) value).trim());
            } catch (NumberFormatException ignored) {
                // Reported below with the same message as any other non-integer value.
            }
        }
        String message = String.format(
                "Invalid Streams configuration value for '%s': got %s, expected '%d'",
                NUM_STANDBY_REPLICAS_KEY, value, 0);
        LOG.error(message);
        throw new ConfigException(message);
    }

    /**
     * Registers the listener with the parent class and remembers it so that it can be read back
     * through {@link #stateListener()}. The field is only updated if the parent accepts it.
     */
    @Override
    public void setStateListener(KafkaStreams.StateListener stateListener) {
        super.setStateListener(stateListener);
        this.stateListener = stateListener;
    }

    /**
     * @return the listener most recently passed to {@link #setStateListener}, or {@code null} if
     *     none has been set
     */
    public KafkaStreams.StateListener stateListener() {
        return this.stateListener;
    }

    /**
     * Closes the session and Admin client and shuts down the executor. Every resource is
     * attempted even if an earlier one fails; the first failure is rethrown afterwards with any
     * later ones attached as suppressed exceptions.
     */
    private void closeClients() {
        RuntimeException failure = releaseClients(this.session, this.admin, this.executor);
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Releases each non-null resource independently.
     *
     * @return the first failure encountered (later ones are attached as suppressed), or
     *     {@code null} if everything was released cleanly
     */
    private static RuntimeException releaseClients(
            CqlSession session, Admin admin, ScheduledExecutorService executor) {
        RuntimeException failure = null;
        if (session != null) {
            try {
                session.close();
            } catch (RuntimeException e) {
                failure = e;
            }
        }
        if (admin != null) {
            try {
                admin.close();
            } catch (RuntimeException e) {
                failure = chain(failure, e);
            }
        }
        if (executor != null) {
            try {
                executor.shutdown();
            } catch (RuntimeException e) {
                failure = chain(failure, e);
            }
        }
        return failure;
    }

    /** Returns {@code first} with {@code next} suppressed, or {@code next} if there is no first. */
    private static RuntimeException chain(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /** Closes the streams instance, then releases the owned clients even if that close fails. */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            closeClients();
        }
    }

    /**
     * Closes the streams instance with the given timeout, then releases the owned clients even
     * if that close fails.
     *
     * @return the value returned by the parent {@code close(Duration)}
     */
    @Override
    public boolean close(Duration duration) {
        try {
            return super.close(duration);
        } finally {
            closeClients();
        }
    }

    /**
     * Closes the streams instance with the given options, then releases the owned clients even
     * if that close fails.
     *
     * @return the value returned by the parent {@code close(CloseOptions)}
     */
    @Override
    public boolean close(KafkaStreams.CloseOptions closeOptions) {
        try {
            return super.close(closeOptions);
        } finally {
            closeClients();
        }
    }
}
