/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.jes;

import com.mongodb.ClientSessionOptions;
import com.mongodb.ReadConcern;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
import javax.json.JsonArray;
import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonStructure;
import javax.json.JsonValue;
import net.pincette.function.SupplierWithException;
import net.pincette.jes.Command;
import net.pincette.jes.Event;
import net.pincette.jes.Reducer;
import net.pincette.json.JsonUtil;
import net.pincette.mongo.BsonUtil;
import net.pincette.mongo.Collection;
import net.pincette.mongo.Expression;
import net.pincette.mongo.JsonClient;
import net.pincette.mongo.Patch;
import net.pincette.rs.Async;
import net.pincette.rs.BackpressureTimout;
import net.pincette.rs.Box;
import net.pincette.rs.Buffer;
import net.pincette.rs.Combine;
import net.pincette.rs.Fanout;
import net.pincette.rs.Filter;
import net.pincette.rs.Mapper;
import net.pincette.rs.Merge;
import net.pincette.rs.NotFilter;
import net.pincette.rs.PassThrough;
import net.pincette.rs.Pipe;
import net.pincette.rs.streams.Message;
import net.pincette.rs.streams.Streams;
import net.pincette.util.Builder;
import net.pincette.util.Collections;
import net.pincette.util.Or;
import net.pincette.util.Pair;
import net.pincette.util.Util;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;

/**
 * Builds the reactive pipeline of one aggregate type.
 *
 * <p>Commands are read from the {@code command} topic, combined with the current aggregate state
 * fetched from a MongoDB collection and handed to a reducer. A reduction that changes the state
 * becomes an event that is saved to MongoDB and published on the {@code event-full} topic, from
 * which the {@code aggregate}, {@code event} and {@code reply} topics are fed. Commands that end
 * in an error are published on the {@code reply} topic.
 *
 * <p>All topic names have the form {@code <app>-<type>-<purpose>[-<environment>]}; the MongoDB
 * collection is named {@code <app>-<type>[-<environment>]}.
 *
 * <p>Instances are configured through the fluent {@code with...} methods and are mutable; nothing
 * in this class synchronizes access to that configuration.
 *
 * <p>NOTE(review): this source was produced by a decompiler. The remaining raw
 * {@code Flow.Processor} casts and the casts on lambda parameters are decompiler artifacts that
 * were kept on purpose, because the generic signatures of the library methods involved are not
 * visible from this file.
 *
 * @param <T> the third type parameter of the {@link Streams} builder, passed through unchanged.
 * @param <U> the fourth type parameter of the {@link Streams} builder, passed through unchanged.
 */
public class Aggregate<T, U> {
    // Topic purposes, see topic(String).
    private static final String AGGREGATE_TOPIC = "aggregate";
    // Delay handed to Util.tryToGetForever between attempts.
    private static final Duration BACK_OFF = Duration.ofSeconds(5L);
    private static final String COMMAND_TOPIC = "command";
    // Window handed to the duplicate filter for incoming commands.
    private static final Duration DUPLICATE_WINDOW = Duration.ofSeconds(5L);
    private static final String EVENT_TOPIC = "event";
    private static final String EVENT_FULL_TOPIC = "event-full";
    private static final String EXCEPTION = "exception";
    private static final String MESSAGE = "message";
    // Field names of the intermediate {command, state} object that flows through the reducer.
    private static final String REDUCER_COMMAND = "command";
    private static final String REDUCER_STATE = "state";
    private static final String REPLY_TOPIC = "reply";
    private static final String SET = "$set";
    // Fields that are ignored when two states are compared.
    private static final Set<String> TECHNICAL_FIELDS = Collections.set(new String[]{"_command", "_corr", "_id", "_jwt", "_languages", "_seq", "_test", "_timestamp", "_type"});
    private static final String UNIQUE_TOPIC = "unique";
    // Optional per-command preprocessors, keyed by command name.
    private final Map<String, Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>>> commandProcessors = new HashMap<String, Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>>>();
    // Per-command reducers given as functions, keyed by command name.
    private final Map<String, Reducer> reducers = new HashMap<String, Reducer>();
    // Per-command reducers given as processors, keyed by command name.
    private final Map<String, Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>>> reducerProcessors = new HashMap<String, Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>>>();
    private MongoCollection<Document> aggregateCollection;
    private String app;
    private Duration backpressureTimeout;
    private boolean breakingTheGlass;
    private Streams<String, JsonObject, T, U> builder;
    private MongoClient client;
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> commandProcessor;
    private MongoDatabase database;
    private String environment;
    private Logger logger;
    // When set, this single reducer is used instead of the per-command ones.
    private Reducer reducer;
    private ClientSession session;
    private String type;
    private JsonValue uniqueExpression;
    private Function<JsonObject, JsonValue> uniqueFunction;

    /**
     * Creates an aggregate with the built-in reducers for the commands {@code delete},
     * {@code patch} and {@code put}.
     */
    public Aggregate() {
        // Register directly in the map instead of going through the public withReducer(), so a
        // subclass override is never invoked on a half-constructed object. The guard in
        // withReducer() is irrelevant here because no processor can be registered yet.
        this.reducers.put("delete", (command, currentState) -> Aggregate.delete(currentState));
        this.reducers.put("patch", Aggregate::patch);
        this.reducers.put("put", (command, currentState) -> Aggregate.put(command));
    }

