package io.confluent.license;

import io.confluent.command.record.Command;
import io.confluent.serializers.ProtoSerde;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/license/LicenseStore.class */
/**
 * Persists the license string (a JWT) in a Kafka topic and reads the most recent value back.
 *
 * <p>Records are written under a single fixed {@link Command.CommandKey} of type
 * {@code LICENSE_INFO}. The backing topic is defined as compacted with one partition; it is
 * created through {@link TopicAdmin} by the initializer handed to the {@link KafkaBasedLog}.
 *
 * <p>{@link #start()} and {@link #stop()} are guarded by an {@link AtomicBoolean}, so repeated
 * calls in the same state are no-ops.
 */
public class LicenseStore {
    private static final Logger log = LoggerFactory.getLogger(LicenseStore.class);

    /** Key looked up in the topic configuration map to override the replication factor. */
    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";

    /** Maximum time, in milliseconds, that {@link #licenseScan()} waits for the log to be read. */
    public static final long READ_TO_END_TIMEOUT_MS = 120000;

    private static final String KEY_PREFIX = "CONFLUENT_LICENSE";

    /** Replication factor used when {@link #REPLICATION_FACTOR_CONFIG} is absent. */
    private static final short DEFAULT_REPLICATION_FACTOR = 3;

    // NOTE(review): m191build() (and m239build()/m287build() further down) look like
    // decompiler-renamed protobuf build() methods -- confirm against the Command class.
    private static final Command.CommandKey KEY = Command.CommandKey.newBuilder()
            .setConfigType(Command.CommandConfigType.LICENSE_INFO)
            .setGuid(KEY_PREFIX)
            .m191build();

    private final String topic;
    private final KafkaBasedLog<Command.CommandKey, Command.CommandMessage> licenseLog;
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicReference<String> latestLicense;
    private final Time time;

    /**
     * Callback invoked for each consumed record; remembers the JWT of every
     * {@code LICENSE_INFO} record in the shared reference, so the reference holds the last one
     * seen.
     */
    public static class ConsumeCallback implements Callback<ConsumerRecord<Command.CommandKey, Command.CommandMessage>> {
        private final AtomicReference<String> latestLicenseRef;

        ConsumeCallback(AtomicReference<String> latestLicenseRef) {
            this.latestLicenseRef = latestLicenseRef;
        }

