package com.github.dbmdz.flusswerk.framework.flow;

import com.github.dbmdz.flusswerk.framework.model.Message;
import com.github.dbmdz.flusswerk.framework.monitoring.Converter;
import com.github.dbmdz.flusswerk.framework.monitoring.FlowMetrics;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.MDC;

/* loaded from: input_file:com/github/dbmdz/flusswerk/framework/flow/Flow.class */
/**
 * Runs a single message through a reader, transformer and writer pipeline.
 *
 * <p>Around every run the class stores logging data in the SLF4J {@link MDC}, measures the
 * elapsed time and hands the resulting {@link FlowInfo} to all registered metrics consumers.
 */
public class Flow {
    private final Function<Message, Object> reader;
    private final Function<Object, Object> transformer;
    private final Function<Object, Collection<Message>> writer;
    private final Runnable cleanup;
    private final Set<Consumer<FlowInfo>> flowMetrics = new HashSet<>();

    /**
     * Creates a flow from the given specification.
     *
     * @param flowSpec supplies reader, transformer and writer (all required), an optional cleanup
     *     task (replaced by a no-op when {@code null}) and an optional monitor (registered as a
     *     metrics consumer when not {@code null})
     * @throws NullPointerException if reader, transformer or writer is {@code null}
     */
    // NOTE(review): the raw casts are kept because FlowSpec's accessor types are not visible
    // here; drop them (and this annotation) if the accessors already return the field types.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Flow(FlowSpec flowSpec) {
        this.reader = (Function) Objects.requireNonNull(flowSpec.reader());
        this.transformer = (Function) Objects.requireNonNull(flowSpec.transformer());
        this.writer = (Function) Objects.requireNonNull(flowSpec.writer());
        this.cleanup = (Runnable) Objects.requireNonNullElse(flowSpec.cleanup(), () -> {
        });
        if (flowSpec.monitor() != null) {
            this.flowMetrics.add(flowSpec.monitor());
        }
    }

    /**
     * Adds further metrics consumers that are notified after every processed message.
     *
     * @param set the consumers to add to the already registered ones
     */
    public void registerFlowMetrics(Set<FlowMetrics> set) {
        this.flowMetrics.addAll(set);
    }

    /**
     * Processes one message and records timing information for it.
     *
     * <p>Whether processing succeeds or fails, the {@link FlowInfo} is stopped, the MDC keys
     * {@code duration} and {@code duration_ms} are set and every registered metrics consumer is
     * called exactly once. A {@link RuntimeException} is recorded on the {@link FlowInfo} before
     * it is rethrown.
     *
     * @param message the incoming message
     * @return the messages produced by {@link #innerProcess(Message)}
     */
    public Collection<Message> process(Message message) {
        FlowInfo flowInfo = new FlowInfo(message);
        setLoggingData(message);
        long startedAt = System.nanoTime();
        try {
            return innerProcess(message);
        } catch (RuntimeException e) {
            flowInfo.setStatusFrom(e);
            throw e;
        } finally {
            // A single finally block guarantees the bookkeeping runs once per message,
            // even when a metrics consumer itself fails.
            flowInfo.stop();
            putDuration(System.nanoTime() - startedAt);
            this.flowMetrics.forEach(consumer -> consumer.accept(flowInfo));
        }
    }

    /**
     * Applies reader, transformer and writer to the message and then runs the cleanup task.
     *
     * <p>The cleanup task runs exactly once, regardless of whether the pipeline completed or
     * threw.
     *
     * @param message the incoming message
     * @return an empty list if the writer returned {@code null}, otherwise the writer's result
     *     without {@code null} elements
     */
    public Collection<Message> innerProcess(Message message) {
        Collection<Message> written;
        try {
            written = this.writer.apply(this.transformer.apply(this.reader.apply(message)));
        } finally {
            this.cleanup.run();
        }
        if (written == null) {
            return Collections.emptyList();
        }
        return written.stream().filter(Objects::nonNull).toList();
    }

    /**
     * Clears the MDC and, if the message has an accessible public no-argument method whose name
     * equals {@code getId} ignoring case, stores its non-null result under the MDC key
     * {@code id}.
     *
     * @param message the message to take the id from
     * @throws RuntimeException if the id method cannot be invoked or throws
     */
    void setLoggingData(Message message) {
        MDC.clear();
        for (Method method : message.getClass().getMethods()) {
            boolean isIdGetter = "getId".equalsIgnoreCase(method.getName())
                    // Overloads with parameters cannot be invoked without arguments.
                    && method.getParameterCount() == 0
                    && method.canAccess(message);
            if (!isIdGetter) {
                continue;
            }
            try {
                Object id = method.invoke(message);
                if (id != null) {
                    MDC.put("id", id.toString());
                }
                return;
            } catch (IllegalAccessException | InvocationTargetException e) {
                // Keep the cause so the underlying failure stays visible in the stack trace.
                throw new RuntimeException("Cannot get ID for logging but should be able to", e);
            }
        }
    }

    /** Stores the elapsed time in the MDC, in seconds and in milliseconds. */
    private static void putDuration(long elapsedNanos) {
        double seconds = Converter.ns_to_seconds(elapsedNanos);
        double milliseconds = Converter.ns_to_milliseconds(elapsedNanos);
        MDC.put("duration", String.format(Locale.ENGLISH, "%f", seconds));
        MDC.put("duration_ms", String.format(Locale.ENGLISH, "%f", milliseconds));
    }
}