    /** Returns a copy of the command that is marked as an error with status code 403. */
    private static JsonObject accessError(JsonObject command) {
        return JsonUtil.createObjectBuilder(command)
            .add("_error", true)
            .add("_statusCode", 403)
            .add(MESSAGE, "Forbidden")
            .build();
    }

    /**
     * Turns full events into aggregate instances: only events with an {@code _after} field pass
     * and the value becomes that {@code _after} object, with the {@code _jwt} of the event added
     * when the event has one.
     */
    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> aggregates() {
        return Box.box(
            (Flow.Processor)Filter.filter(m ->
                Event.isEvent((JsonObject)m.value) && ((JsonObject)m.value).containsKey("_after")),
            (Flow.Processor)Mapper.map(m ->
                m.withValue(((JsonObjectBuilder)Builder
                    .create(() -> JsonUtil.createObjectBuilder(((JsonObject)m.value).getJsonObject("_after")))
                    .updateIf(
                        () -> Optional.ofNullable(((JsonObject)m.value).getJsonObject("_jwt")),
                        (b, v) -> b.add("_jwt", (JsonValue)v))
                    .build())
                    .build())));
    }

    /** Returns the {@code _before} field of the event without technical fields, if present. */
    private static Optional<JsonObject> beforeWithoutTechnical(JsonObject event) {
        return Optional.ofNullable(event.getJsonObject("_before"))
            .map(Aggregate::removeTechnical)
            .map(JsonObjectBuilder::build);
    }

    /** Extracts the command from a {command, state} message. */
    private static JsonObject command(Message<String, JsonObject> message) {
        return ((JsonObject)message.value).getJsonObject(REDUCER_COMMAND);
    }

    /** Builds the key on which incoming commands are considered duplicates. */
    private static String commandDuplicateKey(JsonObject command) {
        return command.getString("_id") + command.getString("_corr") + command.getString("_command");
    }

    /**
     * The entry stage for commands: keeps only commands, lower-cases the message key and the
     * identifiers, completes the command with a timestamp and filters duplicates within
     * {@link #DUPLICATE_WINDOW}.
     */
    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> commands() {
        return Pipe.pipe((Flow.Processor)Filter.filter(m -> Command.isCommand((JsonObject)m.value)))
            // Locale.ROOT: identifiers must not be lower-cased with locale-specific rules.
            .then(Mapper.map(m ->
                m.withKey(((String)m.key).toLowerCase(Locale.ROOT))
                    .withValue(Aggregate.idsToLowerCase((JsonObject)m.value))))
            .then(Mapper.map(m -> m.withValue(Aggregate.completeCommand((JsonObject)m.value))))
            .then(net.pincette.rs.Util.duplicateFilter(
                m -> Aggregate.commandDuplicateKey((JsonObject)m.value),
                DUPLICATE_WINDOW));
    }

    /** Adds the current time as {@code _timestamp} when the command doesn't have that field. */
    private static JsonObject completeCommand(JsonObject command) {
        if (command.containsKey("_timestamp")) {
            return command;
        }
        return JsonUtil.createObjectBuilder(command)
            .add("_timestamp", Instant.now().toEpochMilli())
            .build();
    }

    /**
     * Creates the builder for the {@code _after} field of an event: the new state with the
     * correlation ID, sequence number and timestamp of the event and without {@code _jwt}.
     */
    private static JsonObjectBuilder createAfter(JsonObject newState, String corr, int seq, long now) {
        return JsonUtil.createObjectBuilder(newState)
            .add("_corr", corr)
            .add("_seq", seq)
            .add("_timestamp", now)
            .remove("_jwt");
    }

    /**
     * Creates the full event for a state transition. The sequence number is the one of the old
     * state plus one, where an old state without {@code _seq} counts as -1. The fields
     * {@code _languages} and {@code _jwt} are copied from the command when it has them.
     */
    private static JsonObject createEvent(JsonObject oldState, JsonObject newState, JsonObject command, JsonArray ops) {
        String corr = command.getString("_corr");
        long now = Instant.now().toEpochMilli();
        int seq = oldState.getInt("_seq", -1) + 1;
        return ((JsonObjectBuilder)Builder
            .create(() -> JsonUtil.createObjectBuilder()
                .add("_corr", corr)
                .add("_id", newState.getString("_id").toLowerCase(Locale.ROOT))
                .add("_type", newState.getString("_type"))
                .add("_seq", seq)
                .add("_command", command.getString("_command"))
                .add("_timestamp", now)
                .add("_before", (JsonValue)oldState)
                .add("_after", Aggregate.createAfter(newState, corr, seq, now))
                .add("_ops", (JsonValue)ops))
            .updateIf(
                b -> command.containsKey("_languages"),
                b -> b.add("_languages", (JsonValue)command.getJsonArray("_languages")))
            .updateIf(
                b -> command.containsKey("_jwt"),
                b -> b.add("_jwt", (JsonValue)command.getJsonObject("_jwt")))
            .build())
            .build();
    }

    /** Computes the difference between both states, ignoring the technical fields. */
    private static JsonArray createOps(JsonObject oldState, JsonObject newState) {
        return JsonUtil.createArrayBuilder(
            (JsonArray)JsonUtil.createDiff(
                (JsonStructure)Aggregate.removeTechnical(oldState).build(),
                (JsonStructure)Aggregate.removeTechnical(newState).build()).toJsonArray())
            .build();
    }

