package io.datakernel.dataflow.server;

import io.datakernel.codec.CodecSubtype;
import io.datakernel.codec.StructuredCodec;
import io.datakernel.codec.StructuredCodecs;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.common.Initializable;
import io.datakernel.common.parse.ParseException;
import io.datakernel.dataflow.graph.StreamId;
import io.datakernel.dataflow.node.Node;
import io.datakernel.dataflow.node.NodeConsumerToList;
import io.datakernel.dataflow.node.NodeDownload;
import io.datakernel.dataflow.node.NodeFilter;
import io.datakernel.dataflow.node.NodeJoin;
import io.datakernel.dataflow.node.NodeMap;
import io.datakernel.dataflow.node.NodeMerge;
import io.datakernel.dataflow.node.NodeReduce;
import io.datakernel.dataflow.node.NodeReduceSimple;
import io.datakernel.dataflow.node.NodeShard;
import io.datakernel.dataflow.node.NodeSort;
import io.datakernel.dataflow.node.NodeSupplierOfIterable;
import io.datakernel.dataflow.node.NodeUnion;
import io.datakernel.dataflow.node.NodeUpload;
import io.datakernel.dataflow.server.command.DatagraphCommand;
import io.datakernel.dataflow.server.command.DatagraphCommandDownload;
import io.datakernel.dataflow.server.command.DatagraphCommandExecute;
import io.datakernel.dataflow.server.command.DatagraphResponse;
import io.datakernel.dataflow.server.command.DatagraphResponseAck;
import io.datakernel.dataflow.server.command.DatagraphResponseDisconnect;
import io.datakernel.dataflow.server.command.DatagraphResponseExecute;
import io.datakernel.datastream.processor.StreamJoin;
import io.datakernel.datastream.processor.StreamReducers;
import io.datakernel.serializer.BinarySerializer;
import io.datakernel.serializer.SerializerBuilder;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/dataflow/server/DatagraphSerialization.class */
public final class DatagraphSerialization implements Initializable<DatagraphSerialization> {
    // SLF4J logger for this class; getSerializer uses it to report on-demand serializer creation.
    static final Logger logger = LoggerFactory.getLogger(DatagraphSerialization.class);
    /** Codec for {@link StreamId}: the id is read and written as a single long value. */
    static final StructuredCodec<StreamId> STREAM_ID_CODEC = StructuredCodec.of(
            in -> new StreamId(in.readLong()),
            (out, id) -> out.writeLong(id.getId()));
    /**
     * Codec for {@link InetSocketAddress}, encoded as one string of the form {@code host:port}.
     *
     * <p>The encoder writes the textual IP address, a ':' and the port. The textual form of an
     * IPv6 address itself contains ':' characters, so the decoder splits on the LAST ':' only;
     * splitting on every ':' would reject every IPv6 address this codec's own encoder produces.
     *
     * <p>Decoding fails with a {@link ParseException} when the separator is missing, when the host
     * cannot be resolved, or when the port is not a valid port number.
     */
    static final StructuredCodec<InetSocketAddress> ADDRESS_CODEC = StructuredCodec.of(structuredInput -> {
        String address = structuredInput.readString();
        int separator = address.lastIndexOf(':');
        if (separator == -1) {
            throw new ParseException("Address should be in the 'host:port' form");
        }
        String host = address.substring(0, separator);
        String port = address.substring(separator + 1);
        try {
            return new InetSocketAddress(InetAddress.getByName(host), Integer.parseInt(port));
        } catch (UnknownHostException e) {
            throw new ParseException(DatagraphSerialization.class, "Failed to create InetSocketAdress", e);
        } catch (IllegalArgumentException e) {
            // Covers NumberFormatException (non-numeric port, a subclass of IllegalArgumentException)
            // and the IllegalArgumentException InetSocketAddress throws for an out-of-range port.
            // Malformed input must surface as ParseException, not as an unchecked exception.
            throw new ParseException(DatagraphSerialization.class, "Invalid port in address: " + address, e);
        }
    }, (structuredOutput, inetSocketAddress) -> {
        structuredOutput.writeString(inetSocketAddress.getAddress().getHostAddress() + ':' + inetSocketAddress.getPort());
    });
    // Lazily built codecs for the user-extensible functional types. Each one is produced by
    // createCodec, i.e. it is populated from the codecs registered in userDefinedTypes that are
    // assignable to the given base type.
    final CodecProvider<Predicate> predicate = providerOf(() -> createCodec(Predicate.class));
    final CodecProvider<Function> function = providerOf(() -> createCodec(Function.class));
    final CodecProvider<Comparator> comparator = providerOf(() -> createCodec(Comparator.class));
    final CodecProvider<StreamJoin.Joiner> joiner = providerOf(() -> createCodec(StreamJoin.Joiner.class));
    final CodecProvider<StreamReducers.ReducerToResult> REDUCER_TO_RESULT_PROVIDER =
            providerOf(() -> createCodec(StreamReducers.ReducerToResult.class));
    /**
     * Lazily built codec for {@link StreamReducers.Reducer}.
     *
     * <p>Starts from the user-registered reducer codecs (see createCodec) and adds the built-in
     * subtypes. The four {@code ReducerToResult} adapters are stored as an object with the single
     * key {@code "reducerToResult"}; the two merge reducers carry no state and are stored as
     * empty objects.
     */
    final CodecProvider<StreamReducers.Reducer> reducer = providerOf(() ->
            createCodec(StreamReducers.Reducer.class)
                    .with(StreamReducers.ReducerToResult.InputToAccumulator.class, StructuredCodec.ofObject(
                            in -> new StreamReducers.ReducerToResult.InputToAccumulator(
                                    (StreamReducers.ReducerToResult) in.readKey("reducerToResult", this.REDUCER_TO_RESULT_PROVIDER.get())),
                            (out, item) -> {
                                out.writeKey("reducerToResult", this.REDUCER_TO_RESULT_PROVIDER.get(), item.getReducerToResult());
                            }))
                    .with(StreamReducers.ReducerToResult.InputToOutput.class, StructuredCodec.ofObject(
                            in -> new StreamReducers.ReducerToResult.InputToOutput(
                                    (StreamReducers.ReducerToResult) in.readKey("reducerToResult", this.REDUCER_TO_RESULT_PROVIDER.get())),
                            (out, item) -> {
                                out.writeKey("reducerToResult", this.REDUCER_TO_RESULT_PROVIDER.get(), item.getReducerToResult());
                            }))
                    .with(StreamReducers.ReducerToResult.AccumulatorToAccumulator.class, StructuredCodec.ofObject(
                            in -> new StreamReducers.ReducerToResult.AccumulatorToAccumulator(
                                    (StreamReducers.ReducerToResult) in.readKey("reducerToResult", this.REDUCER_TO_RESULT_PROVIDER.get())),
                            (out, item) -> {
                                out.writeKey("reducerToResult", this.REDUCER_TO_RESULT_PROVIDER.get(), item.getReducerToResult());
                            }))
                    .with(StreamReducers.ReducerToResult.AccumulatorToOutput.class, StructuredCodec.ofObject(
                            in -> new StreamReducers.ReducerToResult.AccumulatorToOutput(
                                    (StreamReducers.ReducerToResult) in.readKey("reducerToResult", this.REDUCER_TO_RESULT_PROVIDER.get())),
                            (out, item) -> {
                                out.writeKey("reducerToResult", this.REDUCER_TO_RESULT_PROVIDER.get(), item.getReducerToResult());
                            }))
                    .with(StreamReducers.MergeDistinctReducer.class, StructuredCodec.ofObject(
                            in -> new StreamReducers.MergeDistinctReducer(),
                            (out, item) -> {
                                // stateless: nothing to write
                            }))
                    .with(StreamReducers.MergeSortReducer.class, StructuredCodec.ofObject(
                            in -> new StreamReducers.MergeSortReducer(),
                            (out, item) -> {
                                // stateless: nothing to write
                            })));
    /**
     * Lazily built codec for the {@code StreamId -> NodeReduce.Input} map of a {@link NodeReduce}.
     * Each input is an object with the keys {@code "reducer"} and {@code "keyFunction"}.
     */
    private final CodecProvider<Map<StreamId, NodeReduce.Input>> nodeReduceInput = providerOf(() ->
            StructuredCodecs.ofMap(STREAM_ID_CODEC, StructuredCodec.ofObject(
                    in -> new NodeReduce.Input(
                            (StreamReducers.Reducer) in.readKey("reducer", this.reducer.get()),
                            (Function) in.readKey("keyFunction", this.function.get())),
                    (out, reduceInput) -> {
                        out.writeKey("reducer", this.reducer.get(), reduceInput.getReducer());
                        out.writeKey("keyFunction", this.function.get(), reduceInput.getKeyFunction());
                    })));
    /**
     * Lazily built codec for {@link Node}, with one object codec per concrete node class.
     *
     * <p>Starts from the user-registered node codecs (see createCodec) and adds the built-in node
     * types. Every decoder reads its keys in the same order in which the matching encoder writes
     * them, so the order of the {@code readKey}/{@code writeKey} calls below must be kept in sync.
     */
    final CodecProvider<Node> node = providerOf(() ->
            createCodec(Node.class)
                    .with(NodeDownload.class, StructuredCodec.ofObject(
                            in -> new NodeDownload(
                                    (Class) in.readKey("type", StructuredCodecs.CLASS_CODEC),
                                    (InetSocketAddress) in.readKey("address", ADDRESS_CODEC),
                                    (StreamId) in.readKey("streamId", STREAM_ID_CODEC),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, download) -> {
                                out.writeKey("type", StructuredCodecs.ofClass(), download.getType());
                                out.writeKey("address", ADDRESS_CODEC, download.getAddress());
                                out.writeKey("streamId", STREAM_ID_CODEC, download.getStreamId());
                                out.writeKey("output", STREAM_ID_CODEC, download.getOutput());
                            }))
                    .with(NodeUpload.class, StructuredCodec.ofObject(
                            in -> new NodeUpload(
                                    (Class) in.readKey("type", StructuredCodecs.CLASS_CODEC),
                                    (StreamId) in.readKey("streamId", STREAM_ID_CODEC)),
                            (out, upload) -> {
                                out.writeKey("type", StructuredCodecs.CLASS_CODEC, upload.getType());
                                out.writeKey("streamId", STREAM_ID_CODEC, upload.getStreamId());
                            }))
                    .with(NodeMap.class, StructuredCodec.ofObject(
                            in -> new NodeMap(
                                    (Function) in.readKey("function", this.function.get()),
                                    (StreamId) in.readKey("input", STREAM_ID_CODEC),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, map) -> {
                                out.writeKey("function", this.function.get(), map.getFunction());
                                out.writeKey("input", STREAM_ID_CODEC, map.getInput());
                                out.writeKey("output", STREAM_ID_CODEC, map.getOutput());
                            }))
                    .with(NodeFilter.class, StructuredCodec.ofObject(
                            in -> new NodeFilter(
                                    (Predicate) in.readKey("predicate", this.predicate.get()),
                                    (StreamId) in.readKey("input", STREAM_ID_CODEC),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, filter) -> {
                                out.writeKey("predicate", this.predicate.get(), filter.getPredicate());
                                out.writeKey("input", STREAM_ID_CODEC, filter.getInput());
                                out.writeKey("output", STREAM_ID_CODEC, filter.getOutput());
                            }))
                    .with(NodeSort.class, StructuredCodec.ofObject(
                            in -> new NodeSort(
                                    (Function) in.readKey("keyFunction", this.function.get()),
                                    (Comparator) in.readKey("keyComparator", this.comparator.get()),
                                    ((Boolean) in.readKey("deduplicate", StructuredCodecs.BOOLEAN_CODEC)).booleanValue(),
                                    ((Integer) in.readKey("itemsInMemorySize", StructuredCodecs.INT_CODEC)).intValue(),
                                    (StreamId) in.readKey("input", STREAM_ID_CODEC),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, sort) -> {
                                out.writeKey("keyFunction", this.function.get(), sort.getKeyFunction());
                                out.writeKey("keyComparator", this.comparator.get(), sort.getKeyComparator());
                                out.writeKey("deduplicate", StructuredCodecs.BOOLEAN_CODEC, Boolean.valueOf(sort.isDeduplicate()));
                                out.writeKey("itemsInMemorySize", StructuredCodecs.INT_CODEC, Integer.valueOf(sort.getItemsInMemorySize()));
                                out.writeKey("input", STREAM_ID_CODEC, sort.getInput());
                                out.writeKey("output", STREAM_ID_CODEC, sort.getOutput());
                            }))
                    .with(NodeShard.class, StructuredCodec.ofObject(
                            in -> new NodeShard(
                                    (Function) in.readKey("keyFunction", this.function.get()),
                                    (StreamId) in.readKey("input", STREAM_ID_CODEC),
                                    (List) in.readKey("outputs", StructuredCodecs.ofList(STREAM_ID_CODEC))),
                            (out, shard) -> {
                                out.writeKey("keyFunction", this.function.get(), shard.getKeyFunction());
                                out.writeKey("input", STREAM_ID_CODEC, shard.getInput());
                                out.writeKey("outputs", StructuredCodecs.ofList(STREAM_ID_CODEC), shard.getOutputs());
                            }))
                    .with(NodeMerge.class, StructuredCodec.ofObject(
                            in -> new NodeMerge(
                                    (Function) in.readKey("keyFunction", this.function.get()),
                                    (Comparator) in.readKey("keyComparator", this.comparator.get()),
                                    ((Boolean) in.readKey("deduplicate", StructuredCodecs.BOOLEAN_CODEC)).booleanValue(),
                                    (List) in.readKey("inputs", StructuredCodecs.ofList(STREAM_ID_CODEC)),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, merge) -> {
                                out.writeKey("keyFunction", this.function.get(), merge.getKeyFunction());
                                out.writeKey("keyComparator", this.comparator.get(), merge.getKeyComparator());
                                out.writeKey("deduplicate", StructuredCodecs.BOOLEAN_CODEC, Boolean.valueOf(merge.isDeduplicate()));
                                out.writeKey("inputs", StructuredCodecs.ofList(STREAM_ID_CODEC), merge.getInputs());
                                out.writeKey("output", STREAM_ID_CODEC, merge.getOutput());
                            }))
                    .with(NodeReduce.class, StructuredCodec.ofObject(
                            in -> new NodeReduce(
                                    (Comparator) in.readKey("keyComparator", this.comparator.get()),
                                    (Map) in.readKey("inputs", this.nodeReduceInput.get()),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, reduce) -> {
                                out.writeKey("keyComparator", this.comparator.get(), reduce.getKeyComparator());
                                out.writeKey("inputs", this.nodeReduceInput.get(), reduce.getInputs());
                                out.writeKey("output", STREAM_ID_CODEC, reduce.getOutput());
                            }))
                    .with(NodeReduceSimple.class, StructuredCodec.ofObject(
                            in -> new NodeReduceSimple(
                                    (Function) in.readKey("keyFunction", this.function.get()),
                                    (Comparator) in.readKey("keyComparator", this.comparator.get()),
                                    (StreamReducers.Reducer) in.readKey("reducer", this.reducer.get()),
                                    (List) in.readKey("inputs", StructuredCodecs.ofList(STREAM_ID_CODEC)),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, reduceSimple) -> {
                                out.writeKey("keyFunction", this.function.get(), reduceSimple.getKeyFunction());
                                out.writeKey("keyComparator", this.comparator.get(), reduceSimple.getKeyComparator());
                                out.writeKey("reducer", this.reducer.get(), reduceSimple.getReducer());
                                out.writeKey("inputs", StructuredCodecs.ofList(STREAM_ID_CODEC), reduceSimple.getInputs());
                                out.writeKey("output", STREAM_ID_CODEC, reduceSimple.getOutput());
                            }))
                    .with(NodeJoin.class, StructuredCodec.ofObject(
                            in -> new NodeJoin(
                                    (StreamId) in.readKey("left", STREAM_ID_CODEC),
                                    (StreamId) in.readKey("right", STREAM_ID_CODEC),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC),
                                    (Comparator) in.readKey("keyComparator", this.comparator.get()),
                                    (Function) in.readKey("leftKeyFunction", this.function.get()),
                                    (Function) in.readKey("rightKeyFunction", this.function.get()),
                                    (StreamJoin.Joiner) in.readKey("joiner", this.joiner.get())),
                            (out, join) -> {
                                out.writeKey("left", STREAM_ID_CODEC, join.getLeft());
                                out.writeKey("right", STREAM_ID_CODEC, join.getRight());
                                out.writeKey("output", STREAM_ID_CODEC, join.getOutput());
                                out.writeKey("keyComparator", this.comparator.get(), join.getKeyComparator());
                                out.writeKey("leftKeyFunction", this.function.get(), join.getLeftKeyFunction());
                                out.writeKey("rightKeyFunction", this.function.get(), join.getRightKeyFunction());
                                out.writeKey("joiner", this.joiner.get(), join.getJoiner());
                            }))
                    .with(NodeUnion.class, StructuredCodec.ofObject(
                            in -> new NodeUnion(
                                    (List) in.readKey("inputs", StructuredCodecs.ofList(STREAM_ID_CODEC)),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, union) -> {
                                out.writeKey("inputs", StructuredCodecs.ofList(STREAM_ID_CODEC), union.getInputs());
                                out.writeKey("output", STREAM_ID_CODEC, union.getOutput());
                            }))
                    .with(NodeSupplierOfIterable.class, StructuredCodec.ofObject(
                            in -> new NodeSupplierOfIterable(
                                    in.readKey("iterableId", StructuredCodecs.STRING_CODEC),
                                    (StreamId) in.readKey("output", STREAM_ID_CODEC)),
                            (out, supplierOfIterable) -> {
                                out.writeKey("iterableId", StructuredCodecs.STRING_CODEC, (String) supplierOfIterable.getIterableId());
                                out.writeKey("output", STREAM_ID_CODEC, supplierOfIterable.getOutput());
                            }))
                    .with(NodeConsumerToList.class, StructuredCodec.ofObject(
                            in -> new NodeConsumerToList(
                                    (StreamId) in.readKey("input", STREAM_ID_CODEC),
                                    in.readKey("listId", StructuredCodecs.STRING_CODEC)),
                            (out, consumerToList) -> {
                                out.writeKey("input", STREAM_ID_CODEC, consumerToList.getInput());
                                out.writeKey("listId", StructuredCodecs.STRING_CODEC, (String) consumerToList.getListId());
                            })));
    /**
     * Lazily built codec for {@link DatagraphCommand}. Built-in subtypes are registered under the
     * names {@code "Download"} (a single {@code "streamId"} field) and {@code "Execute"}
     * (a {@code "nodes"} list encoded with the node codec).
     */
    final CodecProvider<DatagraphCommand> command = providerOf(() ->
            createCodec(DatagraphCommand.class)
                    .with(DatagraphCommandDownload.class, "Download",
                            StructuredCodecs.object(
                                    DatagraphCommandDownload::new,
                                    "streamId", DatagraphCommandDownload::getStreamId, STREAM_ID_CODEC))
                    .with(DatagraphCommandExecute.class, "Execute",
                            StructuredCodecs.object(
                                    DatagraphCommandExecute::new,
                                    "nodes", DatagraphCommandExecute::getNodes, StructuredCodecs.ofList(this.node.get()))));
    /**
     * Lazily built codec for {@link DatagraphResponse}. Built-in subtypes are registered under the
     * names {@code "Ack"} and {@code "Disconnect"} (both without fields) and {@code "Execute"}
     * (a {@code "nodeIds"} list of ints).
     */
    final CodecProvider<DatagraphResponse> response = providerOf(() ->
            createCodec(DatagraphResponse.class)
                    .with(DatagraphResponseAck.class, "Ack",
                            StructuredCodecs.object(DatagraphResponseAck::new))
                    .with(DatagraphResponseDisconnect.class, "Disconnect",
                            StructuredCodecs.object(DatagraphResponseDisconnect::new))
                    .with(DatagraphResponseExecute.class, "Execute",
                            StructuredCodecs.object(
                                    DatagraphResponseExecute::new,
                                    "nodeIds", DatagraphResponseExecute::getNodeIds, StructuredCodecs.ofList(StructuredCodecs.INT_CODEC))));
    // Codecs registered through withCodec, keyed by the class they were registered for;
    // createCodec copies the assignable ones into each subtype codec it builds.
    private final Map<Class<?>, StructuredCodec<?>> userDefinedTypes = new HashMap<>();
    // Serializers registered through withBufferSerializer or built on demand by getSerializer.
    private final Map<Class<?>, BinarySerializer<?>> serializers = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/datakernel/dataflow/server/DatagraphSerialization$CodecProvider.class */
    /**
     * Memoizing holder for a {@link StructuredCodec}: the supplier is invoked on the first
     * {@link #get()} and its result is reused afterwards. If the supplier returns {@code null},
     * nothing is cached and the next call invokes the supplier again.
     */
    public static final class CodecProvider<T> {
        private StructuredCodec<T> cached;
        private final Supplier<StructuredCodec<T>> supplier;

        CodecProvider(Supplier<StructuredCodec<T>> supplier) {
            this.supplier = supplier;
        }

        /** Returns the codec, creating it through the supplier when none has been stored yet. */
        StructuredCodec<T> get() {
            if (this.cached == null) {
                this.cached = this.supplier.get();
            }
            return this.cached;
        }
    }

    /**
     * Wraps {@code supplier} in a {@link CodecProvider} so the codec is only built on first use.
     *
     * <p>Returns the parameterized {@code CodecProvider<T>} rather than a raw type, so the element
     * type is checked at every assignment instead of relying on unchecked conversions.
     *
     * @param supplier factory for the codec, invoked lazily by the returned provider
     * @return a provider that memoizes the codec produced by {@code supplier}
     */
    private <T> CodecProvider<T> providerOf(Supplier<StructuredCodec<T>> supplier) {
        return new CodecProvider<>(supplier);
    }

    /**
     * Creates a subtype codec for {@code cls} and seeds it with every user-registered codec whose
     * registered class is {@code cls} itself or a subtype of it.
     *
     * @param cls base type of the codec being created
     * @return a new {@link CodecSubtype} containing the matching entries of userDefinedTypes
     */
    private <T> CodecSubtype<T> createCodec(Class<T> cls) {
        CodecSubtype<T> subtypeCodec = CodecSubtype.create();
        for (Map.Entry<Class<?>, StructuredCodec<?>> registration : this.userDefinedTypes.entrySet()) {
            Class<?> registeredType = registration.getKey();
            if (!cls.isAssignableFrom(registeredType)) {
                // registered for an unrelated hierarchy
                continue;
            }
            subtypeCodec.with(registeredType, registration.getValue());
        }
        return subtypeCodec;
    }

    // Private: instances are obtained through the static create() factory.
    private DatagraphSerialization() {
    }

    /**
     * Creates a new instance with no user-defined codecs and no pre-registered serializers.
     *
     * @return a fresh {@code DatagraphSerialization}
     */
    public static DatagraphSerialization create() {
        return new DatagraphSerialization();
    }

    /**
     * Registers a user-defined codec for {@code cls}, replacing any codec previously registered
     * for the same class.
     *
     * <p>Registered codecs are picked up by createCodec when a codec provider builds its codec.
     * Each CodecProvider keeps the codec it built on first use, so a registration made after the
     * corresponding codec has already been requested is not reflected in that codec.
     *
     * @param cls             class the codec handles
     * @param structuredCodec codec for instances of {@code cls}
     * @return this instance, for chaining
     */
    public <T> DatagraphSerialization withCodec(Class<T> cls, StructuredCodec<T> structuredCodec) {
        this.userDefinedTypes.put(cls, structuredCodec);
        return this;
    }

    /**
     * Registers a binary serializer for {@code cls}; getSerializer returns it instead of building
     * one. Replaces any serializer already stored for the same class.
     *
     * <p>NOTE(review): this method is not synchronized although getSerializer is; presumably it is
     * only called during configuration, before serializers are requested — confirm with callers.
     *
     * @param cls              class the serializer handles
     * @param binarySerializer serializer for instances of {@code cls}
     * @return this instance, for chaining
     */
    public <T> DatagraphSerialization withBufferSerializer(Class<T> cls, BinarySerializer<T> binarySerializer) {
        this.serializers.put(cls, binarySerializer);
        return this;
    }

    /**
     * Returns the codec for {@link DatagraphCommand}; it is built on the first call and reused.
     */
    public StructuredCodec<DatagraphCommand> getCommandCodec() {
        return this.command.get();
    }

    /**
     * Returns the codec for {@link DatagraphResponse}; it is built on the first call and reused.
     */
    public StructuredCodec<DatagraphResponse> getResponseCodec() {
        return this.response.get();
    }

    /**
     * Returns the codec for {@link Node}; it is built on the first call and reused.
     */
    public StructuredCodec<Node> getNodeCodec() {
        return this.node.get();
    }

    /**
     * Returns the binary serializer for {@code cls}.
     *
     * <p>A serializer already stored for the class (registered via withBufferSerializer or built
     * by an earlier call) is returned as-is. Otherwise one is built with a {@link SerializerBuilder}
     * on a new {@link DefiningClassLoader} over the system class loader, stored, and returned.
     *
     * @param cls class to serialize
     * @return the stored or newly built serializer for {@code cls}
     */
    public synchronized <T> BinarySerializer<T> getSerializer(Class<T> cls) {
        BinarySerializer<?> known = this.serializers.get(cls);
        if (known != null) {
            return (BinarySerializer<T>) known;
        }
        logger.info("Creating serializer for {}", cls);
        DefiningClassLoader classLoader = DefiningClassLoader.create(ClassLoader.getSystemClassLoader());
        BinarySerializer<?> built = SerializerBuilder.create(classLoader).build(cls);
        this.serializers.put(cls, built);
        return (BinarySerializer<T>) built;
    }
}
