package io.streamthoughts.kafka.connect.filepulse.state;

import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStore.class */
public class KafkaFileObjectStateBackingStore implements FileObjectStateBackingStore {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFileObjectStateBackingStore.class);
    private static final String KEY_PREFIX = "connect-file-pulse";
    private KafkaStateBackingStore<FileObject> store;

    @Override // io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStore
    public void configure(Map<String, ?> map) {
        KafkaFileObjectStateBackingStoreConfig kafkaFileObjectStateBackingStoreConfig = new KafkaFileObjectStateBackingStoreConfig(map);
        this.store = new KafkaStateBackingStore<>(kafkaFileObjectStateBackingStoreConfig.getTaskStorageTopic(), KEY_PREFIX, kafkaFileObjectStateBackingStoreConfig.getTaskStorageName(), kafkaFileObjectStateBackingStoreConfig.getProducerTaskStorageConfigs(), kafkaFileObjectStateBackingStoreConfig.getConsumerTaskStorageConfigs(), new FileObjectSerde(), kafkaFileObjectStateBackingStoreConfig.getTaskStorageConsumerEnabled());
        if (kafkaFileObjectStateBackingStoreConfig.isTopicCreationEnable()) {
            AdminClient create = AdminClient.create(kafkaFileObjectStateBackingStoreConfig.getAdminClientTaskStorageConfigs());
            try {
                HashMap hashMap = new HashMap();
                hashMap.put("cleanup.policy", "compact");
                createTopic(create, new NewTopic(kafkaFileObjectStateBackingStoreConfig.getTaskStorageTopic(), kafkaFileObjectStateBackingStoreConfig.getTopicPartitions(), kafkaFileObjectStateBackingStoreConfig.getReplicationFactor()).configs(hashMap));
                if (create != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    private void createTopic(AdminClient adminClient, NewTopic newTopic) {
        try {
            LOG.info("Attempt to create new topic '{}'", newTopic);
            adminClient.createTopics(List.of(newTopic)).all().get();
        } catch (InterruptedException e) {
            LOG.warn("Failed to create topic '{}'", newTopic, e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            if (e2.getCause() instanceof TopicExistsException) {
                LOG.debug("Failed to created topic '{}'. Topic already exists.", newTopic);
            } else {
                LOG.warn("Failed to create topic '{}'", newTopic, e2);
            }
        }
    }

    public void start() {
        this.store.start();
    }

    public void stop() {
        this.store.stop();
    }

    public boolean isStarted() {
        return this.store.isStarted();
    }

    public StateSnapshot<FileObject> snapshot() {
        return this.store.snapshot();
    }

    public boolean contains(String str) {
        return this.store.contains(str);
    }

    public void putAsync(String str, FileObject fileObject) {
        this.store.putAsync(str, fileObject);
    }

    public void put(String str, FileObject fileObject) {
        this.store.put(str, fileObject);
    }

    public void remove(String str) {
        this.store.remove(str);
    }

    public void removeAsync(String str) {
        this.store.removeAsync(str);
    }

    public void refresh(long j, TimeUnit timeUnit) throws TimeoutException {
        this.store.refresh(j, timeUnit);
    }

    public void setUpdateListener(StateBackingStore.UpdateListener<FileObject> updateListener) {
        this.store.setUpdateListener(updateListener);
    }
}