    /** Starts a causally consistent MongoDB session and returns it as a plain value. */
    private static ClientSession createSession(MongoClient client) {
        return (ClientSession)net.pincette.rs.Util.asValue(
            (Flow.Publisher)FlowAdapters.toFlowPublisher(
                (Publisher)client.startSession(
                    ClientSessionOptions.builder().causallyConsistent(true).build())));
    }

    /** Wraps a command and a state in the {command, state} object the reducer stage works on. */
    private static JsonObject createSource(JsonObject command, JsonObject state) {
        return JsonUtil.createObjectBuilder()
            .add(REDUCER_STATE, (JsonValue)state)
            .add(REDUCER_COMMAND, (JsonValue)command)
            .build();
    }

    /**
     * The built-in reducer for the {@code delete} command.
     *
     * @param currentState the current state of the aggregate.
     * @return the completed stage with the current state in which {@code _deleted} is set to
     *     {@code true}.
     */
    public static CompletionStage<JsonObject> delete(JsonObject currentState) {
        return CompletableFuture.completedFuture(
            JsonUtil.createObjectBuilder(currentState).add("_deleted", true).build());
    }

    /** Lets through only commands that carry an error. */
    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> errors() {
        return Filter.filter(m ->
            Command.isCommand((JsonObject)m.value) && Command.hasError((JsonObject)m.value));
    }

    /** Maps full events to plain events, see {@link #plainEvent(JsonObject)}. */
    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> events() {
        return Mapper.map(m -> m.withValue(Aggregate.plainEvent((JsonObject)m.value)));
    }

    /** Lets through only events. */
    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> eventsFull() {
        return Filter.filter(m -> Event.isEvent((JsonObject)m.value));
    }

    /** Returns a copy of the command that is marked as an error and carries the stack trace. */
    private static JsonObject exception(JsonObject command, Throwable e) {
        return JsonUtil.createObjectBuilder(command)
            .add("_error", true)
            .add(EXCEPTION, Util.getStackTrace(e))
            .build();
    }

    /**
     * Creates an error message for an exception.
     *
     * <p>NOTE(review): the message has a {@code null} key and starts from an empty object, so the
     * key of the command that caused the exception is not carried along - confirm this is
     * intended.
     */
    private static Message<String, JsonObject> exceptionMessage(Throwable t) {
        return Message.message(null, Aggregate.exception(JsonUtil.emptyObject(), t));
    }

    /** Lets through only messages of which the {@code _command} field equals the given name. */
    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> forCommand(String command) {
        return Filter.filter(m -> command.equals(((JsonObject)m.value).getString("_command")));
    }

    /**
     * Returns a copy of the object with {@code _id} and {@code _corr} in lower case. Both fields
     * must be present. {@link Locale#ROOT} is used because these are identifiers, which must not
     * be subject to the casing rules of the default locale.
     */
    private static JsonObject idsToLowerCase(JsonObject json) {
        return JsonUtil.createObjectBuilder(json)
            .add("_id", json.getString("_id").toLowerCase(Locale.ROOT))
            .add("_corr", json.getString("_corr").toLowerCase(Locale.ROOT))
            .build();
    }

    /**
     * A command matches when it has no {@code _seq} field, when that field is -1 or when it
     * equals the {@code _seq} of the current state, where a state without {@code _seq} counts
     * as -1.
     */
    private static boolean isMatchingCommand(JsonObject command, JsonObject currentState) {
        int seq = command.getInt("_seq", -1);
        return seq == -1 || seq == currentState.getInt("_seq", -1);
    }

    /** Completes the state with the {@code _id} and {@code _type} of the command when missing. */
    private static JsonObject makeManaged(JsonObject command, JsonObject state) {
        return ((JsonObjectBuilder)Builder
            .create(() -> JsonUtil.createObjectBuilder(state))
            .updateIf(b -> !state.containsKey("_id"), b -> b.add("_id", command.getString("_id")))
            .updateIf(b -> !state.containsKey("_type"), b -> b.add("_type", command.getString("_type")))
            .build())
            .build();
    }

    /** Drops {command, state} messages of which the command doesn't match the state. */
    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> matchingFilter() {
        return Filter.filter(m ->
            Aggregate.isMatchingCommand(
                Aggregate.command((Message<String, JsonObject>)m),
                Aggregate.state((Message<String, JsonObject>)m)));
    }

    /**
     * The built-in reducer for the {@code patch} command.
     *
     * @param command the command, of which the {@code _ops} array is applied as a JSON patch.
     * @param currentState the current state of the aggregate.
     * @return the completed stage with the patched state, or with the current state when the
     *     command has no {@code _ops} field.
     */
    public static CompletionStage<JsonObject> patch(JsonObject command, JsonObject currentState) {
        return CompletableFuture.completedFuture(
            Optional.ofNullable(command.getJsonArray("_ops"))
                .map(ops -> (JsonObject)JsonUtil.createPatch((JsonArray)ops).apply((JsonStructure)currentState))
                .orElse(currentState));
    }

    /** Returns the event without its {@code _after} and {@code _before} fields. */
    private static JsonObject plainEvent(JsonObject fullEvent) {
        return JsonUtil.createObjectBuilder(fullEvent)
            .remove("_after")
            .remove("_before")
            .build();
    }

    /** Returns the database with majority read and write concerns. */
    private static MongoDatabase prepareDatabase(MongoDatabase database) {
        return database
            .withReadConcern(ReadConcern.MAJORITY)
            .withWriteConcern(WriteConcern.MAJORITY);
    }

