package io.streamthoughts.jikkou.kafka.change.record;

import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.GenericStateChange;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.reconciler.ChangeMetadata;
import io.streamthoughts.jikkou.core.reconciler.ChangeResponse;
import io.streamthoughts.jikkou.core.reconciler.Operation;
import io.streamthoughts.jikkou.core.reconciler.TextDescription;
import io.streamthoughts.jikkou.core.reconciler.change.BaseChangeHandler;
import io.streamthoughts.jikkou.kafka.change.record.KafkaTableRecordChangeComputer;
import io.streamthoughts.jikkou.kafka.internals.KafkaRecord;
import io.streamthoughts.jikkou.kafka.internals.producer.KafkaRecordSender;
import io.streamthoughts.jikkou.kafka.model.DataValue;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTableRecordSpec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/change/record/KafkaTableRecordChangeHandler.class */
public final class KafkaTableRecordChangeHandler extends BaseChangeHandler<ResourceChange> {
    private final Producer<byte[], byte[]> producer;

    public KafkaTableRecordChangeHandler(@NotNull Producer<byte[], byte[]> producer) {
        super((Set<Operation>) Set.of(Operation.CREATE, Operation.UPDATE));
        this.producer = (Producer) Objects.requireNonNull(producer, "producerFactory must not be null");
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    public TextDescription describe(@NotNull ResourceChange resourceChange) {
        return new KafkaTableRecordChangeDescription(resourceChange);
    }

    @Override // io.streamthoughts.jikkou.core.reconciler.ChangeHandler
    public List<ChangeResponse<ResourceChange>> handleChanges(@NotNull List<ResourceChange> list) {
        KafkaRecordSender kafkaRecordSender = new KafkaRecordSender(this.producer);
        return (List) list.stream().map(resourceChange -> {
            return send(resourceChange, kafkaRecordSender);
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NotNull
    public static ChangeResponse<ResourceChange> send(ResourceChange resourceChange, KafkaRecordSender<byte[], byte[]> kafkaRecordSender) {
        return new ChangeResponse<>(resourceChange, (CompletableFuture<ChangeMetadata>) kafkaRecordSender.send(toKafkaRecord(resourceChange).mapKey(byteBuffer -> {
            return (byte[]) Optional.ofNullable(byteBuffer).map((v0) -> {
                return v0.array();
            }).orElse(null);
        }).mapValue(byteBuffer2 -> {
            return (byte[]) Optional.ofNullable(byteBuffer2).map((v0) -> {
                return v0.array();
            }).orElse(null);
        })).thenApply(producerRequestResult -> {
            return producerRequestResult.error() != null ? ChangeMetadata.of(producerRequestResult.error()) : ChangeMetadata.empty();
        }));
    }

    @VisibleForTesting
    static KafkaRecord<ByteBuffer, ByteBuffer> toKafkaRecord(ResourceChange resourceChange) {
        GenericStateChange last = resourceChange.getSpec2().getChanges().getLast(KafkaTableRecordChangeComputer.KafkaTableRecordChangeFactory.DATA_RECORD, TypeConverter.of(V1KafkaTableRecordSpec.class));
        Operation op = last.getOp();
        DataValue key = op == Operation.CREATE ? ((V1KafkaTableRecordSpec) last.getAfter()).getKey() : ((V1KafkaTableRecordSpec) last.getBefore()).getKey();
        String topic = op == Operation.DELETE ? ((V1KafkaTableRecordSpec) last.getBefore()).getTopic() : ((V1KafkaTableRecordSpec) last.getAfter()).getTopic();
        Optional<ByteBuffer> serialize = key.type().getDataSerde().serialize(topic, key.data(), Collections.emptyMap(), true);
        Optional<ByteBuffer> empty = Optional.empty();
        if (op != Operation.DELETE) {
            DataValue value = ((V1KafkaTableRecordSpec) last.getAfter()).getValue();
            empty = value.type().getDataSerde().serialize(topic, value.data(), Collections.emptyMap(), false);
        }
        return KafkaRecord.builder().topic(topic).headers(new RecordHeaders((List) ((V1KafkaTableRecordSpec) last.getAfter()).getHeaders().stream().map(kafkaRecordHeader -> {
            return new RecordHeader(kafkaRecordHeader.name(), kafkaRecordHeader.value().getBytes(StandardCharsets.UTF_8));
        }).collect(Collectors.toList()))).key(serialize.orElse(null)).value(empty.orElse(null)).build();
    }
}
