package io.streamthoughts.jikkou.kafka.change.topics;

import io.streamthoughts.jikkou.common.utils.CollectionUtils;
import io.streamthoughts.jikkou.common.utils.Pair;
import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.models.change.StateChange;
import io.streamthoughts.jikkou.core.models.change.StateChangeList;
import io.streamthoughts.jikkou.core.reconciler.ChangeMetadata;
import io.streamthoughts.jikkou.core.reconciler.ChangeResponse;
import io.streamthoughts.jikkou.core.reconciler.Operation;
import io.streamthoughts.jikkou.core.reconciler.TextDescription;
import io.streamthoughts.jikkou.core.reconciler.change.BaseChangeHandler;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/change/topics/CreateTopicChangeHandler.class */
public final class CreateTopicChangeHandler extends BaseChangeHandler<ResourceChange> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CreateTopicChangeHandler.class);
    private final AdminClient client;

    public CreateTopicChangeHandler(AdminClient adminClient) {
        super(Operation.CREATE);
        this.client = adminClient;
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    public TextDescription describe(@NotNull ResourceChange resourceChange) {
        return TopicChange.getDescription(resourceChange);
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    @NotNull
    public List<ChangeResponse<ResourceChange>> handleChanges(@NotNull List<ResourceChange> list) {
        CreateTopicsResult createTopics = this.client.createTopics((List) list.stream().map(this::toNewTopic).collect(Collectors.toList()), new CreateTopicsOptions());
        Map keyBy = CollectionUtils.keyBy(list, resourceChange -> {
            return resourceChange.getMetadata().getName();
        });
        return ((Map) new HashMap(createTopics.values()).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return Futures.toCompletableFuture((KafkaFuture) entry.getValue());
        }))).entrySet().stream().map(entry2 -> {
            ResourceChange resourceChange2 = (ResourceChange) keyBy.get(entry2.getKey());
            return new ChangeResponse(resourceChange2, (CompletableFuture<ChangeMetadata>) ((CompletableFuture) entry2.getValue()).thenApply(r11 -> {
                if (LOG.isInfoEnabled()) {
                    StateChangeList<? extends StateChange> changes = resourceChange2.getSpec2().getChanges();
                    LOG.info("Completed topic creation with: name={}, partitions={}, replicas={}", entry2.getKey(), changes.getLast(TopicChange.PARTITIONS, TypeConverter.Integer()).getAfter(), changes.getLast(TopicChange.REPLICAS, TypeConverter.Short()).getAfter());
                }
                return ChangeMetadata.empty();
            }));
        }).toList();
    }

    private NewTopic toNewTopic(ResourceChange resourceChange) {
        StateChangeList<? extends StateChange> changes = resourceChange.getSpec2().getChanges();
        return new NewTopic(resourceChange.getMetadata().getName(), ((Integer) changes.getLast(TopicChange.PARTITIONS, TypeConverter.Integer()).getAfter()).intValue(), ((Short) changes.getLast(TopicChange.REPLICAS, TypeConverter.Short()).getAfter()).shortValue()).configs((Map) changes.allWithPrefix("config.").stream().map(stateChange -> {
            return Pair.of(stateChange.getName(), stateChange);
        }).map(pair -> {
            return pair.mapRight((v0) -> {
                return v0.getAfter();
            });
        }).map(pair2 -> {
            return pair2.mapRight((v0) -> {
                return v0.toString();
            });
        }).collect(Collectors.toMap((v0) -> {
            return v0._1();
        }, (v0) -> {
            return v0._2();
        })));
    }
}