    /**
     * The built-in reducer for the {@code put} command.
     *
     * @param command the command.
     * @return the completed stage with the command without its {@code _command} field, which
     *     becomes the new state.
     */
    public static CompletionStage<JsonObject> put(JsonObject command) {
        return CompletableFuture.completedFuture(
            JsonUtil.createObjectBuilder(command).remove("_command").build());
    }

    /**
     * Wraps a synchronous transformer in a reducer.
     *
     * @param transformer the function that receives the command and the current state, in that
     *     order, and returns the new state.
     * @return the reducer.
     */
    public static Reducer reducer(BinaryOperator<JsonObject> transformer) {
        return (command, state) -> CompletableFuture.completedFuture(transformer.apply(command, state));
    }

    /**
     * Composes a sequence of transformers into a reducer. The first transformer receives an
     * object with the fields {@code command} and {@code state}; each following transformer
     * receives the result of the previous one. Without transformers that object itself is the
     * result.
     *
     * @param transformers the transformers, applied in the given order.
     * @return the reducer.
     */
    @SafeVarargs
    public static Reducer reducer(UnaryOperator<JsonObject> ... transformers) {
        UnaryOperator<JsonObject> function = Arrays.stream(transformers)
            .reduce(json -> json, (result, t) -> j -> t.apply(result.apply(j)));
        return (command, state) ->
            CompletableFuture.completedFuture(function.apply(Aggregate.createSource(command, state)));
    }

    /** Returns a builder initialised with the object, from which the technical fields are removed. */
    private static JsonObjectBuilder removeTechnical(JsonObject json) {
        // A plain loop: the builder is mutable, so it is not a valid identity for Stream.reduce.
        JsonObjectBuilder builder = JsonUtil.createObjectBuilder(json);
        for (String field : TECHNICAL_FIELDS) {
            builder = builder.remove(field);
        }
        return builder;
    }

    /** Extracts the state from a {command, state} message. */
    private static JsonObject state(Message<String, JsonObject> message) {
        return ((JsonObject)message.value).getJsonObject(REDUCER_STATE);
    }

    /** Creates the {@code $set} operator that copies {@code _seq}, {@code _corr} and {@code _timestamp} from the event. */
    private static JsonObject technicalUpdateOperator(JsonObject event) {
        return JsonUtil.createObjectBuilder()
            .add(SET, JsonUtil.createObjectBuilder()
                .add("_seq", event.getInt("_seq"))
                .add("_corr", event.getString("_corr"))
                .add("_timestamp", (JsonValue)event.getJsonNumber("_timestamp")))
            .build();
    }

    /**
     * Re-keys messages with the string form of the value the unique function yields for them.
     * Messages for which that value is JSON null are dropped.
     */
    private static Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> unique(Function<JsonObject, JsonValue> uniqueFunction) {
        return Pipe.pipe((Flow.Processor)Mapper.map(m ->
                Pair.pair(m, uniqueFunction.apply((JsonObject)m.value))))
            .then(NotFilter.notFilter(pair -> ((JsonValue)pair.second).equals(JsonValue.NULL)))
            .then(Mapper.map(pair ->
                ((Message)pair.first).withKey(JsonUtil.string((JsonValue)pair.second))));
    }

    /**
     * Updates an existing aggregate with one ordered bulk write: first the technical fields,
     * then the update operators derived from the object entries of the {@code _ops} of the
     * event. The stage fails the {@code must} check when the write was not acknowledged.
     */
    private static CompletionStage<Boolean> updateAggregate(MongoCollection<Document> collection, JsonObject currentState, JsonObject event, ClientSession session) {
        Bson filter = Filters.eq("_id", currentState.getString("_id"));
        List<UpdateOneModel<Document>> operators = Stream
            .concat(
                Stream.of(Aggregate.technicalUpdateOperator(event)),
                Patch.updateOperators(
                    currentState,
                    event.getJsonArray("_ops").stream().filter(JsonUtil::isObject).map(JsonValue::asJsonObject)))
            .map(op -> new UpdateOneModel<Document>(filter, (Bson)BsonUtil.fromJson((JsonObject)op)))
            .toList();
        // Ordered, because the operators must be applied in the sequence of the patch.
        BulkWriteOptions options = new BulkWriteOptions().ordered(true);
        return Collection.exec(collection, c -> c.bulkWrite(session, operators, options))
            .thenApply(BulkWriteResult::wasAcknowledged)
            .thenApply(result -> (Boolean)Util.must(result, r -> r));
    }

    /**
     * Returns the configured application name.
     *
     * @return the application name, or {@code null} when it has not been set.
     */
    public String app() {
        return this.app;
    }

    /** Replaces the {command, state} value with an access error when the command is not allowed. */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> allowedProcessor() {
        return Mapper.map(m ->
            Command.isAllowed(
                    Aggregate.state((Message<String, JsonObject>)m),
                    Aggregate.command((Message<String, JsonObject>)m),
                    this.breakingTheGlass)
                ? m
                : m.withValue(Aggregate.accessError(Aggregate.command((Message<String, JsonObject>)m))));
    }

