/*
* Copyright 2014 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/** @module vertx-amqp-client-js/amqp_sender */
var utils = require('vertx-js/util/utils');
var AmqpMessage = require('vertx-amqp-client-js/amqp_message');
var WriteStream = require('vertx-js/write_stream');
var AmqpConnection = require('vertx-amqp-client-js/amqp_connection');
var Future = require('vertx-js/future');
var Promise = require('vertx-js/promise');
var io = Packages.io;
var JsonObject = io.vertx.core.json.JsonObject;
var JsonArray = io.vertx.core.json.JsonArray;
var JAmqpSender = Java.type('io.vertx.amqp.AmqpSender');
/**
 Tells whether <code>candidate</code> is an API wrapper object, i.e. a non-null object that exposes a truthy
 <code>_jdel</code> property.
 @private
 @param candidate {*} the value to inspect
 @return {boolean} true when the value carries a delegate
 */
function __hasDelegate(candidate) {
  // typeof null is 'object', so null has to be ruled out before _jdel is read
  return candidate !== null && typeof candidate === 'object' && !!candidate._jdel;
}

/**
 Adapts a <code>(result, cause)</code> callback to a handler receiving an async result object.
 The callback gets <code>(null, null)</code> when <code>ar.succeeded()</code> is true and
 <code>(null, ar.cause())</code> otherwise.
 @private
 @param callback {function} the <code>(result, cause)</code> callback
 @return {function} a handler taking the async result
 */
function __voidResultHandler(callback) {
  return function(ar) {
    if (ar.succeeded()) {
      callback(null, null);
    } else {
      callback(null, ar.cause());
    }
  };
}

/**
 Builds an async result handler that settles <code>prom</code>: it is completed when the reported cause is
 <code>null</code> and failed with the cause otherwise.
 @private
 @param prom {Object} an object offering <code>complete(result)</code> and <code>fail(cause)</code>
 @return {function} a handler taking the async result
 */
function __promiseCompleter(prom) {
  return __voidResultHandler(function(result, cause) {
    if (cause === null) {
      prom.complete(result);
    } else {
      prom.fail(cause);
    }
  });
}

