package org.apache.camel.component.beanstalk;

import com.surftools.BeanstalkClient.BeanstalkException;
import com.surftools.BeanstalkClient.Client;
import com.surftools.BeanstalkClient.Job;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.beanstalk.processors.BuryCommand;
import org.apache.camel.component.beanstalk.processors.Command;
import org.apache.camel.component.beanstalk.processors.DeleteCommand;
import org.apache.camel.component.beanstalk.processors.ReleaseCommand;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledPollConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/beanstalk/BeanstalkConsumer.class */
/**
 * Scheduled polling consumer that reserves jobs from Beanstalk and hands each one to the
 * route as an {@link Exchange}.
 *
 * <p>Within this class every use of the Beanstalk {@link Client} (initialisation, polling,
 * post-processing and closing) is executed as a task on the single-thread executor created
 * in {@link #doStart()}, so the client is only ever touched by that one worker thread.
 *
 * <p>When {@code awaitJob} is {@code true} the reserved job is kept until the exchange has
 * been processed and is then deleted on success, or buried/released/deleted on failure
 * according to {@code onFailure}. When {@code awaitJob} is {@code false} the job is deleted
 * as soon as the exchange has been created.
 */
public class BeanstalkConsumer extends ScheduledPollConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(BeanstalkConsumer.class);

    /** Job statistics copied to message headers as trimmed strings. */
    private static final String[] STATS_KEY_STR = {"tube", "state"};

    /** Job statistics copied to message headers as integers. */
    private static final String[] STATS_KEY_INT = {"age", "time-left", "timeouts", "releases", "buries", "kicks"};

    private BeanstalkCommand onFailure;
    private boolean useBlockIO;
    private boolean awaitJob;
    private Client client;
    private ExecutorService executor;
    private Synchronization sync;
    private final Runnable initTask;
    private final Callable<Exchange> pollTask;

    /**
     * Exchange completion callback that applies the configured Beanstalk command to the job
     * once the exchange has finished: always a delete on success, and the command selected
     * by {@code onFailure} on failure.
     */
    public class Sync implements Synchronization {
        protected final Command successCommand;
        protected final Command failureCommand;

        /** Executor task that applies one command to one exchange, retrying once with a fresh client. */
        class RunCommand implements Runnable {
            private final Command command;
            private final Exchange exchange;

            RunCommand(Command command, Exchange exchange) {
                this.command = command;
                this.exchange = exchange;
            }

            @Override
            public void run() {
                try {
                    doPostProcess();
                } catch (Exception e) {
                    LOG.error("{} could not post-process exchange {}", this.command.getClass().getName(), this.exchange, e);
                    this.exchange.setException(e);
                }
            }

            /**
             * Runs the command; on a {@link BeanstalkException} the client is reset and the
             * command is attempted exactly one more time.
             *
             * @throws Exception whatever the second attempt (or a non-Beanstalk failure of the first) throws
             */
            private void doPostProcess() throws Exception {
                try {
                    this.command.act(client, this.exchange);
                } catch (BeanstalkException e) {
                    LOG.warn("Post-processing {} of exchange {} failed, retrying.", this.command.getClass().getName(), this.exchange, e);
                    resetClient();
                    this.command.act(client, this.exchange);
                }
            }
        }

        /**
         * Builds the success and failure commands from the consumer's current {@code onFailure} setting.
         *
         * @throws IllegalArgumentException if {@code onFailure} is none of bury, release or delete
         */
        Sync() {
            final BeanstalkEndpoint endpoint = m3getEndpoint();
            final String failureName = BeanstalkConsumer.this.onFailure.name();
            this.successCommand = new DeleteCommand(endpoint);
            if (BeanstalkComponent.COMMAND_BURY.equals(failureName)) {
                this.failureCommand = new BuryCommand(endpoint);
            } else if (BeanstalkComponent.COMMAND_RELEASE.equals(failureName)) {
                this.failureCommand = new ReleaseCommand(endpoint);
            } else if (BeanstalkComponent.COMMAND_DELETE.equals(failureName)) {
                this.failureCommand = new DeleteCommand(endpoint);
            } else {
                throw new IllegalArgumentException(String.format("Unknown failure command: %s", BeanstalkConsumer.this.onFailure));
            }
        }

        /**
         * Applies the success command to the exchange's job and waits for it to finish.
         * Failures are logged, never propagated.
         *
         * @param exchange the exchange that completed successfully
         */
        public void onComplete(Exchange exchange) {
            try {
                runAndWait(this.successCommand, exchange);
            } catch (Exception e) {
                LOG.error("Could not run completion of exchange {}", exchange, e);
            }
        }

        /**
         * Applies the failure command to the exchange's job and waits for it to finish.
         * Failures are logged, never propagated.
         *
         * @param exchange the exchange that failed
         */
        public void onFailure(Exchange exchange) {
            try {
                runAndWait(this.failureCommand, exchange);
            } catch (Exception e) {
                LOG.error("{} could not run failure of exchange {}", this.failureCommand.getClass().getName(), exchange, e);
            }
        }

        /**
         * Submits the command to the consumer's executor and blocks until it has run.
         * If the wait is interrupted the thread's interrupt flag is restored before the
         * exception is rethrown, so the callers' catch-and-log does not lose the interrupt.
         */
        private void runAndWait(Command command, Exchange exchange) throws Exception {
            try {
                executor.submit(new RunCommand(command, exchange)).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }

    /**
     * Creates the consumer and prepares the two executor tasks: one that (re)creates the
     * reading client and one that reserves a single job and converts it to an exchange.
     *
     * @param beanstalkEndpoint the endpoint this consumer belongs to
     * @param processor         the processor that receives the created exchanges
     */
    public BeanstalkConsumer(BeanstalkEndpoint beanstalkEndpoint, Processor processor) {
        super(beanstalkEndpoint, processor);
        this.initTask = new Runnable() {
            @Override
            public void run() {
                client = m3getEndpoint().getConnection().newReadingClient(useBlockIO);
            }
        };
        this.pollTask = new Callable<Exchange>() {
            /** Timeout passed to {@code reserve}. */
            final Integer noWait = 0;

            /**
             * Reserves one job and wraps it in an exchange.
             *
             * @return the new exchange, or {@code null} when no job was reserved or a
             *         {@link BeanstalkException} occurred (in which case the client is reset)
             */
            @Override
            public Exchange call() throws Exception {
                if (client == null) {
                    throw new RuntimeCamelException("Beanstalk client not initialized");
                }
                try {
                    final Job job = client.reserve(this.noWait);
                    if (job == null) {
                        return null;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(String.format("Received job ID %d (data length %d)", job.getJobId(), job.getData().length));
                    }
                    final Exchange exchange = createExchange(true);
                    exchange.getIn().setHeader(Headers.JOB_ID, job.getJobId());
                    exchange.getIn().setBody(job.getData(), byte[].class);
                    copyJobStats(client.statsJob(job.getJobId()), exchange);
                    if (awaitJob) {
                        // Keep the job reserved; Sync settles it once the exchange is done.
                        exchange.adapt(ExtendedExchange.class).addOnCompletion(sync);
                    } else {
                        client.delete(job.getJobId());
                    }
                    return exchange;
                } catch (BeanstalkException e) {
                    getExceptionHandler().handleException("Beanstalk client error", e);
                    resetClient();
                    return null;
                }
            }
        };
    }

    /**
     * Copies the known job statistics onto the message headers. String statistics are
     * trimmed, {@code pri} becomes a {@code Long} under {@code Headers.PRIORITY}, and the
     * remaining numeric statistics become {@code Integer}s. Statistics that are absent
     * (or mapped to {@code null}) are skipped.
     *
     * @param stats    the statistics map returned by the client; may be {@code null} or empty
     * @param exchange the exchange whose in-message receives the headers
     */
    private static void copyJobStats(Map<String, String> stats, Exchange exchange) {
        if (stats == null || stats.isEmpty()) {
            return;
        }
        for (String key : STATS_KEY_STR) {
            final String value = stats.get(key);
            if (value != null) {
                exchange.getIn().setHeader(Headers.PREFIX + key, value.trim());
            }
        }
        final String priority = stats.get("pri");
        if (priority != null) {
            exchange.getIn().setHeader(Headers.PRIORITY, Long.parseLong(priority.trim()));
        }
        for (String key : STATS_KEY_INT) {
            final String value = stats.get(key);
            if (value != null) {
                exchange.getIn().setHeader(Headers.PREFIX + key, Integer.parseInt(value.trim()));
            }
        }
    }

    /**
     * Reserves and processes jobs one at a time until polling is no longer allowed or no
     * further job is available.
     *
     * @return the number of exchanges handed to the processor
     * @throws Exception if the poll task or the processor fails
     */
    protected int poll() throws Exception {
        int polled = 0;
        while (isPollAllowed()) {
            final Exchange exchange = this.executor.submit(this.pollTask).get();
            if (exchange == null) {
                break;
            }
            polled++;
            getProcessor().process(exchange);
        }
        return polled;
    }

    public BeanstalkCommand getOnFailure() {
        return this.onFailure;
    }

    public void setOnFailure(BeanstalkCommand beanstalkCommand) {
        this.onFailure = beanstalkCommand;
    }

    public boolean isUseBlockIO() {
        return this.useBlockIO;
    }

    public void setUseBlockIO(boolean z) {
        this.useBlockIO = z;
    }

    public boolean isAwaitJob() {
        return this.awaitJob;
    }

    public void setAwaitJob(boolean z) {
        this.awaitJob = z;
    }

    /**
     * Returns the owning endpoint as a {@link BeanstalkEndpoint}.
     *
     * <p>The name is decompiler-assigned: this was {@code getEndpoint} before it was merged
     * with its bridge method. The explicit cast narrows the value returned by the superclass.
     */
    public BeanstalkEndpoint m3getEndpoint() {
        return (BeanstalkEndpoint) super.getEndpoint();
    }

    /**
     * Builds the completion callback, creates the single-thread executor and queues client
     * initialisation on it before starting the scheduled polling.
     */
    protected void doStart() throws Exception {
        // Build Sync first: its constructor rejects an unknown onFailure command, and failing
        // before the executor exists avoids stranding a freshly created executor.
        this.sync = new Sync();
        this.executor = m3getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Beanstalk-Consumer");
        this.executor.execute(this.initTask);
        super.doStart();
    }

    /**
     * Stops polling, closes the Beanstalk client and shuts the executor down.
     */
    protected void doStop() throws Exception {
        super.doStop();
        if (this.executor != null) {
            if (!this.executor.isShutdown()) {
                // Queue the close as the last task so it runs on the worker thread after any
                // pending work. NOTE(review): relies on the manager's shutdown() letting
                // already-queued tasks finish; if it cancels them the client is simply left
                // open, which is what happened before this task was added.
                this.executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        closeClientQuietly();
                    }
                });
            }
            m3getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(this.executor);
        }
    }

    /**
     * Discards the current client and creates a new one. A failure while closing the old
     * client is logged and ignored so that it cannot prevent the replacement from being created.
     */
    protected void resetClient() {
        closeClientQuietly();
        this.initTask.run();
    }

    /** Closes the current client, if any, logging rather than propagating a failure to close. */
    private void closeClientQuietly() {
        final Client current = this.client;
        if (current == null) {
            return;
        }
        try {
            current.close();
        } catch (RuntimeException e) {
            // Best effort: the connection is being thrown away regardless.
            LOG.warn("Could not close Beanstalk client", e);
        }
    }
}