    /**
     * Wires the aggregate into the streams builder. The app, builder, type, MongoDB database and
     * MongoDB client must have been set. This also opens the MongoDB session and resolves the
     * aggregate collection.
     *
     * @return the streams builder with the topology of this aggregate added to it.
     */
    public Streams<String, JsonObject, T, U> build() {
        Util.must(this.app != null && this.builder != null && this.type != null && this.database != null && this.client != null);
        this.aggregateCollection = this.database.getCollection(this.mongoAggregateCollection());
        this.session = Aggregate.createSession(this.client);
        // The same two processor instances are subscribed to the reducer output and then used as
        // the sources of the event-full and reply topics.
        Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> errors = Aggregate.errors();
        Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> eventsFull = Aggregate.eventsFull();
        // NOTE(review): backpressureTimeout is passed on as is, also when it was never
        // configured - confirm the library accepts null.
        return this.commandSource(this.createCommands())
            .process(BackpressureTimout.backpressureTimeout(
                this.backpressureTimeout,
                () -> "No backpressure signal from the reducer of the app " + this.fullType()))
            .process(this.reducer())
            .subscribe(Fanout.of(new Flow.Subscriber[]{eventsFull, errors}))
            .to(this.topic(EVENT_FULL_TOPIC), eventsFull)
            .to(this.topic(REPLY_TOPIC), errors)
            .from(this.topic(EVENT_FULL_TOPIC), Aggregate.aggregates())
            .to(this.topic(REPLY_TOPIC))
            .from(this.topic(EVENT_FULL_TOPIC), Aggregate.aggregates())
            .to(this.topic(AGGREGATE_TOPIC))
            .from(this.topic(EVENT_FULL_TOPIC), Aggregate.events())
            .to(this.topic(EVENT_TOPIC));
    }

    /**
     * Connects the command processor to the command topic. With a unique expression the commands
     * are first re-keyed and routed through the {@code unique} topic.
     */
    private Streams<String, JsonObject, T, U> commandSource(Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> commands) {
        if (this.uniqueFunction == null) {
            return this.builder.from(this.topic(COMMAND_TOPIC), commands);
        }
        return this.builder
            .from(this.topic(COMMAND_TOPIC), Aggregate.unique(this.uniqueFunction))
            .to(this.topic(UNIQUE_TOPIC))
            .from(
                this.topic(UNIQUE_TOPIC),
                Box.box((Flow.Processor)Filter.filter(m -> Command.isCommand((JsonObject)m.value)), commands));
    }

    /** The standard command stage, followed by the general command processor when one is set. */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> createCommands() {
        return this.commandProcessor != null
            ? Box.box(Aggregate.commands(), this.commandProcessor)
            : Aggregate.commands();
    }

    /** Replaces each command with a {command, state} object, retrying the state lookup forever. */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> currentStateProcessor() {
        return Async.mapAsyncSequential(m ->
            this.getForever(() ->
                this.getCurrentState((JsonObject)m.value)
                    .thenApply(state ->
                        m.withValue(
                            Aggregate.createSource(
                                this.keepId((JsonObject)m.value, (JsonObject)state),
                                state)))));
    }

    /** Deletes the aggregate from the collection; fails the {@code must} check when not acknowledged. */
    private CompletionStage<Boolean> deleteMongoAggregate(JsonObject aggregate) {
        this.trace(aggregate, a -> true, () -> "Deleting aggregate " + JsonUtil.string((JsonValue)aggregate));
        return Collection.deleteOne(this.aggregateCollection, this.session, Filters.eq("_id", aggregate.getString("_id")))
            .thenApply(DeleteResult::wasAcknowledged)
            .thenApply(result -> (Boolean)Util.must(result, r -> r));
    }

    /** Deletes the aggregate when {@code /_after/_deleted} is true, otherwise returns empty. */
    private Optional<CompletionStage<Boolean>> deleteReduction(JsonObject reduction) {
        return JsonUtil.getBoolean((JsonStructure)reduction, "/_after/_deleted")
            .filter(deleted -> deleted)
            .map(deleted -> this.deleteMongoAggregate(reduction.getJsonObject("_after")));
    }

    /**
     * Returns the configured environment.
     *
     * @return the environment, or {@code null} when it has not been set.
     */
    public String environment() {
        return this.environment;
    }

    /** Returns "-" followed by the environment, or the empty string when there is no environment. */
    private String environmentSuffix() {
        return this.environment != null ? "-" + this.environment : "";
    }

    /**
     * The stages that run before a reducer: the preprocessor, the lookup of the current state,
     * the sequence number check and the access check.
     */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> epilogueProcessor(Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> preprocessor) {
        return Pipe.pipe(preprocessor)
            .then(this.currentStateProcessor())
            .then(Aggregate.matchingFilter())
            .then(this.allowedProcessor());
    }

    /**
     * Returns the aggregate type prefixed with the application name.
     *
     * @return {@code <app>-<type>}.
     */
    public String fullType() {
        return this.app + "-" + this.type;
    }

    /** Fetches the current state and completes it with the {@code _id} and {@code _type} of the command. */
    private CompletionStage<JsonObject> getCurrentState(JsonObject command) {
        return this.getMongoCurrentState(command)
            .thenApply(state -> Aggregate.makeManaged(command, state));
    }

    /** Runs the supplier through {@code Util.tryToGetForever} with {@link #BACK_OFF}, logging exceptions. */
    private <R> CompletionStage<R> getForever(SupplierWithException<CompletionStage<R>> get) {
        return Util.tryToGetForever(get, BACK_OFF, this::logException);
    }

