package org.apache.camel.component.etcd;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import mousio.client.promises.ResponsePromise;
import mousio.etcd4j.requests.EtcdKeyGetRequest;
import mousio.etcd4j.responses.EtcdKeysResponse;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/etcd/EtcdWatchConsumer.class */
public class EtcdWatchConsumer extends AbstractEtcdConsumer implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EtcdWatchConsumer.class);
    private final EtcdWatchEndpoint endpoint;
    private final EtcdConfiguration configuration;

    public EtcdWatchConsumer(EtcdWatchEndpoint etcdWatchEndpoint, Processor processor, EtcdConfiguration etcdConfiguration, EtcdNamespace etcdNamespace, String str) {
        super(etcdWatchEndpoint, processor, etcdConfiguration, etcdNamespace, str);
        this.endpoint = etcdWatchEndpoint;
        this.configuration = etcdConfiguration;
    }

    protected void doStart() throws Exception {
        super.doStart();
        watch();
    }

    @Override // org.apache.camel.component.etcd.AbstractEtcdConsumer
    protected void doStop() throws Exception {
        super.doStop();
    }

    public void onResponse(ResponsePromise<EtcdKeysResponse> responsePromise) {
        if (isRunAllowed()) {
            try {
                EtcdKeysResponse etcdKeysResponse = (EtcdKeysResponse) responsePromise.get();
                Exchange createExchange = this.endpoint.createExchange();
                createExchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, getNamespace());
                createExchange.getIn().setHeader(EtcdConstants.ETCD_PATH, etcdKeysResponse.node.key);
                createExchange.getIn().setBody(etcdKeysResponse);
                getProcessor().process(createExchange);
                watch();
            } catch (TimeoutException e) {
                LOGGER.debug("Timeout watching for {}", getPath());
                if (this.configuration.isSendEmptyExchangeOnTimeout()) {
                    Exchange createExchange2 = this.endpoint.createExchange();
                    try {
                        createExchange2.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, getNamespace());
                        createExchange2.getIn().setHeader(EtcdConstants.ETCD_TIMEOUT, true);
                        createExchange2.getIn().setHeader(EtcdConstants.ETCD_PATH, getPath());
                        createExchange2.getIn().setBody((Object) null);
                        getProcessor().process(createExchange2);
                    } catch (Exception e2) {
                        getExceptionHandler().handleException("Error processing exchange", createExchange2, e2);
                    }
                }
            } catch (Exception e3) {
                throw new IllegalArgumentException(e3);
            }
        }
    }

    private void watch() throws Exception {
        if (isRunAllowed()) {
            EtcdKeyGetRequest waitForChange = getClient().get(getPath()).waitForChange();
            if (this.configuration.isRecursive()) {
                waitForChange.recursive();
            }
            if (this.configuration.getTimeout() != null) {
                waitForChange.timeout(this.configuration.getTimeout().longValue(), TimeUnit.MILLISECONDS);
            }
            try {
                waitForChange.send().addListener(this);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }
}
