package com.github.imrafaelmerino.kafkacli;

import fun.gen.Gen;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import jio.IO;
import jio.ListExp;
import jio.RetryPolicies;
import jio.cli.Command;
import jio.cli.ConsolePrinter;
import jio.cli.ConsolePrograms;
import jio.cli.State;
import jsonvalues.JsObj;
import jsonvalues.spec.JsonToAvro;
import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/github/imrafaelmerino/kafkacli/PublishFileCommand.class */
/**
 * Console command {@code producer-publish-file}: reads records from a file and publishes them,
 * one after another, to the Kafka topic associated with a configured channel.
 *
 * <p>When invoked with no arguments the user is prompted for the channel and the file path;
 * otherwise both are taken from the command tokens.
 *
 * <p>NOTE(review): only the key and the value of each parsed {@code Message} are used here;
 * headers mentioned in the usage text are not forwarded to the {@link ProducerRecord}.
 */
class PublishFileCommand extends Command {
    private static final String COMMAND_NAME = "producer-publish-file";
    private static final String USAGE = "Publishes records from a file to the specified Kafka channel.\n\nUsage: producer-publish-file {channel} {file_path}\n\n{channel}: The name of the Kafka channel to publish to. Choose one of the channels listed below.\n\n{file_path}: The absolute path of the file containing records to publish.\n\nThe file should have the following format:\n  - Each record should be separated by a new line.\n  - Each record consists of one or more lines, starting with either \"headers:\", \"key:\", or \"value:\".\n  - headers and key are optional.\n  - headers must be a Json object.\n\nExamples:\n  producer-publish-file (prompts the user to input the channel name and file absolute path)\n  producer-publish-file channel1 /path/to/another_records.txt\n";
    private final KafkaProducers producers;
    private final AvroSchemas avroSchemas;
    // Not read by this class; kept package-visible because other classes may rely on it.
    Map<String, Gen<?>> generators;

    /**
     * @param map            generators by name (stored, not used by this command)
     * @param kafkaProducers registry used to look up the producer of a channel
     * @param avroSchemas    key/value Avro schemas indexed by channel name
     */
    public PublishFileCommand(Map<String, Gen<?>> map, KafkaProducers kafkaProducers, AvroSchemas avroSchemas) {
        // Guard against an empty token array before looking at the command name.
        super(COMMAND_NAME, USAGE, tokens -> tokens.length > 0 && COMMAND_NAME.equals(tokens[0]));
        this.generators = map;
        this.producers = kafkaProducers;
        this.avroSchemas = avroSchemas;
    }

    /**
     * Builds the function that turns the command tokens into the publishing program.
     *
     * @param jsObj the CLI configuration
     * @param state the CLI state (not used by this command)
     * @return a function from the tokens to an effect yielding the per-record responses joined by new lines;
     *         the effect fails with {@link IllegalArgumentException} when only part of the arguments is given
     */
    public Function<String[], IO<String>> apply(JsObj jsObj, State state) {
        return tokens -> {
            if (tokens.length == 1) {
                // Interactive mode: ask for the channel and then for an existing file path.
                ConsolePrograms.AskForInputParams askForFile =
                        new ConsolePrograms.AskForInputParams("\nType the file absolute path:",
                                                              path -> Files.exists(Path.of(path)),
                                                              "File doesn't exist",
                                                              RetryPolicies.limitRetries(3));
                return ConsolePrograms.ASK_FOR_PAIR(Prompts.ASK_FOR_CHANNEL.apply(jsObj, this.producers), askForFile)
                                      .then(pair -> sendFile(jsObj, (String) pair.first(), (String) pair.second()));
            }
            if (tokens.length < 3) {
                // Previously this ended in an ArrayIndexOutOfBoundsException when reading tokens[1]/tokens[2].
                return IO.fail(new IllegalArgumentException(
                        "Invalid number of arguments. Usage: producer-publish-file {channel} {file_path}"));
            }
            return sendFile(jsObj, tokens[1], tokens[2]);
        };
    }