    /** Looks up the state in MongoDB; yields the empty object when nothing is found. */
    private CompletionStage<JsonObject> getMongoCurrentState(JsonObject command) {
        return JsonClient.findOne(this.aggregateCollection, this.session, this.mongoStateCriterion(command))
            .thenApply(currentState -> currentState.orElseGet(JsonUtil::emptyObject));
    }

    /**
     * Persists the effect of an event: tries, in this order, a delete, the write of a new
     * aggregate and the update of an existing one. The stage yields the reduction itself.
     */
    private CompletionStage<JsonObject> handleAggregate(JsonObject reduction) {
        return Or.tryWith(() -> this.deleteReduction(reduction).orElse(null))
            .or(() -> this.updateReductionNew(reduction).orElse(null))
            .or(() -> this.updateReductionExisting(reduction).orElse(null))
            .get()
            .map(result ->
                result
                    .thenApply(res -> (Boolean)Util.must(res, r -> r))
                    .thenApply(res -> reduction))
            .orElseGet(() -> CompletableFuture.completedFuture(reduction));
    }

    /** With a unique expression the command gets the {@code _id} of the state that was found. */
    private JsonObject keepId(JsonObject command, JsonObject currentState) {
        return this.uniqueFunction != null
            ? JsonUtil.createObjectBuilder(command).add("_id", currentState.getString("_id")).build()
            : command;
    }

    /** Logs the exception at level SEVERE when a logger is set. */
    private void logException(Throwable e) {
        if (this.logger != null) {
            this.logger.log(Level.SEVERE, e, () -> this.type() + ": " + e.getMessage());
        }
    }

    /** Returns the collection name {@code <app>-<type>[-<environment>]}. */
    private String mongoAggregateCollection() {
        return this.fullType() + this.environmentSuffix();
    }

    /**
     * The query for the current state: based on the unique expression when there is one,
     * otherwise on the {@code _id} of the command.
     */
    private Bson mongoStateCriterion(JsonObject command) {
        return Optional.ofNullable(this.uniqueFunction)
            .map(f -> (JsonValue)f.apply(command))
            .map(this::mongoStateQuery)
            .orElseGet(() -> Filters.eq("_id", command.getString("_id")));
    }

    /**
     * An object value is used as the query itself. Any other value is compared with the unique
     * expression through {@code $expr}/{@code $eq}.
     */
    private Bson mongoStateQuery(JsonValue value) {
        JsonObject query = JsonUtil.isObject(value)
            ? value.asJsonObject()
            : JsonUtil.createObjectBuilder()
                .add("$expr", JsonUtil.createObjectBuilder()
                    .add("$eq", JsonUtil.createArrayBuilder().add(this.uniqueExpression).add(value)))
                .build();
        return BsonUtil.fromJson(query);
    }

    /**
     * Turns the result of a reducer into an event. The new state is returned unchanged when it
     * is {@code null}, carries an error or doesn't differ from the old state.
     */
    private JsonObject processNewState(JsonObject oldState, JsonObject newState, JsonObject command) {
        return Optional.ofNullable(newState)
            .filter(state -> !Command.hasError(state))
            .map(state -> Aggregate.createOps(oldState, newState))
            .filter(ops -> !ops.isEmpty())
            .map(ops -> Aggregate.createEvent(oldState, newState, command, ops))
            .orElse(newState);
    }

    /**
     * Wraps a reducer in a processor that applies it to {command, state} messages. Both a thrown
     * exception and an exceptionally completed stage are turned into an error message, see
     * {@link #exceptionMessage(Throwable)}.
     */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reduceProcessor(Reducer reducer) {
        return Async.mapAsyncSequential(m ->
            Util.tryToGet(
                    () -> ((CompletionStage)reducer.apply(
                            Aggregate.command((Message<String, JsonObject>)m),
                            Aggregate.state((Message<String, JsonObject>)m)))
                        .thenApply(newState -> ((Message)m).withValue(newState))
                        .exceptionally(Aggregate::exceptionMessage),
                    t -> CompletableFuture.completedFuture(Aggregate.exceptionMessage(t)))
                .orElse(null));
    }

    /**
     * The complete reducer stage, with tracing before and after. The general reducer is used
     * when one is set, otherwise the per-command reducers. Messages with a {@code null} or empty
     * value are dropped at the end.
     */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducer() {
        return Pipe.pipe((Flow.Processor)Mapper.map(m ->
                m.withValue(
                    this.trace(
                        (JsonObject)m.value,
                        j -> true,
                        () -> "Reducing " + JsonUtil.string((JsonValue)m.value)))))
            .then(this.reducer != null ? this.reducerGeneral() : this.reducerSpecific())
            .then(Mapper.map(m ->
                m.withValue(
                    this.trace(
                        (JsonObject)m.value,
                        j -> true,
                        () -> "Reduction result: " + (m.value != null ? JsonUtil.string((JsonValue)m.value) : "null")))))
            .then(Filter.filter(m -> m.value != null && !((JsonObject)m.value).isEmpty()));
    }

    /** The reducer stage for the single general reducer, without preprocessing. */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducerGeneral() {
        return this.reducerProcessor(PassThrough.passThrough(), this.reduceProcessor(this.reducer));
    }

