package io.carml.logicalsourceresolver;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.Predicate;
import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider;
import io.carml.logicalsourceresolver.LogicalSourceResolver;
import io.carml.model.LogicalSource;
import io.carml.util.LogUtil;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import lombok.Generated;
import org.jsfr.json.JsonPathListener;
import org.jsfr.json.JsonSurfer;
import org.jsfr.json.JsonSurferJackson;
import org.jsfr.json.NonBlockingParser;
import org.jsfr.json.SurfingConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:io/carml/logicalsourceresolver/JsonPathResolver.class */
public class JsonPathResolver implements LogicalSourceResolver<JsonNode> {
    private static final int DEFAULT_BUFFER_SIZE = 8192;
    private final JsonSurfer jsonSurfer;
    private final int bufferSize;

    @Generated
    private static final Logger LOG = LoggerFactory.getLogger(JsonPathResolver.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final Configuration JSONPATH_CONF = Configuration.builder().jsonProvider(new JacksonJsonNodeJsonProvider()).options(new Option[]{Option.DEFAULT_PATH_LEAF_TO_NULL}).options(new Option[]{Option.SUPPRESS_EXCEPTIONS}).build();

    public static JsonPathResolver getInstance() {
        return getInstance(DEFAULT_BUFFER_SIZE);
    }

    public static JsonPathResolver getInstance(int i) {
        return new JsonPathResolver(JsonSurferJackson.INSTANCE, i);
    }

    public Function<ResolvedSource<?>, Flux<LogicalSourceRecord<JsonNode>>> getLogicalSourceRecords(Set<LogicalSource> set) {
        return resolvedSource -> {
            return getLogicalSourceRecordFlux(resolvedSource, set);
        };
    }

    private Flux<LogicalSourceRecord<JsonNode>> getLogicalSourceRecordFlux(ResolvedSource<?> resolvedSource, Set<LogicalSource> set) {
        if (set.isEmpty()) {
            throw new IllegalStateException("No logical sources registered");
        }
        if (resolvedSource == null || resolvedSource.getResolved().isEmpty()) {
            throw new LogicalSourceResolverException(String.format("No source provided for logical sources:%n%s", LogUtil.exception(set)));
        }
        Object obj = resolvedSource.getResolved().get();
        if (obj instanceof InputStream) {
            return getObjectFlux((InputStream) resolvedSource.getResolved().get(), set);
        }
        if (obj instanceof JsonNode) {
            return getObjectFlux((JsonNode) obj, set);
        }
        throw new LogicalSourceResolverException(String.format("Unsupported source object provided for logical sources:%n%s", LogUtil.exception(set)));
    }

    private Flux<LogicalSourceRecord<JsonNode>> getObjectFlux(InputStream inputStream, Set<LogicalSource> set) {
        AtomicLong atomicLong = new AtomicLong();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        return Flux.create(fluxSink -> {
            fluxSink.onRequest(j -> {
                long addAndGet = atomicLong.addAndGet(j);
                if (atomicBoolean.get() && addAndGet >= 0) {
                    atomicBoolean.compareAndSet(true, false);
                } else {
                    if (atomicBoolean.get() || addAndGet >= 0) {
                        return;
                    }
                    atomicBoolean.compareAndSet(false, true);
                }
            });
            fluxSink.onDispose(() -> {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    throw new LogicalSourceResolverException("Error closing input stream.", e);
                }
            });
            SurfingConfiguration.Builder configBuilder = this.jsonSurfer.configBuilder();
            bridgeAndListen(set, configBuilder, fluxSink, atomicLong);
            readSource(inputStream, this.jsonSurfer.createNonBlockingParser(configBuilder.build()), fluxSink, atomicBoolean, atomicBoolean2);
        });
    }

    private Flux<LogicalSourceRecord<JsonNode>> getObjectFlux(JsonNode jsonNode, Set<LogicalSource> set) {
        return Flux.fromIterable(set).flatMap(logicalSource -> {
            return getObjectFluxForLogicalSource(jsonNode, logicalSource);
        });
    }