/**
 AMQP Sender interface used to send messages.
 <p/>
 Every method first tries to match the call against its own signatures. Calls that do not match are handed to the
 implementation that was present on the object before this constructor replaced it, and a <code>TypeError</code> is
 thrown when there is none.
 @class
 @param j_val {Object} the Java <code>io.vertx.amqp.AmqpSender</code> delegate to wrap
*/
var AmqpSender = function(j_val) {

  var j_amqpSender = j_val;
  var that = this;
  WriteStream.call(this, j_val);

  // Remember whatever implementations are already reachable on this object (after WriteStream.call above) so that
  // calls not matching a signature handled here can be delegated to them.
  var __super_write = this.write;
  var __super_end = this.end;
  var __super_writeQueueFull = this.writeQueueFull;
  var __super_drainHandler = this.drainHandler;
  var __super_exceptionHandler = this.exceptionHandler;
  var __super_setWriteQueueMaxSize = this.setWriteQueueMaxSize;
  var __super_send = this.send;
  var __super_sendWithAck = this.sendWithAck;
  var __super_close = this.close;
  var __super_address = this.address;
  var __super_connection = this.connection;

  /**
   Writes a message through the delegate's <code>write(AmqpMessage, Handler)</code>.
   <p/>
   When a <code>handler</code> is given it is called with <code>(null, null)</code> on success or
   <code>(null, cause)</code> on failure and nothing is returned. When it is omitted, the future of a promise
   settled with the outcome is returned instead.
   @public
   @param data {AmqpMessage} the message to write
   @param handler {function} optional completion handler
   @return {Future} only when no handler is supplied
   */
  this.write = function(data, handler) {
    var __args = arguments;
    if (__args.length === 2 && __hasDelegate(__args[0]) && typeof __args[1] === 'function') {
      j_amqpSender["write(io.vertx.amqp.AmqpMessage,io.vertx.core.Handler)"](data._jdel, __voidResultHandler(handler));
    } else if (__args.length === 1 && __hasDelegate(__args[0])) {
      var __prom = Promise.promise();
      j_amqpSender["write(io.vertx.amqp.AmqpMessage,io.vertx.core.Handler)"](data._jdel, __promiseCompleter(__prom));
      return __prom.future();
    } else if (typeof __super_write != 'undefined') {
      return __super_write.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   Calls the delegate's <code>end</code>, optionally passing a last message. Supported call shapes:
   <ul>
   <li><code>end(handler)</code> and <code>end(data, handler)</code>: the handler is called with
   <code>(null, null)</code> on success or <code>(null, cause)</code> on failure; nothing is returned</li>
   <li><code>end()</code> and <code>end(data)</code>: the future of a promise settled with the outcome is
   returned</li>
   </ul>
   @public
   @param data {AmqpMessage} optional message handed to the delegate's <code>end</code>
   @param handler {function} optional completion handler
   @return {Future} only when no handler is supplied
   */
  this.end = function() {
    var __args = arguments;
    var __prom;
    if (__args.length === 1 && typeof __args[0] === 'function') {
      j_amqpSender["end(io.vertx.core.Handler)"](__voidResultHandler(__args[0]));
    } else if (__args.length === 0) {
      __prom = Promise.promise();
      j_amqpSender["end(io.vertx.core.Handler)"](__promiseCompleter(__prom));
      return __prom.future();
    } else if (__args.length === 2 && __hasDelegate(__args[0]) && typeof __args[1] === 'function') {
      j_amqpSender["end(io.vertx.amqp.AmqpMessage,io.vertx.core.Handler)"](__args[0]._jdel, __voidResultHandler(__args[1]));
    } else if (__args.length === 1 && __hasDelegate(__args[0])) {
      __prom = Promise.promise();
      j_amqpSender["end(io.vertx.amqp.AmqpMessage,io.vertx.core.Handler)"](__args[0]._jdel, __promiseCompleter(__prom));
      return __prom.future();
    } else if (typeof __super_end != 'undefined') {
      return __super_end.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   This will return <code>true</code> if there are more bytes in the write queue than the value set using {@link AmqpSender#setWriteQueueMaxSize}
   @public
   @return {boolean} true if write queue is full
   */
  this.writeQueueFull = function() {
    var __args = arguments;
    if (__args.length === 0) {
      return j_amqpSender["writeQueueFull()"]();
    } else if (typeof __super_writeQueueFull != 'undefined') {
      return __super_writeQueueFull.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   Set a drain handler on the stream. If the write queue is full, then the handler will be called when the write
   queue is ready to accept buffers again.
   <p/>
   The stream implementation defines when the drain handler is called, for example it could be when the queue size
   has been reduced to <code>maxSize / 2</code>.
   <p/>
   The handler is passed to the delegate as is; <code>null</code> or <code>undefined</code> is accepted.
   @public
   @param handler {function} the handler
   @return {AmqpSender} a reference to this, so the API can be used fluently
   */
  this.drainHandler = function(handler) {
    var __args = arguments;
    if (__args.length === 1 && (typeof __args[0] === 'function' || __args[0] == null)) {
      j_amqpSender["drainHandler(io.vertx.core.Handler)"](handler);
      return that;
    } else if (typeof __super_drainHandler != 'undefined') {
      return __super_drainHandler.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   Registers an exception handler on the delegate. The value reported by the delegate is passed through
   <code>utils.convReturnThrowable</code> before reaching <code>handler</code>. A <code>null</code> or
   <code>undefined</code> handler is forwarded to the delegate as <code>null</code>.
   @public
   @param handler {function} the exception handler
   @return {AmqpSender} a reference to this, so the API can be used fluently
   */
  this.exceptionHandler = function(handler) {
    var __args = arguments;
    if (__args.length === 1 && (typeof __args[0] === 'function' || __args[0] == null)) {
      j_amqpSender["exceptionHandler(io.vertx.core.Handler)"](handler == null ? null : function(jVal) {
        handler(utils.convReturnThrowable(jVal));
      });
      return that;
    } else if (typeof __super_exceptionHandler != 'undefined') {
      return __super_exceptionHandler.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   Forwards <code>maxSize</code> to the delegate's <code>setWriteQueueMaxSize(int)</code>.
   @public
   @param maxSize {number} the value handed to the delegate
   @return {AmqpSender} a reference to this, so the API can be used fluently
   */
  this.setWriteQueueMaxSize = function(maxSize) {
    var __args = arguments;
    if (__args.length === 1 && typeof __args[0] === 'number') {
      j_amqpSender["setWriteQueueMaxSize(int)"](maxSize);
      return that;
    } else if (typeof __super_setWriteQueueMaxSize != 'undefined') {
      return __super_setWriteQueueMaxSize.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   Sends an AMQP message. The destination is the configured sender address or the address configured in the message.
   @public
   @param message {AmqpMessage} the message, must not be <code>null</code>
   @return {AmqpSender} the current sender
   */
  this.send = function(message) {
    var __args = arguments;
    if (__args.length === 1 && __hasDelegate(__args[0])) {
      j_amqpSender["send(io.vertx.amqp.AmqpMessage)"](message._jdel);
      return that;
    } else if (typeof __super_send != 'undefined') {
      return __super_send.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   Sends an AMQP message and waits for an acknowledgement. If the message has been accepted, the outcome is a
   success; it is a failure if the message has been rejected or re-routed.
   <p/>
   When <code>acknowledgementHandler</code> is given it is called with <code>(null, null)</code> on success or
   <code>(null, cause)</code> on failure and the sender is returned. When it is omitted, the future of a promise
   settled with the outcome is returned instead.
   @public
   @param message {AmqpMessage} the message, must not be <code>null</code>
   @param acknowledgementHandler {function} optional acknowledgement handler
   @return {AmqpSender|Future} the current sender when a handler is supplied, a future otherwise
   */
  this.sendWithAck = function(message, acknowledgementHandler) {
    var __args = arguments;
    if (__args.length === 2 && __hasDelegate(__args[0]) && typeof __args[1] === 'function') {
      j_amqpSender["sendWithAck(io.vertx.amqp.AmqpMessage,io.vertx.core.Handler)"](message._jdel, __voidResultHandler(acknowledgementHandler));
      return that;
    } else if (__args.length === 1 && __hasDelegate(__args[0])) {
      var __prom = Promise.promise();
      j_amqpSender["sendWithAck(io.vertx.amqp.AmqpMessage,io.vertx.core.Handler)"](message._jdel, __promiseCompleter(__prom));
      return __prom.future();
    } else if (typeof __super_sendWithAck != 'undefined') {
      return __super_sendWithAck.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   Closes the sender.
   <p/>
   When <code>handler</code> is given it is called with <code>(null, null)</code> on success or
   <code>(null, cause)</code> on failure and nothing is returned. When it is omitted, the future of a promise
   settled with the outcome is returned instead.
   @public
   @param handler {function} optional, called when the sender has been closed
   @return {Future} only when no handler is supplied
   */
  this.close = function(handler) {
    var __args = arguments;
    if (__args.length === 1 && typeof __args[0] === 'function') {
      j_amqpSender["close(io.vertx.core.Handler)"](__voidResultHandler(handler));
    } else if (__args.length === 0) {
      var __prom = Promise.promise();
      j_amqpSender["close(io.vertx.core.Handler)"](__promiseCompleter(__prom));
      return __prom.future();
    } else if (typeof __super_close != 'undefined') {
      return __super_close.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   @public
   @return {string} the configured address.
   */
  this.address = function() {
    var __args = arguments;
    if (__args.length === 0) {
      return j_amqpSender["address()"]();
    } else if (typeof __super_address != 'undefined') {
      return __super_address.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  /**
   Gets the connection having created the sender. Cannot be <code>null</code>
   @public
   @return {AmqpConnection} the connection having created the sender.
   */
  this.connection = function() {
    var __args = arguments;
    if (__args.length === 0) {
      return utils.convReturnVertxGen(AmqpConnection, j_amqpSender["connection()"]());
    } else if (typeof __super_connection != 'undefined') {
      return __super_connection.apply(this, __args);
    } else {
      throw new TypeError('function invoked with invalid arguments');
    }
  };

  // A reference to the underlying Java delegate
  // NOTE! This is an internal API and must not be used in user code.
  // If you rely on this property your code is likely to break if we change it / remove it without warning.
  this._jdel = j_amqpSender;
};
// Java class backing this wrapper, resolved through utils.getJavaClass.
AmqpSender._jclass = utils.getJavaClass("io.vertx.amqp.AmqpSender");

/**
 Type helper for AmqpSender wrappers.
 <ul>
 <li><code>accept(obj)</code>: true when <code>obj._jdel</code> is an instance of {@link AmqpSender._jclass}</li>
 <li><code>wrap(jdel)</code>: creates an AmqpSender around the given delegate</li>
 <li><code>unwrap(obj)</code>: returns the delegate stored in <code>obj._jdel</code></li>
 </ul>
 NOTE! This is an internal API and must not be used in user code.
 */
AmqpSender._jtype = {
  accept: function(obj) {
    // A null/undefined value is never an AmqpSender wrapper: answer false rather than failing on obj._jdel.
    return obj != null && AmqpSender._jclass.isInstance(obj._jdel);
  },
  wrap: function(jdel) {
    // Build the object from the prototype, then run the constructor on it with the caller's arguments.
    var obj = Object.create(AmqpSender.prototype, {});
    AmqpSender.apply(obj, arguments);
    return obj;
  },
  unwrap: function(obj) {
    return obj._jdel;
  }
};
/**
 Creates an AmqpSender without using <code>new</code>: an object is derived from
 <code>AmqpSender.prototype</code> and the constructor is then applied to it with every argument received.
 NOTE! This is an internal API and must not be used in user code.
 @param jdel {Object} the Java delegate to wrap
 @return {AmqpSender} the newly initialised wrapper
 */
AmqpSender._create = function(jdel) {
  var instance = Object.create(AmqpSender.prototype, {});
  // Forward the full argument list, not only jdel, exactly as the caller supplied it.
  AmqpSender.apply(instance, arguments);
  return instance;
};
module.exports = AmqpSender;