    /**
     * Combines the preparation stages with a reducer. The prepared messages are fanned out to
     * two branches, one that lets through the errors and one that reduces the rest, and the
     * output of both branches is merged.
     */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducerProcessor(Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> preprocessor, Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducer) {
        Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> epilogue = this.epilogueProcessor(preprocessor);
        Flow.Processor errors = Filter.filter(m -> Command.hasError((JsonObject)m.value));
        Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> red = this.reducerProcessor(reducer);
        epilogue.subscribe(Fanout.of(new Flow.Subscriber[]{errors, red}));
        return Combine.combine(epilogue, (Flow.Publisher)Merge.of(new Flow.Publisher[]{errors, red}));
    }

    /**
     * The reducing branch: skips errors, applies the reducer while carrying over the command and
     * the state, converts the outcome to either the command merged with the error or an event,
     * saves it and buffers one message.
     */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducerProcessor(Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducer) {
        return Pipe.pipe((Flow.Processor)Filter.filter(m -> !Command.hasError((JsonObject)m.value)))
            .then(net.pincette.rs.Util.carryOver(
                reducer,
                m -> Pair.pair(
                    Aggregate.command((Message<String, JsonObject>)m),
                    Aggregate.state((Message<String, JsonObject>)m)),
                (newState, pair) -> Command.hasError((JsonObject)newState.value)
                    ? newState.withValue(
                        JsonUtil.merge(new JsonObject[]{(JsonObject)pair.first, (JsonObject)newState.value}))
                    : newState.withValue(
                        this.processNewState((JsonObject)pair.second, (JsonObject)newState.value, (JsonObject)pair.first))))
            .then(this.saveProcessor())
            .then(Buffer.buffer(1));
    }

    /**
     * The reducer stage for one command name, which uses the preprocessor registered for that
     * command, if any.
     */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducerProcessor(String command, Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducer) {
        return Pipe.pipe(Aggregate.forCommand(command))
            .then(this.reducerProcessor(
                Optional.ofNullable(this.commandProcessors.get(command)).orElseGet(PassThrough::passThrough),
                reducer));
    }

    /** Fans the messages out to the stages of all per-command reducers and merges their output. */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducerSpecific() {
        List<Flow.Processor> processors = Stream.concat(
                this.reducerProcessors.entrySet().stream()
                    .map(e -> this.reducerProcessor((String)e.getKey(), (Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>>)((Flow.Processor)e.getValue()))),
                this.reducers.entrySet().stream()
                    .map(e -> this.reducerProcessor((String)e.getKey(), this.reduceProcessor((Reducer)e.getValue()))))
            .toList();
        Flow.Processor all = PassThrough.passThrough();
        all.subscribe(Fanout.of(processors));
        return Combine.combine((Flow.Subscriber)all, (Flow.Publisher)Merge.of(processors));
    }

    /** Saves a reduction, see {@link #saveReduction(JsonObject, Function)}. */
    private CompletionStage<JsonObject> save(JsonObject newState) {
        return this.saveReduction(newState, this::handleAggregate);
    }

    /** Saves every message that has no error, retrying forever; errors pass unchanged. */
    private Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> saveProcessor() {
        return Async.mapAsyncSequential(m ->
            !Command.hasError((JsonObject)m.value)
                ? this.getForever(() ->
                    this.save((JsonObject)m.value).thenApply(saved -> ((Message)m).withValue(saved)))
                : CompletableFuture.completedFuture(m));
    }

    /** Only events are handed to the handler; anything else is returned as is. */
    private CompletionStage<JsonObject> saveReduction(JsonObject reduction, Function<JsonObject, CompletionStage<JsonObject>> handleAggregate) {
        return Event.isEvent(reduction)
            ? handleAggregate.apply(reduction)
            : CompletableFuture.completedFuture(reduction);
    }

    /**
     * Returns the name of the topic for a purpose.
     *
     * @param purpose the purpose, e.g. {@code command}, {@code event} or {@code reply}.
     * @return {@code <app>-<type>-<purpose>[-<environment>]}.
     */
    public String topic(String purpose) {
        return this.fullType() + "-" + purpose + this.environmentSuffix();
    }

    /**
     * Logs the message at level FINEST, prefixed with the current time in milliseconds, when a
     * logger is set and the value passes the test. Always returns the value.
     */
    private <V> V trace(V value, Predicate<V> test, Supplier<String> message) {
        if (this.logger != null && test.test(value)) {
            this.logger.finest(() -> System.currentTimeMillis() + " " + message.get());
        }
        return value;
    }

    /**
     * Returns the configured aggregate type.
     *
     * @return the type, or {@code null} when it has not been set.
     */
    public String type() {
        return this.type;
    }

    /** Writes the complete aggregate to the collection; fails the {@code must} check when the result is false. */
    private CompletionStage<Boolean> updateMongoAggregate(JsonObject aggregate) {
        this.trace(aggregate, a -> true, () -> "Replacing aggregate " + JsonUtil.string((JsonValue)aggregate));
        return JsonClient.update(this.aggregateCollection, aggregate, this.session)
            .thenApply(result -> (Boolean)Util.must(result, r -> r));
    }

    /** Writes {@code _after} as a whole when {@code _before} has no non-technical fields. */
    private Optional<CompletionStage<Boolean>> updateReductionNew(JsonObject reduction) {
        return Aggregate.beforeWithoutTechnical(reduction)
            .filter(Map::isEmpty)
            .map(before -> this.updateMongoAggregate(reduction.getJsonObject("_after")));
    }