        /**
         * Logs the error if one is reported; otherwise stores the record's JWT when the record
         * key is of type {@code LICENSE_INFO}. Records with any other key type are ignored.
         *
         * @param error  failure reported for this callback, or {@code null}
         * @param record the consumed record; only read when {@code error} is {@code null}
         */
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<Command.CommandKey, Command.CommandMessage> record) {
            if (error != null) {
                LicenseStore.log.error("Unexpected error in consumer callback for LicenseStore: ", error);
                return;
            }
            if (record.key().getConfigType() == Command.CommandConfigType.LICENSE_INFO) {
                this.latestLicenseRef.set(record.value().getLicenseInfo().getJwt());
            }
        }
    }

    /** Serde for record keys, registered as both key serializer and key deserializer. */
    public static class LicenseKeySerde extends ProtoSerde<Command.CommandKey> {
        public LicenseKeySerde() {
            super(Command.CommandKey.getDefaultInstance());
        }
    }

    /** Serde for record values, registered as both value serializer and value deserializer. */
    public static class LicenseMessageSerde extends ProtoSerde<Command.CommandMessage> {
        public LicenseMessageSerde() {
            super(Command.CommandMessage.getDefaultInstance());
        }
    }

    /**
     * Creates a store that uses the system clock.
     *
     * @param topic         name of the license topic
     * @param producerProps base producer configuration (copied, not modified)
     * @param consumerProps base consumer configuration (copied, not modified)
     * @param topicProps    configuration passed to {@link TopicAdmin}; may also carry
     *                      {@link #REPLICATION_FACTOR_CONFIG}
     */
    public LicenseStore(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, Map<String, Object> topicProps) {
        this(topic, producerProps, consumerProps, topicProps, Time.SYSTEM);
    }

    /**
     * Same as the public four-argument constructor, with an explicit {@link Time} source.
     *
     * <p>The log is built via {@link #setupAndCreateKafkaBasedLog}, which is invoked from this
     * constructor after {@code topic}, {@code latestLicense} and {@code time} are assigned.
     */
    protected LicenseStore(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, Map<String, Object> topicProps, Time time) {
        this.topic = topic;
        this.latestLicense = new AtomicReference<>();
        this.time = time;
        this.licenseLog = setupAndCreateKafkaBasedLog(this.topic, producerProps, consumerProps, topicProps, this.latestLicense, this.time);
    }

    /**
     * Creates a store around an already constructed log and license reference.
     *
     * @param topic         name of the license topic
     * @param latestLicense reference returned by {@link #licenseScan()}
     * @param licenseLog    log used for all reads and writes
     * @param time          time source kept by this store
     */
    public LicenseStore(String topic, AtomicReference<String> latestLicense, KafkaBasedLog<Command.CommandKey, Command.CommandMessage> licenseLog, Time time) {
        this.topic = topic;
        this.latestLicense = latestLicense;
        this.licenseLog = licenseLog;
        this.time = time;
    }

    /**
     * Builds the producer/consumer configuration and the topic definition, then creates the log.
     *
     * <p>The supplied maps are copied. The producer copy gets the license serializers and
     * {@code retries = Integer.MAX_VALUE}; the consumer copy gets the license deserializers.
     * The topic is defined as compacted with one partition and a replication factor taken from
     * {@link #REPLICATION_FACTOR_CONFIG}, defaulting to 3.
     *
     * @throws NumberFormatException if the configured replication factor is not a valid short
     */
    KafkaBasedLog<Command.CommandKey, Command.CommandMessage> setupAndCreateKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, Map<String, Object> topicProps, AtomicReference<String> latestLicenseRef, Time time) {
        Map<String, Object> producerConfig = new HashMap<>(producerProps);
        producerConfig.put("key.serializer", LicenseKeySerde.class.getName());
        producerConfig.put("value.serializer", LicenseMessageSerde.class.getName());
        producerConfig.put("retries", Integer.MAX_VALUE);

        Map<String, Object> consumerConfig = new HashMap<>(consumerProps);
        consumerConfig.put("key.deserializer", LicenseKeySerde.class.getName());
        consumerConfig.put("value.deserializer", LicenseMessageSerde.class.getName());

        NewTopic topicDescription = TopicAdmin.defineTopic(topic)
                .compacted()
                .partitions(1)
                .replicationFactor(replicationFactor(topicProps))
                .build();

        return createKafkaBasedLog(topic, producerConfig, consumerConfig, new ConsumeCallback(latestLicenseRef), topicDescription, topicProps, time);
    }

    /**
     * Reads the replication factor from the topic configuration.
     *
     * <p>The value is parsed from its string form, so both {@code "3"} and a numeric
     * {@code 3} are accepted instead of failing with a {@link ClassCastException}.
     */
    private static short replicationFactor(Map<String, Object> topicProps) {
        Object configured = topicProps.get(REPLICATION_FACTOR_CONFIG);
        if (configured == null) {
            return DEFAULT_REPLICATION_FACTOR;
        }
        return Short.parseShort(configured.toString());
    }

    /**
     * Creates the {@link KafkaBasedLog}, passing it an initializer that creates the topic
     * through a short-lived {@link TopicAdmin}.
     */
    private KafkaBasedLog<Command.CommandKey, Command.CommandMessage> createKafkaBasedLog(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Callback<ConsumerRecord<Command.CommandKey, Command.CommandMessage>> consumedCallback, final NewTopic topicDescription, final Map<String, Object> topicProps, Time time) {
        Runnable createTopic = new Runnable() {
            @Override
            public void run() {
                // try-with-resources closes the admin client on both the success and failure
                // paths, and attaches any close() failure as a suppressed exception.
                try (TopicAdmin admin = new TopicAdmin(topicProps)) {
                    admin.createTopics(topicDescription);
                }
            }
        };
        return new KafkaBasedLog<>(topic, producerConfig, consumerConfig, consumedCallback, time, createTopic);
    }

    /** Starts the underlying log; does nothing if the store is already running. */
    public void start() {
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        log.info("Starting License Store");
        this.licenseLog.start();
        log.info("Started License Store");
    }

    /** Stops the underlying log; does nothing if the store is not running. */
    public void stop() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        log.info("Closing License Store");
        this.licenseLog.stop();
        log.info("Closed License Store");
    }

    /**
     * Waits for the log to be read to its end and returns the latest license seen.
     *
     * @return the most recently consumed license JWT, or {@code null} if none has been consumed
     * @throws IllegalStateException if the read is interrupted, fails, or does not finish within
     *                               {@link #READ_TO_END_TIMEOUT_MS}; the cause is preserved
     */
    public String licenseScan() {
        try {
            this.licenseLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            return this.latestLicense.get();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to read license from Kafka: ", e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Writes the license to the topic without a completion callback.
     *
     * @param license the license JWT to store
     */
    public void registerLicense(String license) {
        registerLicense(license, null);
    }

    /**
     * Writes the license to the topic under the fixed license key.
     *
     * @param license  the license JWT to store
     * @param callback producer callback passed through to the log's send; may be {@code null}
     */
    public void registerLicense(String license, org.apache.kafka.clients.producer.Callback callback) {
        Command.LicenseInfo licenseInfo = Command.LicenseInfo.newBuilder().setJwt(license).m287build();
        Command.CommandMessage message = Command.CommandMessage.newBuilder().setLicenseInfo(licenseInfo).m239build();
        this.licenseLog.send(KEY, message, callback);
    }
}
