/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.ignite.compute;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.ignite.compute.IgniteComputeEndpoint;
import org.apache.camel.component.ignite.compute.IgniteComputeExecutionType;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.util.MessageHelper;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;

public class IgniteComputeProducer
extends DefaultAsyncProducer {
    private IgniteComputeEndpoint endpoint;

    public IgniteComputeProducer(IgniteComputeEndpoint endpoint) {
        super((Endpoint)endpoint);
        this.endpoint = endpoint;
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        IgniteCompute compute = this.endpoint.createIgniteCompute().withAsync();
        try {
            switch (this.executionTypeFor(exchange)) {
                case CALL: {
                    this.doCall(exchange, callback, compute);
                    break;
                }
                case BROADCAST: {
                    this.doBroadcast(exchange, callback, compute);
                    break;
                }
                case EXECUTE: {
                    this.doExecute(exchange, callback, compute);
                    break;
                }
                case RUN: {
                    this.doRun(exchange, callback, compute);
                    break;
                }
                case APPLY: {
                    this.doApply(exchange, callback, compute);
                    break;
                }
                case AFFINITY_CALL: {
                    this.doAffinityCall(exchange, callback, compute);
                    break;
                }
                case AFFINITY_RUN: {
                    this.doAffinityRun(exchange, callback, compute);
                    break;
                }
                default: {
                    exchange.setException((Throwable)new UnsupportedOperationException("Operation not supported by Ignite Compute producer."));
                    return true;
                }
            }
            compute.future().listen((IgniteInClosure)IgniteInCamelClosure.create(exchange, callback));
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
            return true;
        }
        return false;
    }

    private void doCall(Exchange exchange, AsyncCallback callback, IgniteCompute compute) throws Exception {
        Object job = exchange.getIn().getBody();
        IgniteReducer reducer = (IgniteReducer)exchange.getIn().getHeader("CamelIgniteComputeReducer", IgniteReducer.class);
        if (Collection.class.isAssignableFrom(job.getClass())) {
            Collection col = (Collection)job;
            TypeConverter tc = exchange.getContext().getTypeConverter();
            ArrayList<Object> callables = new ArrayList<Object>(col.size());
            for (Object o : col) {
                callables.add(tc.mandatoryConvertTo(IgniteCallable.class, o));
            }
            if (reducer != null) {
                compute.call(callables, reducer);
            } else {
                compute.call(callables);
            }
        } else if (IgniteCallable.class.isAssignableFrom(job.getClass())) {
            compute.call((IgniteCallable)job);
        } else {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with CALL executionType is only supported for IgniteCallable payloads, or collections of them. The payload type was: %s.", job.getClass().getName()));
        }
    }

    private void doBroadcast(Exchange exchange, AsyncCallback callback, IgniteCompute compute) throws Exception {
        Object job = exchange.getIn().getBody();
        if (IgniteCallable.class.isAssignableFrom(job.getClass())) {
            compute.broadcast((IgniteCallable)job);
        } else if (IgniteRunnable.class.isAssignableFrom(job.getClass())) {
            compute.broadcast((IgniteRunnable)job);
        } else if (IgniteClosure.class.isAssignableFrom(job.getClass())) {
            compute.broadcast((IgniteClosure)job, exchange.getIn().getHeader("CamelIgniteComputeParameters"));
        } else {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with BROADCAST executionType is only supported for IgniteCallable, IgniteRunnable or IgniteClosure payloads. The payload type was: %s.", job.getClass().getName()));
        }
    }

    private void doExecute(Exchange exchange, AsyncCallback callback, IgniteCompute compute) throws Exception {
        Object job = exchange.getIn().getBody();
        Object params = exchange.getIn().getHeader("CamelIgniteComputeParameters");
        if (job instanceof Class && ComputeTask.class.isAssignableFrom((Class)job)) {
            Class task = (Class)job;
            compute.execute(task, params);
        } else if (ComputeTask.class.isAssignableFrom(job.getClass())) {
            compute.execute((ComputeTask)job, params);
        } else if (this.endpoint.getTaskName() != null) {
            if (exchange.getIn().getBody() != null) {
                params = exchange.getIn().getBody();
            }
            compute.execute(this.endpoint.getTaskName(), params);
        } else {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with EXECUTE executionType is only supported for ComputeTask payloads, Class<ComputeTask> or any payload in conjunction with the task name option. The payload type was: %s.", job.getClass().getName()));
        }
    }

    private void doRun(Exchange exchange, AsyncCallback callback, IgniteCompute compute) throws Exception {
        Object job = exchange.getIn().getBody();
        if (Collection.class.isAssignableFrom(job.getClass())) {
            Collection col = (Collection)job;
            TypeConverter tc = exchange.getContext().getTypeConverter();
            ArrayList<Object> runnables = new ArrayList<Object>(col.size());
            for (Object o : col) {
                runnables.add(tc.mandatoryConvertTo(IgniteRunnable.class, o));
            }
            compute.run(runnables);
        } else if (IgniteRunnable.class.isAssignableFrom(job.getClass())) {
            compute.run((IgniteRunnable)job);
        } else {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with RUN executionType is only supported for IgniteRunnable payloads, or collections of them. The payload type was: %s.", job.getClass().getName()));
        }
    }

    private <T, R1, R2> void doApply(Exchange exchange, AsyncCallback callback, IgniteCompute compute) throws Exception {
        IgniteClosure job = (IgniteClosure)exchange.getIn().getBody(IgniteClosure.class);
        Object params = exchange.getIn().getHeader("CamelIgniteComputeParameters");
        if (job == null || params == null) {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with APPLY executionType is only supported for IgniteClosure payloads with parameters. The payload type was: %s.", exchange.getIn().getBody().getClass().getName()));
        }
        IgniteReducer reducer = (IgniteReducer)exchange.getIn().getHeader("CamelIgniteComputeReducer", IgniteReducer.class);
        if (Collection.class.isAssignableFrom(params.getClass())) {
            Collection colParams = (Collection)params;
            if (reducer == null) {
                compute.apply(job, colParams);
            } else {
                compute.apply(job, colParams, reducer);
            }
        } else {
            compute.apply(job, params);
        }
    }

    private void doAffinityCall(Exchange exchange, AsyncCallback callback, IgniteCompute compute) throws Exception {
        IgniteCallable job = (IgniteCallable)exchange.getIn().getBody(IgniteCallable.class);
        String affinityCache = (String)exchange.getIn().getHeader("CamelIgniteComputeAffinityCacheName", String.class);
        Object affinityKey = exchange.getIn().getHeader("CamelIgniteComputeAffinityKey", Object.class);
        if (job == null || affinityCache == null || affinityKey == null) {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with AFFINITY_CALL executionType is only supported for IgniteCallable payloads, along with an affinity cache and key. The payload type was: %s.", exchange.getIn().getBody().getClass().getName()));
        }
        compute.affinityCall(affinityCache, affinityKey, job);
    }

    private void doAffinityRun(Exchange exchange, AsyncCallback callback, IgniteCompute compute) throws Exception {
        IgniteRunnable job = (IgniteRunnable)exchange.getIn().getBody(IgniteRunnable.class);
        String affinityCache = (String)exchange.getIn().getHeader("CamelIgniteComputeAffinityCacheName", String.class);
        Object affinityKey = exchange.getIn().getHeader("CamelIgniteComputeAffinityKey", Object.class);
        if (job == null || affinityCache == null || affinityKey == null) {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with AFFINITY_RUN executionType is only supported for IgniteRunnable payloads, along with an affinity cache and key. The payload type was: %s.", exchange.getIn().getBody().getClass().getName()));
        }
        compute.affinityRun(affinityCache, affinityKey, job);
    }

    private IgniteComputeExecutionType executionTypeFor(Exchange exchange) {
        return (IgniteComputeExecutionType)((Object)exchange.getIn().getHeader("CamelIgniteComputeExecutionType", (Object)this.endpoint.getExecutionType(), IgniteComputeExecutionType.class));
    }

    private static class IgniteInCamelClosure
    implements IgniteInClosure<IgniteFuture<Object>> {
        private static final long serialVersionUID = 7486030906412223384L;
        private Exchange exchange;
        private AsyncCallback callback;

        private IgniteInCamelClosure() {
        }

        private static IgniteInCamelClosure create(Exchange exchange, AsyncCallback callback) {
            IgniteInCamelClosure answer = new IgniteInCamelClosure();
            answer.exchange = exchange;
            answer.callback = callback;
            return answer;
        }

        public void apply(IgniteFuture<Object> future) {
            Message in = this.exchange.getIn();
            Message out = this.exchange.getOut();
            MessageHelper.copyHeaders((Message)in, (Message)out, (boolean)true);
            Object result = null;
            try {
                result = future.get();
            }
            catch (Exception e) {
                this.exchange.setException((Throwable)e);
                this.callback.done(false);
                return;
            }
            this.exchange.getOut().setBody(result);
            this.callback.done(false);
        }
    }
}