    /** Applies the {@code _ops} of the event when {@code _before} has non-technical fields. */
    private Optional<CompletionStage<Boolean>> updateReductionExisting(JsonObject reduction) {
        return Aggregate.beforeWithoutTechnical(reduction)
            .filter(before -> !before.isEmpty())
            .map(before -> this.trace(before, b -> true, () -> "Updating aggregate " + JsonUtil.string((JsonValue)reduction.getJsonObject("_after"))))
            .map(before -> Aggregate.updateAggregate(this.aggregateCollection, reduction.getJsonObject("_before"), reduction, this.session));
    }

    /**
     * Sets the application name, which is part of the topic and collection names. Mandatory.
     *
     * @param app the application name.
     * @return this aggregate.
     */
    public Aggregate<T, U> withApp(String app) {
        this.app = app;
        return this;
    }

    /**
     * Sets the timeout that is handed to the backpressure guard in front of the reducer.
     *
     * @param backpressureTimeout the timeout.
     * @return this aggregate.
     */
    public Aggregate<T, U> withBackpressureTimeout(Duration backpressureTimeout) {
        this.backpressureTimeout = backpressureTimeout;
        return this;
    }

    /**
     * Turns on the "breaking the glass" flag, which is passed to the access check of commands.
     *
     * @return this aggregate.
     */
    public Aggregate<T, U> withBreakingTheGlass() {
        this.breakingTheGlass = true;
        return this;
    }

    /**
     * Sets the streams builder the topology is added to. Mandatory.
     *
     * @param builder the streams builder.
     * @return this aggregate.
     */
    public Aggregate<T, U> withBuilder(Streams<String, JsonObject, T, U> builder) {
        this.builder = builder;
        return this;
    }

    /**
     * Sets a processor that is applied to all commands, right after the standard command stage.
     *
     * @param commandProcessor the processor.
     * @return this aggregate.
     */
    public Aggregate<T, U> withCommandProcessor(Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> commandProcessor) {
        this.commandProcessor = commandProcessor;
        return this;
    }

    /**
     * Sets a processor that is applied to the commands with the given name, before their current
     * state is fetched. It is only used with per-command reducers.
     *
     * @param command the name of the command.
     * @param commandProcessor the processor.
     * @return this aggregate.
     */
    public Aggregate<T, U> withCommandProcessor(String command, Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> commandProcessor) {
        this.commandProcessors.put(command, commandProcessor);
        return this;
    }

    /**
     * Sets the environment, which becomes the suffix of the topic and collection names.
     *
     * @param environment the environment, may be {@code null} for no suffix.
     * @return this aggregate.
     */
    public Aggregate<T, U> withEnvironment(String environment) {
        this.environment = environment;
        return this;
    }

    /**
     * Sets the logger for exceptions and tracing. Without it nothing is logged.
     *
     * @param logger the logger.
     * @return this aggregate.
     */
    public Aggregate<T, U> withLogger(Logger logger) {
        this.logger = logger;
        return this;
    }

    /**
     * Sets the MongoDB client, which is used to open the session. Mandatory.
     *
     * @param client the MongoDB client.
     * @return this aggregate.
     */
    public Aggregate<T, U> withMongoClient(MongoClient client) {
        this.client = client;
        return this;
    }

    /**
     * Sets the MongoDB database. It is used with majority read and write concerns. Mandatory.
     *
     * @param database the MongoDB database.
     * @return this aggregate.
     */
    public Aggregate<T, U> withMongoDatabase(MongoDatabase database) {
        this.database = Aggregate.prepareDatabase(database);
        return this;
    }

    /**
     * Registers a reducer for a command. The call has no effect when a processor-based reducer
     * has already been registered for the same command.
     *
     * @param command the name of the command.
     * @param reducer the reducer.
     * @return this aggregate.
     */
    public Aggregate<T, U> withReducer(String command, Reducer reducer) {
        if (!this.reducerProcessors.containsKey(command)) {
            this.reducers.put(command, reducer);
        }
        return this;
    }

    /**
     * Registers a processor-based reducer for a command. It replaces any reducer that was
     * registered for the same command.
     *
     * @param command the name of the command.
     * @param reducer the processor that receives {command, state} messages.
     * @return this aggregate.
     */
    public Aggregate<T, U> withReducer(String command, Flow.Processor<Message<String, JsonObject>, Message<String, JsonObject>> reducer) {
        this.reducerProcessors.put(command, reducer);
        this.reducers.remove(command);
        return this;
    }

    /**
     * Sets the general reducer. When it is set, the per-command reducers are not used.
     *
     * @param reducer the reducer.
     * @return this aggregate.
     */
    public Aggregate<T, U> withReducer(Reducer reducer) {
        this.reducer = reducer;
        return this;
    }

    /**
     * Sets the aggregate type, which is part of the topic and collection names. Mandatory.
     *
     * @param type the aggregate type.
     * @return this aggregate.
     */
    public Aggregate<T, U> withType(String type) {
        this.type = type;
        return this;
    }

    /**
     * Sets the expression with which an aggregate instance is identified instead of with the
     * {@code _id} of the command. The expression is compiled to a function once, here.
     *
     * @param expression the expression.
     * @return this aggregate.
     */
    public Aggregate<T, U> withUniqueExpression(JsonValue expression) {
        this.uniqueExpression = expression;
        this.uniqueFunction = Expression.function(expression);
        return this;
    }
}