    /**
     * Parses the file and chains one send task per record.
     *
     * @param jsObj the CLI configuration holding the channel definitions
     * @param str   the channel name
     * @param str2  the path of the file with the records
     * @return an effect that sends every record sequentially and joins the responses with new lines
     */
    private IO<String> sendFile(JsObj jsObj, String str, String str2) {
        JsObj channels = jsObj.getObj(ConfigurationFields.CHANNELS);
        JsObj channelConf = channels == null ? null : channels.getObj(str);
        if (channelConf == null) {
            return IO.fail(new IllegalArgumentException(String.format("Channel `%s` not found in the configuration", str)));
        }
        String producerName = channelConf.getStr(ConfigurationFields.PRODUCER);
        String topic = channelConf.getStr(ConfigurationFields.TOPIC);
        KafkaProducer<Object, Object> producer = this.producers.apply(producerName);
        if (producer == null) {
            return IO.fail(new IllegalArgumentException(String.format("Producer `%s` not started. Use the command `start-producer %s`", producerName, producerName)));
        }
        List<Message> records = FileParser.parseRecordsFromFile(str2);
        ListExp<String> tasks = ListExp.seq();
        for (Message message : records) {
            // Exactly one task per record: key-less records must not also go through sendKeyAndValue.
            IO<String> task = message.key() == null
                    ? sendValue(message.value(), str, producer, topic)
                    : sendKeyAndValue(message.key(), message.value(), str, producer, topic);
            // Keep the expression returned by append; discarding it can leave the sequence empty.
            tasks = tasks.append(task);
        }
        return tasks.map(responses -> String.join("\n", responses));
    }

    /**
     * Sends a record with key and value, converting each side to Avro when the channel
     * has a schema registered for it and sending the raw string otherwise.
     *
     * @param str           the record key
     * @param str2          the record value
     * @param str3          the channel name used to look up the schemas
     * @param kafkaProducer the producer to send with
     * @param str4          the destination topic
     */
    IO<String> sendKeyAndValue(String str, String str2, String str3, KafkaProducer<Object, Object> kafkaProducer, String str4) {
        Schema keySchema = this.avroSchemas.keySchemasPerChannel.get(str3);
        Schema valueSchema = this.avroSchemas.valueSchemasPerChannel.get(str3);
        // Key is converted before the value, matching the original evaluation order.
        Object key = keySchema != null ? JsonToAvro.convert(JsObj.parse(str), keySchema) : str;
        Object value = valueSchema != null ? JsonToAvro.convert(JsObj.parse(str2), valueSchema) : str2;
        return sendRecordTask(kafkaProducer, new ProducerRecord<>(str4, key, value));
    }

    /**
     * Prints the "message sent" line, sends the record and maps the broker metadata to the
     * "response received" message.
     */
    private IO<String> sendRecordTask(KafkaProducer<Object, Object> kafkaProducer, ProducerRecord<Object, Object> producerRecord) {
        return IO.lazy(() -> ConsolePrinter.printlnResult(Fun.getMessageSent(producerRecord)))
                 .then(printed -> IO.effect(() -> kafkaProducer.send(producerRecord))
                                    .map(metadata -> {
                                        // Records built here carry no explicit timestamp, so
                                        // producerRecord.timestamp() is null and unboxing it would throw;
                                        // fall back to the timestamp reported in the record metadata.
                                        Long explicitTimestamp = producerRecord.timestamp();
                                        long timestamp = explicitTimestamp != null ? explicitTimestamp : metadata.timestamp();
                                        return new KafkaResponse(timestamp, metadata.offset(), metadata.partition())
                                                .getResponseReceivedMessage(producerRecord.topic());
                                    }));
    }

    /**
     * Sends a key-less record, converting the value to Avro when the channel has a value schema.
     *
     * @param str           the record value
     * @param str2          the channel name used to look up the value schema
     * @param kafkaProducer the producer to send with
     * @param str3          the destination topic
     */
    IO<String> sendValue(String str, String str2, KafkaProducer<Object, Object> kafkaProducer, String str3) {
        Schema valueSchema = this.avroSchemas.valueSchemasPerChannel.get(str2);
        Object value = valueSchema != null ? JsonToAvro.convert(JsObj.parse(str), valueSchema) : str;
        return sendRecordTask(kafkaProducer, new ProducerRecord<>(str3, value));
    }
}
