/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import com.google.common.collect.ImmutableSet;
import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.LogicalClusterMetadata;
import io.confluent.kafka.multitenant.SslCertificateManager;
import io.confluent.kafka.multitenant.TenantLifecycleManager;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.multitenant.utils.Utils;
import io.confluent.protobuf.cloud.events.v1.LogicalCluster;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicBasedPhysicalClusterMetadata
extends BasePhysicalClusterMetadata {
    private static final Logger LOG = LoggerFactory.getLogger(TopicBasedPhysicalClusterMetadata.class);
    private static final Long CLOSE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30L);
    private static final String LKC_LOAD_METRICS_GROUP_NAME = "confluent-lkc-load-metrics";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME = "lkc-metadata-end-to-end-load-time";
    private static final String LKC_METADATA_STARTUP_LOAD_TIME_SENSOR_NAME = "lkc-metadata-startup-load-time";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME = "lkc-metadata-end-to-end-load-time-min";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME = "lkc-metadata-end-to-end-load-time-max";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME = "lkc-metadata-end-to-end-load-time-avg";
    static final String LKC_METADATA_STARTUP_LOAD_TIME_METRIC_NAME = "lkc-metadata-startup-load-time-max";
    private Set<String> kafkaLogicalClusterIds = new HashSet<String>();
    private static final String NUMBER_OF_TENANTS_GROUP_NAME = "confluent-number-of-tenants";
    static final String NUMBER_OF_TENANTS_METRIC_NAME = "number-of-tenants";
    private final AtomicBoolean startedMonitoringDeactivatedClusters = new AtomicBoolean(false);
    private final AtomicInteger numberOfTenantsValue = new AtomicInteger(0);
    private final Gauge<Integer> numberOfTenantsMetric = (config, now) -> this.numberOfTenantsValue.get();
    private final Sensor lkcTimeToLoadEndToEndSensor;
    private final Sensor lkcStartupLoadSensor;
    private final Time time;
    private final ScheduledExecutorService backgroundUpdatesExecutorService;
    private List<String> multitenantListenerNames = Collections.emptyList();
    private final Map<String, LCMPair> logicalClusterMap;
    public TenantLifecycleManager tenantLifecycleManager;
    public SslCertificateManager sslCertificateManager;
    private final AtomicReference<State> sslCertManagerState;
    private final AtomicReference<State> logConsumerState;
    private long maxPartitionRetryDelayMs;
    private String topicName;
    private String topicClientId;
    private long topicLoadTimeoutMs;
    private long updateDeactivatedTenantsIntervalMs;
    private KafkaBasedLog<String, byte[]> lkcLog;

    public TopicBasedPhysicalClusterMetadata(Metrics metrics) {
        this(metrics, Time.SYSTEM);
    }

    public TopicBasedPhysicalClusterMetadata(Metrics metrics, Time time) {
        this.lkcTimeToLoadEndToEndSensor = metrics.sensor(LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME);
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The minimum end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Min());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The maximum end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Max());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The mean end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Avg());
        MetricName numTenantsMetricName = metrics.metricName(NUMBER_OF_TENANTS_METRIC_NAME, NUMBER_OF_TENANTS_GROUP_NAME, "The number of tenants (i.e. logical clusters) in the physical cluster");
        if (!metrics.metrics().containsKey(numTenantsMetricName)) {
            metrics.addMetric(numTenantsMetricName, this.numberOfTenantsMetric);
        }
        this.lkcStartupLoadSensor = metrics.sensor(LKC_METADATA_STARTUP_LOAD_TIME_SENSOR_NAME);
        this.lkcStartupLoadSensor.add(metrics.metricName(LKC_METADATA_STARTUP_LOAD_TIME_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The time it took for the first load of all logical cluster metadata from the topic in ms"), (MeasurableStat)new Max());
        this.time = time;
        this.sslCertManagerState = new AtomicReference<State>(State.NOT_READY);
        this.logConsumerState = new AtomicReference<State>(State.NOT_READY);
        this.logicalClusterMap = new ConcurrentHashMap<String, LCMPair>();
        this.backgroundUpdatesExecutorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "cluster-metadata-bg-updates");
            thread.setDaemon(true);
            return thread;
        });
    }

    void configure(ConfluentAdmin adminClient, String brokerId, String sslCertsPath, long deletedDelayMs, long updateDeactivatedTenantsIntervalMs) throws IOException {
        LOG.warn("configure(AdminClient, ConfluentAdmin, String, String) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        this.multitenantListenerNames = Collections.emptyList();
        this.tenantLifecycleManager = new TenantLifecycleManager(deletedDelayMs, adminClient, this.time);
        this.sslCertificateManager = new SslCertificateManager(brokerId, sslCertsPath, adminClient);
        this.updateDeactivatedTenantsIntervalMs = updateDeactivatedTenantsIntervalMs;
        this.startWatchingSslCertificates();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void configure(Map<String, ?> configs) {
        String sessionUuid = Utils.getBrokerSessionUuid(configs);
        Map map = INSTANCES;
        synchronized (map) {
            BasePhysicalClusterMetadata instance = (BasePhysicalClusterMetadata)INSTANCES.get(sessionUuid);
            if (instance != null) {
                if (this != instance) {
                    throw new UnsupportedOperationException("TopicBasedPhysicalClusterMetadata instance already exists for broker session " + sessionUuid);
                }
                LOG.info("Skipping configuring this instance (broker session {}): Already configured.", (Object)sessionUuid);
                return;
            }
            INSTANCES.put(sessionUuid, this);
        }
        this.topicName = (String)configs.get("confluent.cdc.lkc.metadata.topic");
        if (this.topicName == null || this.topicName.isEmpty()) {
            throw new ConfigException("Config confluent.cdc.lkc.metadata.topic can not be empty when using TopicBasedPhysicalClusterMetadata");
        }
        this.topicClientId = String.format("%s-%s-%s", this.topicName, ConfluentConfigs.ClientType.CONSUMER, configs.get(KafkaConfig.BrokerIdProp()));
        Long timeoutValue = (Long)configs.get("confluent.cdc.api.keys.load.timeout.ms");
        if (timeoutValue == null || timeoutValue <= 0L) {
            throw new ConfigException("Config confluent.cdc.api.keys.load.timeout.ms must be positive integer when using TopicBasedPhysicalClusterMetadata");
        }
        this.topicLoadTimeoutMs = timeoutValue;
        this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(configs, null);
        Long reloadDelayValue = (Long)configs.get("multitenant.metadata.reload.delay.ms");
        this.maxPartitionRetryDelayMs = reloadDelayValue == null ? ConfluentConfigs.MULTITENANT_METADATA_RELOAD_DELAY_MS_DEFAULT.longValue() : reloadDelayValue.longValue();
        Long updateIntervalMsValue = (Long)configs.get("multitenant.tenant.delete.check.ms");
        if (updateIntervalMsValue == null) {
            updateIntervalMsValue = ConfluentConfigs.MULTITENANT_TENANT_DELETE_CHECK_MS_DEFAULT;
        }
        this.updateDeactivatedTenantsIntervalMs = updateIntervalMsValue;
        this.tenantLifecycleManager = new TenantLifecycleManager(configs, this.time);
        this.sslCertificateManager = new SslCertificateManager(configs);
        try {
            this.startWatchingSslCertificates();
        }
        catch (IOException ioe) {
            this.close(sessionUuid);
            throw new ConfigException("Failed to start watching the SSL certs watcher: " + ioe.getMessage());
        }
        LOG.warn("Configured and started instance for broker session {}", (Object)sessionUuid);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(String brokerSessionUuid) {
        Map map = INSTANCES;
        synchronized (map) {
            BasePhysicalClusterMetadata instance = (BasePhysicalClusterMetadata)INSTANCES.get(brokerSessionUuid);
            if (instance != null && instance == this) {
                INSTANCES.remove(brokerSessionUuid);
                LOG.info("Removed instance for broker session {}", (Object)brokerSessionUuid);
            } else if (instance != null) {
                LOG.info("Closing instance that doesn't match the instance in the static map with the same broker session {} will not remove that instance from the map.", (Object)brokerSessionUuid);
            }
        }
        this.shutdown();
    }

    public void handleSocketServerInitialized(String endpoint) {
        this.tenantLifecycleManager.createAdminClient(endpoint);
        this.sslCertificateManager.createAdminClient(endpoint);
        this.sslCertificateManager.loadSslCertFiles();
    }

    public Map<Endpoint, CompletableFuture<Void>> start(Map<String, Object> interBrokerClientConfig, Collection<Endpoint> endpoints) {
        CompletableFuture<Void> logStartedFuture;
        if (this.logConsumerState.compareAndSet(State.NOT_READY, State.STARTING)) {
            this.lkcLog = this.createKafkaBasedLog(interBrokerClientConfig);
            try {
                logStartedFuture = CompletableFuture.runAsync(() -> this.startLog());
            }
            catch (Exception e2) {
                this.logConsumerState.set(State.FAILED_TO_START);
                throw new IllegalStateException("Unable to create a future for startLog()", e2);
            }
        } else {
            this.ensureNonTerminalState(this.logConsumerState.get());
            LOG.warn("Trying to start a TopicBasedPhysicalClusterMetadata which was already started!");
            logStartedFuture = CompletableFuture.completedFuture(null);
        }
        return endpoints.stream().collect(Collectors.toMap(Function.identity(), e -> this.multitenantListenerNames.contains(e.listenerName().orElse("")) ? logStartedFuture : CompletableFuture.completedFuture(null)));
    }

    public void start(KafkaBasedLog<String, byte[]> kafkaBasedLog) {
        LOG.warn("configure(KafkaBasedLog<>) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        if (!this.logConsumerState.compareAndSet(State.NOT_READY, State.STARTING)) {
            throw new IllegalStateException("start() called twice from the same unit test. Shouldn't happen!");
        }
        this.lkcLog = kafkaBasedLog;
        this.startLog();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startLog() {
        State currentState = this.logConsumerState.get();
        if (currentState != State.STARTING) {
            throw new IllegalStateException("Trying to start a log which is in a non-starting state: " + (Object)((Object)currentState));
        }
        try {
            long startNano = this.time.nanoseconds();
            this.lkcLog.start();
            Map<String, LCMPair> map = this.logicalClusterMap;
            synchronized (map) {
                this.postUpdateBookkeeping();
                this.logConsumerState.set(State.RUNNING);
                this.startMonitoringDeactivatedTenants();
            }
            long loadTimeNano = this.time.nanoseconds() - startNano;
            this.lkcStartupLoadSensor.record((double)TimeUnit.NANOSECONDS.toMillis(loadTimeNano));
            LOG.info("Consumed initial set of {} lkcs metadata from topic {} in {} ns", new Object[]{this.logicalClusterMap.size(), this.topicName, loadTimeNano});
        }
        catch (Exception e) {
            this.logConsumerState.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to start consuming lkc metadata from topic", e);
        }
    }

    void shutdown() {
        LOG.info("Shutting down");
        try {
            State prevSslCertManagerState = this.sslCertManagerState.getAndSet(State.CLOSED);
            if (prevSslCertManagerState.equals((Object)State.RUNNING) || prevSslCertManagerState.equals((Object)State.STARTING)) {
                this.sslCertificateManager.shutdown();
                this.sslCertificateManager.close();
            } else {
                LOG.info("Trying to close already closed sslCertificateManager");
            }
        }
        catch (Exception e) {
            LOG.error("Error when shutting down sslCertificateManager", (Throwable)e);
        }
        try {
            State prevLogState = this.logConsumerState.getAndSet(State.CLOSED);
            if (prevLogState.equals((Object)State.RUNNING) || prevLogState.equals((Object)State.STARTING)) {
                this.lkcLog.stop();
            } else {
                LOG.info("Trying to close an lkcLog that was in a non-closable state: {}", (Object)prevLogState);
            }
        }
        catch (Exception e) {
            LOG.error("Error when shutting down lkcLog", (Throwable)e);
        }
        this.backgroundUpdatesExecutorService.shutdownNow();
        try {
            this.backgroundUpdatesExecutorService.awaitTermination(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            LOG.debug("Shutting down was interrupted", (Throwable)e);
        }
        this.tenantLifecycleManager.close();
        LOG.info("Closed topic-based tenant cluster metadata store");
    }

    public boolean isUp() {
        return State.RUNNING.equals((Object)this.sslCertManagerState.get()) && State.RUNNING.equals((Object)this.logConsumerState.get());
    }

    public String dedicatedLogicalClusterId() {
        if (this.kafkaLogicalClusterIds.size() == 1) {
            return (String)this.kafkaLogicalClusterIds.stream().findFirst().get();
        }
        return "";
    }

    public Set<String> kafkaLogicalClusterIds() {
        return this.kafkaLogicalClusterIds;
    }

    @Override
    public Set<String> logicalClusterIds() {
        this.ensureOpen();
        return this.logicalClusterMap.entrySet().stream().filter(e -> ((LCMPair)e.getValue()).isActiveCluster()).map(Map.Entry::getKey).collect(Collectors.toSet());
    }

    @Override
    public Set<String> logicalClusterIdsIncludingStale() {
        this.ensureOpen();
        return ImmutableSet.copyOf(this.logicalClusterMap.keySet());
    }

    @Override
    public LogicalClusterMetadata metadata(String logicalClusterId) {
        this.ensureOpen();
        LCMPair lcmPair = this.logicalClusterMap.get(logicalClusterId);
        if (lcmPair != null && lcmPair.isActiveCluster()) {
            return lcmPair.getLCM();
        }
        return null;
    }

    private void startWatchingSslCertificates() throws IOException {
        if (this.sslCertManagerState.compareAndSet(State.NOT_READY, State.RUNNING)) {
            try {
                this.sslCertificateManager.startWatching();
            }
            catch (IOException ioe) {
                this.sslCertManagerState.compareAndSet(State.RUNNING, State.NOT_READY);
                throw ioe;
            }
        } else {
            LOG.warn("startWatchingSslCertificates, but state is: " + this.sslCertManagerState.get().toString());
        }
    }

    private LogicalClusterMetadata parseLCM(ConsumerRecord<String, byte[]> record) {
        LogicalClusterMetadata lcm = null;
        try {
            LogicalCluster asProto = LogicalCluster.parseFrom((byte[])record.value());
            lcm = LogicalClusterMetadata.fromProtobuf(asProto);
        }
        catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
        return lcm;
    }

    private void updateNumberOfTenantsMetric() {
        int size = (int)this.logicalClusterMap.values().stream().filter(LCMPair::isActiveCluster).count();
        this.numberOfTenantsValue.set(size);
    }

    private void updateQuotas() {
        Map<String, QuotaConfig> tenantQuotas = this.logicalClusterMap.entrySet().stream().filter(e -> ((LCMPair)e.getValue()).isActiveCluster()).collect(Collectors.toMap(Map.Entry::getKey, e -> ((LCMPair)e.getValue()).getLCM().quotaConfig()));
        TenantQuotaCallback.updateQuotas(tenantQuotas, QuotaConfig.UNLIMITED_QUOTA);
    }

    private void updateMaxPartitionsIfNecessary(LogicalClusterMetadata oldLcm, LogicalClusterMetadata newLcm) {
        if (!this.tenantLifecycleManager.updateMaxPartitionsIfNecessary(oldLcm, newLcm)) {
            LOG.info("updateMaxPartitionsIfNecessary() failed, rescheduling it");
            this.backgroundUpdatesExecutorService.schedule(() -> this.updateMaxPartitionsIfNecessary(oldLcm, newLcm), this.maxPartitionRetryDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    private void recordEndToEndSensor(LogicalClusterMetadata newLcm) {
        Date creationDate;
        if (newLcm.lifecycleMetadata() != null && (creationDate = newLcm.lifecycleMetadata().creationDate()) != null) {
            long endToEndLoadTime = this.time.milliseconds() - creationDate.getTime();
            this.lkcTimeToLoadEndToEndSensor.record((double)endToEndLoadTime);
        }
    }

    private void updateTenant(LogicalClusterMetadata oldLcm, LogicalClusterMetadata newLcm) {
        if (State.RUNNING.equals((Object)this.logConsumerState.get())) {
            if (newLcm != null) {
                LOG.info("Adding or updating lkc metadata for cluster: {}", (Object)newLcm.logicalClusterId());
                this.recordEndToEndSensor(newLcm);
                if (this.addOrUpdate(oldLcm, newLcm)) {
                    this.updateMaxPartitionsIfNecessary(oldLcm, newLcm);
                }
            } else {
                LOG.info("Deleting lkc metadata for cluster: {}", (Object)oldLcm.logicalClusterId());
            }
            this.postUpdateBookkeeping();
        }
    }

    private void postUpdateBookkeeping() {
        this.updateNumberOfTenantsMetric();
        this.updateKafkaLogicalClusterIds();
        this.updateQuotas();
    }

    private void startMonitoringDeactivatedTenants() {
        if (this.updateDeactivatedTenantsIntervalMs < 1L) {
            LOG.error("The interval to check for deactivated tenants is set at {}. No tenants would be actually deleted (only deactivated!) and partitions and ACLs would leak!", (Object)this.updateDeactivatedTenantsIntervalMs);
            return;
        }
        if (!this.startedMonitoringDeactivatedClusters.getAndSet(true)) {
            this.backgroundUpdatesExecutorService.scheduleAtFixedRate(() -> this.updateDeactivatedTenants(), this.updateDeactivatedTenantsIntervalMs, this.updateDeactivatedTenantsIntervalMs, TimeUnit.MILLISECONDS);
        } else {
            LOG.info("startMonitoringDeactivatedTenants() called twice. Ignoring");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateDeactivatedTenants() {
        if (!State.RUNNING.equals((Object)this.logConsumerState.get())) {
            return;
        }
        List deactivatedClusters = this.logicalClusterMap.values().stream().filter(LCMPair::exists).filter(p -> !p.isActiveCluster()).map(LCMPair::getLCM).collect(Collectors.toList());
        for (LogicalClusterMetadata lcm : deactivatedClusters) {
            this.tenantLifecycleManager.updateTenantState(lcm);
        }
        this.tenantLifecycleManager.deleteTenants();
        Set<String> fullyDeletedClusters = this.tenantLifecycleManager.fullyDeletedClusters();
        Map<String, LCMPair> map = this.logicalClusterMap;
        synchronized (map) {
            for (String lkcId : fullyDeletedClusters) {
                LCMPair oldRecord = this.logicalClusterMap.get(lkcId);
                if (oldRecord == null) continue;
                this.logicalClusterMap.put(lkcId, new LCMPair(oldRecord.getSequenceId(), null));
            }
        }
    }

    private void updateKafkaLogicalClusterIds() {
        this.kafkaLogicalClusterIds = this.logicalClusterMap.values().stream().filter(LCMPair::isActiveCluster).filter(e -> e.getLCM().isKafkaLogicalCluster()).map(e -> e.getLCM().logicalClusterId()).collect(Collectors.toSet());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateLogicalCluster(String clusterId, Long sequenceId, LogicalClusterMetadata lcm) {
        if (!this.isStartingOrRunningState(this.logConsumerState.get())) {
            LOG.warn("Tried to add or update a logical cluster with a non running log (state = {})", (Object)this.logConsumerState.get());
        } else {
            Map<String, LCMPair> map = this.logicalClusterMap;
            synchronized (map) {
                LCMPair prevRecord = this.logicalClusterMap.get(clusterId);
                if (prevRecord == null || prevRecord.getSequenceId() < sequenceId) {
                    this.logicalClusterMap.put(clusterId, new LCMPair(sequenceId, lcm));
                    this.tenantLifecycleManager.updateTenantState(lcm);
                    LogicalClusterMetadata prevLcm = null;
                    if (prevRecord != null) {
                        prevLcm = prevRecord.getLCM();
                    }
                    this.updateTenant(prevLcm, lcm);
                } else {
                    LOG.warn("Got asked to update a cluster {} which has a newer sequence id in map: {}", (Object)clusterId, (Object)prevRecord.getSequenceId());
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeLogicalCluster(String clusterId, Long sequenceId) {
        if (!this.isStartingOrRunningState(this.logConsumerState.get())) {
            LOG.warn("Tried to add or update a logical cluster with a non running log (state = {})", (Object)this.logConsumerState.get());
        } else {
            Map<String, LCMPair> map = this.logicalClusterMap;
            synchronized (map) {
                LCMPair prevRecord = this.logicalClusterMap.get(clusterId);
                if (prevRecord == null) {
                    LOG.info("Got asked to remove a cluster {} which isn't in the map (maybe it's not Kafka?)", (Object)clusterId);
                } else if (prevRecord.getSequenceId() < sequenceId) {
                    this.logicalClusterMap.put(clusterId, new LCMPair(sequenceId, null));
                    this.updateTenant(prevRecord.getLCM(), null);
                } else {
                    LOG.warn("Got asked to remove a cluster {} which has a newer sequence id in map: {}", (Object)clusterId, (Object)prevRecord.getSequenceId());
                }
            }
        }
    }

    void consume(ConsumerRecord<String, byte[]> record) {
        String lkcId = (String)record.key();
        if (lkcId == null) {
            LOG.error("Missing key in LKC metadata record! (partition = {}, offset = {}, timestamp = {}", new Object[]{record.partition(), record.offset(), record.timestamp()});
            return;
        }
        Long sequenceId = Utils.tryParseEventsSequenceId(record);
        if (sequenceId == null) {
            LOG.error("Unable to decode sequence id for lkc metadata message (key = {}, partition = {}, offset = {}, timestamp = {})", new Object[]{lkcId, record.partition(), record.offset(), record.timestamp()});
            return;
        }
        if (record.value() == null) {
            LOG.info("seqId: {}. Removing LogicalClusterMetadata for {}", (Object)sequenceId, (Object)lkcId);
            this.removeLogicalCluster(lkcId, sequenceId);
        } else {
            try {
                LogicalClusterMetadata lcm = this.parseLCM(record);
                if (lcm.isValid()) {
                    if (!lkcId.equals(lcm.logicalClusterId())) {
                        LOG.error("seqId: {}. LKC id in key ({}) doesn't match one in message: {}. Skipping!", new Object[]{sequenceId, lkcId, lcm.logicalClusterId()});
                    } else {
                        LOG.info("seqId: {}. Updating LogicalClusterMetadata for {}", (Object)sequenceId, (Object)lkcId);
                        this.updateLogicalCluster(lkcId, sequenceId, lcm);
                    }
                }
            }
            catch (IllegalArgumentException iae) {
                LOG.error(String.format("seqId: %s. Unable to decode lkc metadata message for key %s", sequenceId, lkcId), (Throwable)iae);
            }
        }
    }

    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(Map<String, ?> interBrokerClientConfigs) {
        HashSet consumerConfigNames = new HashSet(ConsumerConfig.configNames());
        consumerConfigNames.remove("metric.reporters");
        HashMap consumerProps = new HashMap(interBrokerClientConfigs);
        consumerProps.keySet().retainAll(consumerConfigNames);
        consumerProps.put("client.id", this.topicClientId);
        consumerProps.put("bootstrap.servers", interBrokerClientConfigs.get("bootstrap.servers"));
        consumerProps.put("allow.auto.create.topics", false);
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        return new KafkaBasedLog(this.topicName, null, consumerProps, (Callback)new ConsumeCallback(), this.time, null, this.topicLoadTimeoutMs);
    }

    private void ensureOpen() {
        if (!State.RUNNING.equals((Object)this.sslCertManagerState.get())) {
            throw new IllegalStateException("SslCertificateManager not started.");
        }
        if (!State.RUNNING.equals((Object)this.logConsumerState.get())) {
            throw new IllegalStateException("KafkaBasedLog for the consumer topic not started.");
        }
    }

    private void ensureNonTerminalState(State state) {
        if (State.FAILED_TO_START.equals((Object)state) || State.CLOSED.equals((Object)state)) {
            throw new IllegalStateException("Unable to resume from state: " + state.toString());
        }
    }

    private boolean isStartingOrRunningState(State state) {
        return State.STARTING.equals((Object)state) || State.RUNNING.equals((Object)state);
    }

    private boolean addOrUpdate(LogicalClusterMetadata oldMeta, LogicalClusterMetadata newMeta) {
        boolean updateNeeded = !newMeta.equals(oldMeta);
        boolean expiredCluster = oldMeta == null && !newMeta.isActive();
        return updateNeeded && !expiredCluster;
    }

    private class ConsumeCallback
    implements Callback<ConsumerRecord<String, byte[]>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            if (error != null) {
                LOG.error("Unexpected error in ConsumeCallback for TopicBasedPhysicalClusterMetadata", error);
            } else {
                TopicBasedPhysicalClusterMetadata.this.consume(record);
            }
        }
    }

    public static enum State {
        NOT_READY,
        STARTING,
        RUNNING,
        CLOSED,
        FAILED_TO_START;

    }

    private static class LCMPair {
        private final long sequenceId;
        private final LogicalClusterMetadata lcm;

        LCMPair(long sequenceId, LogicalClusterMetadata lcm) {
            this.sequenceId = sequenceId;
            this.lcm = lcm;
        }

        long getSequenceId() {
            return this.sequenceId;
        }

        boolean exists() {
            return this.lcm != null;
        }

        boolean isActiveCluster() {
            return this.lcm != null && this.lcm.isActive();
        }

        LogicalClusterMetadata getLCM() {
            return this.lcm;
        }
    }
}

