/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hedwig.client.netty;

import com.google.protobuf.ByteString;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.client.netty.HedwigClientImpl;
import org.apache.hedwig.client.netty.VoidCallbackAdapter;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Publisher} implementation that turns every publish call into a
 * {@link PubSubData} operation of type {@code PUBLISH} and hands it to the
 * client's {@link HChannelManager}.
 *
 * <p>The synchronous {@link #publish} is layered on top of the asynchronous
 * path: it submits the request and then blocks the calling thread until the
 * request is flagged as done.
 */
public class HedwigPublisher implements Publisher {
    private static final Logger logger = LoggerFactory.getLogger(HedwigPublisher.class);

    /** Receives every publish operation created by this publisher. */
    private final HChannelManager channelManager;

    /**
     * Creates a publisher that submits its operations to the channel manager
     * of the given client.
     *
     * @param client client whose {@code HChannelManager} is used for all publishes
     */
    protected HedwigPublisher(HedwigClientImpl client) {
        this.channelManager = client.getHChannelManager();
    }

    /**
     * Publishes a message and blocks until the operation has completed.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored before a {@code ServiceDownException} is thrown, so
     * code further up the stack can still observe the interruption.
     *
     * @param topic topic to publish to
     * @param msg   message to publish
     * @return the publish response carried by the response body, or
     *         {@code null} when there is no response body or it has no
     *         publish response
     * @throws PubSubException.CouldNotConnectException if the operation failed
     *         with that exception
     * @throws PubSubException.ServiceDownException if the operation failed with
     *         that exception, failed with no or any other exception, or the
     *         wait was interrupted
     */
    @Override
    public PubSubProtocol.PublishResponse publish(ByteString topic, PubSubProtocol.Message msg) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling a sync publish for topic: {}, msg: {}.", topic.toStringUtf8(), msg);
        }
        // This instance is only the monitor / completion flag shared with the
        // callback; the operation that is actually submitted is built by
        // asyncPublishWithResponseImpl.
        final PubSubData pubSubData = new PubSubData(topic, msg, null, PubSubProtocol.OperationType.PUBLISH, null, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            // Submitted while holding the monitor, before we start waiting.
            asyncPublishWithResponseImpl(topic, msg, pubSubCallback, null);
            try {
                // NOTE(review): relies on PubSubCallback setting isDone and
                // notifying on pubSubData when the operation completes; that
                // code is not in this file - confirm there.
                while (!pubSubData.isDone) {
                    pubSubData.wait();
                }
            } catch (InterruptedException e) {
                // Do not swallow the interruption: restore the flag so the
                // caller (or its executor) can react to it.
                Thread.currentThread().interrupt();
                throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for async publish call");
            }
            if (!pubSubCallback.getIsCallSuccessful()) {
                PubSubException failureException = pubSubCallback.getFailureException();
                if (failureException == null) {
                    logger.error("Sync Publish operation failed but no PubSubException was passed!");
                    throw new PubSubException.ServiceDownException("Server ack response to publish request is not successful");
                }
                // Only the two exception types declared by this method are
                // rethrown as-is; anything else is logged and reported as a
                // ServiceDownException.
                if (failureException instanceof PubSubException.CouldNotConnectException) {
                    throw (PubSubException.CouldNotConnectException) failureException;
                }
                if (failureException instanceof PubSubException.ServiceDownException) {
                    throw (PubSubException.ServiceDownException) failureException;
                }
                logger.error("Unexpected exception type when a sync publish operation failed: ", failureException);
                throw new PubSubException.ServiceDownException("Server ack response to publish request is not successful");
            }
            PubSubProtocol.ResponseBody respBody = pubSubCallback.getResponseBody();
            if (respBody == null) {
                return null;
            }
            return respBody.hasPublishResponse() ? respBody.getPublishResponse() : null;
        }
    }

    /**
     * Publishes a message asynchronously. The caller's callback is wrapped in
     * a {@link VoidCallbackAdapter} before the operation is submitted.
     *
     * @param topic    topic to publish to
     * @param msg      message to publish
     * @param callback callback for the outcome of the publish
     * @param context  opaque object stored with the operation
     */
    @Override
    public void asyncPublish(ByteString topic, PubSubProtocol.Message msg, Callback<Void> callback, Object context) {
        asyncPublishWithResponseImpl(topic, msg, new VoidCallbackAdapter<PubSubProtocol.ResponseBody>(callback), context);
    }

    /**
     * Publishes a message asynchronously and reports the publish response to
     * the callback.
     *
     * @param topic    topic to publish to
     * @param msg      message to publish
     * @param callback callback receiving the publish response or the failure
     * @param context  opaque object stored with the operation
     */
    @Override
    public void asyncPublishWithResponse(ByteString topic, PubSubProtocol.Message msg, Callback<PubSubProtocol.PublishResponse> callback, Object context) {
        asyncPublishWithResponseImpl(topic, msg, new PublishResponseCallbackAdapter(callback), context);
    }

    /**
     * Common path of all publish variants: builds the {@code PUBLISH}
     * operation and submits it to the channel manager.
     */
    private void asyncPublishWithResponseImpl(ByteString topic, PubSubProtocol.Message msg, Callback<PubSubProtocol.ResponseBody> callback, Object context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling an async publish for topic: {}, msg: {}.", topic.toStringUtf8(), msg);
        }
        PubSubData pubSubData = new PubSubData(topic, msg, null, PubSubProtocol.OperationType.PUBLISH, null, callback, context);
        this.channelManager.submitOp(pubSubData);
    }

    /**
     * Adapts a callback expecting a {@code PublishResponse} to the
     * {@code ResponseBody} callback used by the common publish path.
     */
    private static class PublishResponseCallbackAdapter
    implements Callback<PubSubProtocol.ResponseBody> {
        private final Callback<PubSubProtocol.PublishResponse> delegate;

        private PublishResponseCallbackAdapter(Callback<PubSubProtocol.PublishResponse> delegate) {
            this.delegate = delegate;
        }

        /**
         * Forwards success to the delegate, passing {@code null} when there is
         * no response body and {@code getPublishResponse()} of the body
         * otherwise.
         */
        @Override
        public void operationFinished(Object ctx, PubSubProtocol.ResponseBody resultOfOperation) {
            if (null == resultOfOperation) {
                this.delegate.operationFinished(ctx, null);
            } else {
                this.delegate.operationFinished(ctx, resultOfOperation.getPublishResponse());
            }
        }

        /** Forwards the failure to the delegate unchanged. */
        @Override
        public void operationFailed(Object ctx, PubSubException exception) {
            this.delegate.operationFailed(ctx, exception);
        }
    }
}