    private void bridgeAndListen(Set<LogicalSource> set, SurfingConfiguration.Builder builder, FluxSink<LogicalSourceRecord<JsonNode>> fluxSink, AtomicLong atomicLong) {
        set.forEach(logicalSource -> {
            try {
                builder.bind(logicalSource.getIterator(), new JsonPathListener[]{(obj, parsingContext) -> {
                    if (!(obj instanceof JsonNode)) {
                        throw new LogicalSourceResolverException(String.format("Encountered non-JsonNode value: %s", obj));
                    }
                    fluxSink.next(LogicalSourceRecord.of(logicalSource, (JsonNode) obj));
                    atomicLong.decrementAndGet();
                }});
            } catch (RuntimeException e) {
                fluxSink.error(new LogicalSourceResolverException(String.format("An exception occurred while parsing expression: %s", logicalSource.getIterator())));
            }
        });
    }

    private void readSource(InputStream inputStream, NonBlockingParser nonBlockingParser, FluxSink<LogicalSourceRecord<JsonNode>> fluxSink, AtomicBoolean atomicBoolean, AtomicBoolean atomicBoolean2) {
        try {
            ReadableByteChannel newChannel = Channels.newChannel(inputStream);
            try {
                ByteBuffer allocate = ByteBuffer.allocate(this.bufferSize);
                while (!atomicBoolean2.get()) {
                    while (true) {
                        if (atomicBoolean.get()) {
                            break;
                        }
                        if ((inputStream.available() > 0 ? newChannel.read(allocate) : -1) == -1) {
                            nonBlockingParser.endOfInput();
                            atomicBoolean2.compareAndSet(false, true);
                            fluxSink.complete();
                            break;
                        }
                        nonBlockingParser.feed(allocate.array(), 0, allocate.position());
                        allocate.clear();
                    }
                }
                if (newChannel != null) {
                    newChannel.close();
                }
            } finally {
            }
        } catch (IOException e) {
            fluxSink.error(new LogicalSourceResolverException("Error reading input stream.", e));
        }
    }

    private Flux<LogicalSourceRecord<JsonNode>> getObjectFluxForLogicalSource(JsonNode jsonNode, LogicalSource logicalSource) {
        JsonNode jsonNode2 = (JsonNode) JsonPath.using(JSONPATH_CONF).parse(jsonNode).read(logicalSource.getIterator(), JsonNode.class, new Predicate[0]);
        if (jsonNode2 == null || jsonNode2.isNull()) {
            return Flux.empty();
        }
        if (jsonNode2.isArray()) {
            return Flux.fromStream(StreamSupport.stream(Spliterators.spliteratorUnknownSize(jsonNode2.elements(), 16), false)).map(jsonNode3 -> {
                return LogicalSourceRecord.of(logicalSource, jsonNode3);
            });
        }
        if (jsonNode2.isObject() || jsonNode2.isValueNode()) {
            return Flux.just(LogicalSourceRecord.of(logicalSource, jsonNode2));
        }
        throw new LogicalSourceResolverException(String.format("Error interpreting expression result %s", jsonNode2));
    }

    public LogicalSourceResolver.ExpressionEvaluationFactory<JsonNode> getExpressionEvaluationFactory() {
        return jsonNode -> {
            return str -> {
                logEvaluateExpression(str, LOG);
                JsonNode jsonNode = (JsonNode) JsonPath.using(JSONPATH_CONF).parse(jsonNode).read(str, JsonNode.class, new Predicate[0]);
                if (jsonNode != null) {
                    try {
                        if (!jsonNode.isNull()) {
                            if (jsonNode.isArray()) {
                                return Optional.of(OBJECT_MAPPER.treeToValue(jsonNode, List.class));
                            }
                            if (jsonNode.isObject()) {
                                return Optional.of(OBJECT_MAPPER.treeToValue(jsonNode, Map.class));
                            }
                            if (jsonNode.isValueNode()) {
                                return Optional.of(jsonNode.asText());
                            }
                            throw new LogicalSourceResolverException(String.format("Error interpreting expression result %s", jsonNode));
                        }
                    } catch (JsonProcessingException e) {
                        throw new LogicalSourceResolverException(String.format("Error processing expression result %s", jsonNode), e);
                    }
                }
                return Optional.empty();
            };
        };
    }

    @Generated
    private JsonPathResolver(JsonSurfer jsonSurfer, int i) {
        this.jsonSurfer = jsonSurfer;
        this.bufferSize = i;
    }
}
