package org.apache.ignite.stream.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ServiceStatus;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamAdapter;

/* loaded from: input_file:org/apache/ignite/stream/camel/CamelStreamer.class */
/**
 * Streamer that consumes Camel {@link Exchange}s from a configured endpoint URI and feeds the tuples
 * extracted from each exchange into the data streamer returned by {@code getStreamer()}.
 * <p>
 * The streamer registers itself as the {@link Processor} of the Camel {@link Consumer} it creates in
 * {@link #start()}. If a multiple tuple extractor is configured it takes precedence over the single
 * tuple extractor. An optional response processor is invoked after the data has been handed to the
 * streamer.
 *
 * @param <K> Cache key type.
 * @param <V> Cache value type.
 */
public class CamelStreamer<K, V> extends StreamAdapter<Exchange, K, V> implements Processor {
    /** Logger, obtained from the Ignite instance during {@link #start()}. */
    private IgniteLogger log;

    /** Camel context; a {@link DefaultCamelContext} is created in {@link #start()} if none was set. */
    private CamelContext camelCtx;

    /** URI of the Camel endpoint to consume from. Mandatory. */
    private String endpointUri;

    /** Endpoint resolved from {@link #endpointUri} during {@link #start()}. */
    private Endpoint endpoint;

    /** Consumer created on {@link #endpoint} with this streamer as its processor. */
    private Consumer consumer;

    /** Optional processor invoked on the exchange after its data was added to the streamer. */
    private Processor resProc;

    /**
     * Starts the streamer: validates the configuration, makes sure the Camel context is started,
     * resolves the endpoint, creates the consumer and starts the Camel services.
     * <p>
     * If any step after the context was started fails, the Camel context is stopped and an
     * {@link IgniteException} carrying the original failure as its cause is thrown.
     *
     * @throws IgniteException If the context is not runnable or any initialization step fails.
     */
    public void start() throws IgniteException {
        A.notNullOrEmpty(endpointUri, "endpoint URI must be provided");
        A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null, "tuple extractor missing");

        if (camelCtx == null) {
            camelCtx = new DefaultCamelContext();
        }

        if (!camelCtx.isStarted()) {
            camelCtx.start();
        }

        if (!camelCtx.isRunAllowed()) {
            throw new IgniteException("Failed to start Camel streamer (CamelContext not in a runnable state).");
        }

        log = getIgnite().log();

        // Each step has its own try block so that a failure is logged, rolled back and wrapped exactly
        // once (nesting the blocks would make outer handlers re-catch exceptions thrown by inner ones).

        // Step 1: resolve the endpoint.
        try {
            endpoint = CamelContextHelper.getMandatoryEndpoint(camelCtx, endpointUri);
        } catch (Exception e) {
            throw abortStart(e);
        }

        // Step 2: create the consumer that delivers exchanges to process(Exchange).
        try {
            consumer = endpoint.createConsumer(this);
        } catch (Exception e) {
            throw abortStart(e);
        }

        // Step 3: start the Camel services.
        try {
            ServiceHelper.startService(camelCtx, endpoint, consumer);
        } catch (Exception e) {
            U.error(log, e);

            try {
                camelCtx.stop();
                ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
            } catch (Exception stopErr) {
                IgniteException rollbackFailure = new IgniteException("Failed to start Camel streamer; failed to stop the context, endpoint or consumer during rollback of failed initialization [errMsg=" + e.getMessage() + ", stopErrMsg=" + stopErr.getMessage() + ']', e);

                // Keep the rollback failure reachable without hiding the original start failure.
                rollbackFailure.addSuppressed(stopErr);

                throw rollbackFailure;
            }

            // Rollback succeeded: drop the references to the services that were shut down.
            consumer = null;
            endpoint = null;

            throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']', e);
        }

        U.log(log, "Started Camel streamer consuming from endpoint URI: " + endpointUri);
    }

    /**
     * Logs a start failure, stops the Camel context and builds the exception to throw.
     * A failure while stopping the context is attached to {@code cause} as a suppressed exception.
     *
     * @param cause Failure that aborted the start sequence.
     * @return Exception describing the start failure, with {@code cause} preserved.
     */
    private IgniteException abortStart(Exception cause) {
        U.error(log, cause);

        try {
            camelCtx.stop();
        } catch (Exception stopErr) {
            cause.addSuppressed(stopErr);
        }

        return new IgniteException("Failed to start Camel streamer [errMsg=" + cause.getMessage() + ']', cause);
    }

    /**
     * Stops the streamer by stopping and shutting down the Camel context, endpoint and consumer.
     *
     * @throws IgniteException If no Camel context is available, if the context is already stopped or
     *      stopping, or if shutting down the services fails.
     */
    public void stop() throws IgniteException {
        if (camelCtx == null) {
            // Reached when stop() is called without a prior start() and without an injected context.
            throw new IgniteException("Failed to stop Camel streamer (CamelContext not initialized).");
        }

        ServiceStatus status = camelCtx.getStatus();

        if (status == ServiceStatus.Stopped || status == ServiceStatus.Stopping) {
            throw new IgniteException("Failed to stop Camel streamer (CamelContext already stopped or stopping).");
        }

        try {
            ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
        } catch (Exception e) {
            throw new IgniteException("Failed to stop Camel streamer [errMsg=" + e.getMessage() + ']', e);
        }

        U.log(log, "Stopped Camel streamer, formerly consuming from endpoint URI: " + endpointUri);
    }

    /**
     * Extracts the tuple(s) from the exchange, adds them to the data streamer and then invokes the
     * response processor, if one is set.
     *
     * @param exchange Exchange received from the Camel consumer.
     * @throws Exception If the response processor fails.
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        // The multiple tuple extractor wins when both extractors are configured.
        if (getMultipleTupleExtractor() != null) {
            getStreamer().addData(getMultipleTupleExtractor().extract(exchange));
        } else {
            getStreamer().addData(getSingleTupleExtractor().extract(exchange));
        }

        if (resProc != null) {
            resProc.process(exchange);
        }
    }

    /**
     * @return Camel context in use, or {@code null} if none was set and the streamer was not started.
     */
    public CamelContext getCamelContext() {
        return camelCtx;
    }

    /**
     * @param camelContext Camel context to use instead of a newly created {@link DefaultCamelContext}.
     */
    public void setCamelContext(CamelContext camelContext) {
        this.camelCtx = camelContext;
    }

    /**
     * @return URI of the endpoint to consume from.
     */
    public String getEndpointUri() {
        return endpointUri;
    }

    /**
     * @param str URI of the endpoint to consume from; must be non-empty when {@link #start()} is called.
     */
    public void setEndpointUri(String str) {
        this.endpointUri = str;
    }

    /**
     * @return Processor invoked after an exchange's data was added to the streamer, or {@code null}.
     */
    public Processor getResponseProcessor() {
        return resProc;
    }

    /**
     * @param processor Processor to invoke after an exchange's data was added to the streamer;
     *      may be {@code null} to disable.
     */
    public void setResponseProcessor(Processor processor) {
        this.resProc = processor;
    }
}
