summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJaromil <jaromil@dyne.org>2013-03-05 03:07:33 (GMT)
committer Jaromil <jaromil@dyne.org>2013-03-05 03:07:33 (GMT)
commitfede1451017c1e481e9eb0ac64c5464a4f4a198b (patch)
treeeedc41e477176550fc79f815d4ad9b14b3cd1138
parent3bf56f3728d89265e05d254a0c564027ff26cf75 (diff)
updated to latest 0mq 2.2 in git (contains fixes to pgm)
-rw-r--r--src/0mq/Makefile.am3
-rw-r--r--src/0mq/decoder.cpp7
-rw-r--r--src/0mq/decoder.hpp9
-rw-r--r--src/0mq/devpoll.cpp4
-rw-r--r--src/0mq/dist.cpp20
-rw-r--r--src/0mq/err.cpp74
-rw-r--r--src/0mq/err.hpp4
-rw-r--r--src/0mq/ip.cpp6
-rw-r--r--src/0mq/options.cpp6
-rw-r--r--src/0mq/pgm_socket.cpp71
-rw-r--r--src/0mq/pgm_socket.hpp3
-rw-r--r--src/0mq/poll.cpp4
-rw-r--r--src/0mq/signaler.cpp8
-rw-r--r--src/0mq/socket_base.cpp20
-rw-r--r--src/0mq/tcp_connecter.cpp12
-rw-r--r--src/0mq/tcp_listener.cpp8
-rw-r--r--src/0mq/tcp_socket.cpp36
-rw-r--r--src/0mq/uuid.cpp45
-rw-r--r--src/0mq/uuid.hpp24
-rw-r--r--src/0mq/zmq.cpp39
-rw-r--r--src/0mq/zmq.hpp18
-rw-r--r--src/0mq/zmq_connecter.cpp6
-rw-r--r--src/0mq/zmq_engine.cpp34
-rw-r--r--src/0mq/zmq_engine.hpp3
24 files changed, 367 insertions, 97 deletions
diff --git a/src/0mq/Makefile.am b/src/0mq/Makefile.am
index 1b8b5fa..62288ca 100644
--- a/src/0mq/Makefile.am
+++ b/src/0mq/Makefile.am
@@ -1,4 +1,5 @@
-INCLUDES = -I$(top_builddir)/src -I$(top_builddir)/src/include
+AM_CPPFLAGS = -I$(top_builddir)/src -I$(top_builddir)/src/include
+AM_CFLAGS = -DZMQ_HAVE_LINUX
noinst_LTLIBRARIES = libzmq.la
diff --git a/src/0mq/decoder.cpp b/src/0mq/decoder.cpp
index 84ecd92..643e64d 100644
--- a/src/0mq/decoder.cpp
+++ b/src/0mq/decoder.cpp
@@ -46,6 +46,11 @@ void zmq::decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
+bool zmq::decoder_t::stalled () const
+{
+ return next == &decoder_t::message_ready;
+}
+
bool zmq::decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
@@ -109,7 +114,7 @@ bool zmq::decoder_t::eight_byte_size_ready ()
bool zmq::decoder_t::flags_ready ()
{
// Store the flags from the wire into the message structure.
- in_progress.flags = tmpbuf [0] | ~ZMQ_MSG_MASK;
+ in_progress.flags = (tmpbuf [0] & ZMQ_MSG_MORE) | ~ZMQ_MSG_MASK;
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&decoder_t::message_ready);
diff --git a/src/0mq/decoder.hpp b/src/0mq/decoder.hpp
index f554e46..5a54003 100644
--- a/src/0mq/decoder.hpp
+++ b/src/0mq/decoder.hpp
@@ -49,9 +49,9 @@ namespace zmq
public:
inline decoder_base_t (size_t bufsize_) :
+ next (NULL),
read_pos (NULL),
to_read (0),
- next (NULL),
bufsize (bufsize_)
{
buf = (unsigned char*) malloc (bufsize_);
@@ -162,11 +162,12 @@ namespace zmq
next = NULL;
}
+ step_t next;
+
private:
unsigned char *read_pos;
size_t to_read;
- step_t next;
size_t bufsize;
unsigned char *buf;
@@ -186,6 +187,10 @@ namespace zmq
void set_inout (struct i_inout *destination_);
+ // Returns true if there is a decoded message
+ // waiting to be delivered to the message consumer.
+ bool stalled () const;
+
private:
bool one_byte_size_ready ();
diff --git a/src/0mq/devpoll.cpp b/src/0mq/devpoll.cpp
index 25763c6..1b9f265 100644
--- a/src/0mq/devpoll.cpp
+++ b/src/0mq/devpoll.cpp
@@ -70,7 +70,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
}
}
- assert (!fd_table [fd_].valid);
+ zmq_assert (!fd_table [fd_].valid);
fd_table [fd_].events = 0;
fd_table [fd_].reactor = reactor_;
@@ -88,7 +88,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
void zmq::devpoll_t::rm_fd (handle_t handle_)
{
- assert (fd_table [handle_].valid);
+ zmq_assert (fd_table [handle_].valid);
devpoll_ctl (handle_, POLLREMOVE);
fd_table [handle_].valid = false;
diff --git a/src/0mq/dist.cpp b/src/0mq/dist.cpp
index d74e69f..3f2ca2f 100644
--- a/src/0mq/dist.cpp
+++ b/src/0mq/dist.cpp
@@ -167,14 +167,32 @@ void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_)
msg_->flags |= ZMQ_MSG_SHARED;
}
+ int write_fail_cnt = 0;
+
// Push the message to all destinations.
for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
- content->refcnt.sub (1);
+ write_fail_cnt++;
else
i++;
}
+ // One or more writes have failed.
+ // Take this number off the reference counter.
+ if (unlikely (write_fail_cnt > 0)) {
+ // No one is referencing the message.
+ // We can release its content now.
+ if (!content->refcnt.sub (write_fail_cnt)) {
+ // We used "placement new" operator to initialize the reference
+ // counter so we call its destructor now.
+ content->refcnt.~atomic_counter_t ();
+
+ if (content->ffn)
+ content->ffn (content->data, content->hint);
+ free (content);
+ }
+ }
+
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
diff --git a/src/0mq/err.cpp b/src/0mq/err.cpp
index ddc08ee..5d790eb 100644
--- a/src/0mq/err.cpp
+++ b/src/0mq/err.cpp
@@ -213,36 +213,90 @@ void zmq::wsa_error_to_errno ()
{
int errcode = WSAGetLastError ();
switch (errcode) {
- case WSAEINPROGRESS:
- errno = EAGAIN;
- return;
+// 10009 - File handle is not valid.
case WSAEBADF:
errno = EBADF;
return;
+// 10013 - Permission denied.
+ case WSAEACCES:
+ errno = EACCES;
+ return;
+// 10014 - Bad address.
+ case WSAEFAULT:
+ errno = EFAULT;
+ return;
+// 10022 - Invalid argument.
case WSAEINVAL:
errno = EINVAL;
return;
+// 10024 - Too many open files.
case WSAEMFILE:
errno = EMFILE;
return;
- case WSAEFAULT:
- errno = EFAULT;
+// 10036 - Operation now in progress.
+ case WSAEINPROGRESS:
+ errno = EAGAIN;
+ return;
+// 10040 - Message too long.
+ case WSAEMSGSIZE:
+ errno = EMSGSIZE;
return;
+// 10043 - Protocol not supported.
case WSAEPROTONOSUPPORT:
errno = EPROTONOSUPPORT;
return;
- case WSAENOBUFS:
- errno = ENOBUFS;
- return;
- case WSAENETDOWN:
- errno = ENETDOWN;
+// 10047 - Address family not supported by protocol family.
+ case WSAEAFNOSUPPORT:
+ errno = EAFNOSUPPORT;
return;
+// 10048 - Address already in use.
case WSAEADDRINUSE:
errno = EADDRINUSE;
return;
+// 10049 - Cannot assign requested address.
case WSAEADDRNOTAVAIL:
errno = EADDRNOTAVAIL;
return;
+// 10050 - Network is down.
+ case WSAENETDOWN:
+ errno = ENETDOWN;
+ return;
+// 10051 - Network is unreachable.
+ case WSAENETUNREACH:
+ errno = ENETUNREACH;
+ return;
+// 10052 - Network dropped connection on reset.
+ case WSAENETRESET:
+ errno = ENETRESET;
+ return;
+// 10053 - Software caused connection abort.
+ case WSAECONNABORTED:
+ errno = ECONNABORTED;
+ return;
+// 10054 - Connection reset by peer.
+ case WSAECONNRESET:
+ errno = ECONNRESET;
+ return;
+// 10055 - No buffer space available.
+ case WSAENOBUFS:
+ errno = ENOBUFS;
+ return;
+// 10057 - Socket is not connected.
+ case WSAENOTCONN:
+ errno = ENOTCONN;
+ return;
+// 10060 - Connection timed out.
+ case WSAETIMEDOUT:
+ errno = ETIMEDOUT;
+ return;
+// 10061 - Connection refused.
+ case WSAECONNREFUSED:
+ errno = ECONNREFUSED;
+ return;
+// 10065 - No route to host.
+ case WSAEHOSTUNREACH:
+ errno = EHOSTUNREACH;
+ return;
default:
wsa_assert (false);
}
diff --git a/src/0mq/err.hpp b/src/0mq/err.hpp
index 7c7a9d8..d7ac793 100644
--- a/src/0mq/err.hpp
+++ b/src/0mq/err.hpp
@@ -103,7 +103,7 @@ namespace zmq
__FILE__, __LINE__);\
zmq::zmq_abort (#x);\
}\
- } while (false)
+ } while (false)
// Provides convenient way to check for errno-style errors.
#define errno_assert(x) \
@@ -146,5 +146,3 @@ namespace zmq
} while (false)
#endif
-
-
diff --git a/src/0mq/ip.cpp b/src/0mq/ip.cpp
index 7ef698b..5b9a247 100644
--- a/src/0mq/ip.cpp
+++ b/src/0mq/ip.cpp
@@ -204,6 +204,12 @@ int zmq::open_socket (int domain_, int type_, int protocol_)
errno_assert (rc != -1);
#endif
+ // On Windows, preventing sockets to be inherited by child processes.
+#if defined ZMQ_HAVE_WINDOWS && defined HANDLE_FLAG_INHERIT
+ BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
+ win_assert (brc);
+#endif
+
return s;
}
diff --git a/src/0mq/options.cpp b/src/0mq/options.cpp
index fc6bd06..dbfb174 100644
--- a/src/0mq/options.cpp
+++ b/src/0mq/options.cpp
@@ -71,12 +71,8 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
// Check that SWAP directory (.) is writable
struct stat stat_buf;
-// #if (ZMQ_HAVE_ANDROID | ZMQ_HAVE_LINUX)
-#ifdef ZMQ_HAVE_LINUX
if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWUSR) == 0)) {
-#else
- if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWRITE) == 0)) {
-#endif
+// if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWRITE) == 0)) {
errno = EACCES;
return -1;
}
diff --git a/src/0mq/pgm_socket.cpp b/src/0mq/pgm_socket.cpp
index 29ff3e6..381f96d 100644
--- a/src/0mq/pgm_socket.cpp
+++ b/src/0mq/pgm_socket.cpp
@@ -57,16 +57,14 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
{
}
-// Create, bind and connect PGM socket.
+// Resolve PGM socket address.
// network_ of the form <interface & multicast group decls>:<IP port>
// e.g. eth0;239.192.0.1:7500
// link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
// ;[fe80::1%en0]:7500
-int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
+int zmq::pgm_socket_t::init_address (const char *network_,
+ struct pgm_addrinfo_t **addr, uint16_t *port_number)
{
- // Can not open transport before destroying old one.
- zmq_assert (sock == NULL);
-
// Parse port number, start from end for IPv6
const char *port_delim = strrchr (network_, ':');
if (!port_delim) {
@@ -74,7 +72,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
return -1;
}
- uint16_t port_number = atoi (port_delim + 1);
+ *port_number = atoi (port_delim + 1);
char network [256];
if (port_delim - network_ >= (int) sizeof (network) - 1) {
@@ -83,14 +81,46 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
}
memset (network, '\0', sizeof (network));
memcpy (network, network_, port_delim - network_);
-
- // Validate socket options
+
+ pgm_error_t *pgm_error = NULL;
+ struct pgm_addrinfo_t hints;
+ memset (&hints, 0, sizeof (hints));
+ hints.ai_family = AF_UNSPEC;
+ if (!pgm_getaddrinfo (network, NULL, addr, &pgm_error)) {
+
+ // Invalid parameters don't set pgm_error_t.
+ zmq_assert (pgm_error != NULL);
+ if (pgm_error->domain == PGM_ERROR_DOMAIN_IF &&
+
+ // NB: cannot catch EAI_BADFLAGS.
+ ( pgm_error->code != PGM_ERROR_SERVICE &&
+ pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) {
+
+ // User, host, or network configuration or transient error.
+ pgm_error_free (pgm_error);
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Fatal OpenPGM internal error.
+ zmq_assert (false);
+ }
+ return 0;
+}
+
+// Create, bind and connect PGM socket.
+int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
+{
+ // Can not open transport before destroying old one.
+ zmq_assert (sock == NULL);
+
+ // Validate socket options
// Data rate is in [B/s]. options.rate is in [kb/s].
if (options.rate <= 0) {
errno = EINVAL;
return -1;
}
- // Recovery interval [s] or [ms] - based on the user's call
+ // Recovery interval [s] or [ms] - based on the user's call
if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) {
errno = EINVAL;
return -1;
@@ -101,27 +131,14 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
nbytes_processed = 0;
pgm_msgv_processed = 0;
- pgm_error_t *pgm_error = NULL;
- struct pgm_addrinfo_t hints, *res = NULL;
+ uint16_t port_number;
+ struct pgm_addrinfo_t *res = NULL;
sa_family_t sa_family;
- memset (&hints, 0, sizeof (hints));
- hints.ai_family = AF_UNSPEC;
- if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) {
-
- // Invalid parameters don't set pgm_error_t.
- zmq_assert (pgm_error != NULL);
- if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && (
-
- // NB: cannot catch EAI_BADFLAGS.
- pgm_error->code != PGM_ERROR_SERVICE &&
- pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT))
-
- // User, host, or network configuration or transient error.
- goto err_abort;
+ pgm_error_t *pgm_error = NULL;
- // Fatal OpenPGM internal error.
- zmq_assert (false);
+ if (init_address(network_, &res, &port_number) < 0) {
+ goto err_abort;
}
zmq_assert (res != NULL);
diff --git a/src/0mq/pgm_socket.hpp b/src/0mq/pgm_socket.hpp
index 5ba8387..dedbb48 100644
--- a/src/0mq/pgm_socket.hpp
+++ b/src/0mq/pgm_socket.hpp
@@ -55,6 +55,9 @@ namespace zmq
// Initialize PGM network structures (GSI, GSRs).
int init (bool udp_encapsulation_, const char *network_);
+
+ // Resolve PGM socket address.
+ static int init_address(const char *network_, struct pgm_addrinfo_t **addr, uint16_t *port_number);
// Get receiver fds and store them into user allocated memory.
void get_receiver_fds (fd_t *receive_fd_, fd_t *waiting_pipe_fd_);
diff --git a/src/0mq/poll.cpp b/src/0mq/poll.cpp
index 6a84d2e..da391ef 100644
--- a/src/0mq/poll.cpp
+++ b/src/0mq/poll.cpp
@@ -61,7 +61,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
pollfd pfd = {fd_, 0, 0};
pollset.push_back (pfd);
- assert (fd_table [fd_].index == retired_fd);
+ zmq_assert (fd_table [fd_].index == retired_fd);
fd_table [fd_].index = pollset.size() - 1;
fd_table [fd_].events = events_;
@@ -75,7 +75,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
void zmq::poll_t::rm_fd (handle_t handle_)
{
fd_t index = fd_table [handle_].index;
- assert (index != retired_fd);
+ zmq_assert (index != retired_fd);
// Mark the fd as unused.
pollset [index].fd = retired_fd;
diff --git a/src/0mq/signaler.cpp b/src/0mq/signaler.cpp
index 7b66ea7..4a2242f 100644
--- a/src/0mq/signaler.cpp
+++ b/src/0mq/signaler.cpp
@@ -249,6 +249,10 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Create the writer socket.
*w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
wsa_assert (*w_ != INVALID_SOCKET);
+
+ // On Windows, preventing sockets to be inherited by child processes.
+ BOOL brc = SetHandleInformation ((HANDLE) *w_, HANDLE_FLAG_INHERIT, 0);
+ win_assert (brc);
// Set TCP_NODELAY on writer socket.
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
@@ -263,6 +267,10 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
*r_ = accept (listener, NULL, NULL);
wsa_assert (*r_ != INVALID_SOCKET);
+ // On Windows, preventing sockets to be inherited by child processes.
+ brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
+ win_assert (brc);
+
// We don't need the listening socket anymore. Close it.
rc = closesocket (listener);
wsa_assert (rc != SOCKET_ERROR);
diff --git a/src/0mq/socket_base.cpp b/src/0mq/socket_base.cpp
index cd4152c..bd1009d 100644
--- a/src/0mq/socket_base.cpp
+++ b/src/0mq/socket_base.cpp
@@ -48,6 +48,9 @@
#include "platform.hpp"
#include "likely.hpp"
#include "uuid.hpp"
+#ifdef ZMQ_HAVE_OPENPGM
+#include "pgm_socket.hpp"
+#endif
#include "pair.hpp"
#include "pub.hpp"
@@ -133,8 +136,6 @@ zmq::socket_base_t::~socket_base_t ()
zmq_assert (sessions.empty ());
sessions_sync.unlock ();
- // Mark the socket as dead.
- tag = 0xdeadbeef;
}
zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
@@ -440,6 +441,18 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
+#ifdef ZMQ_HAVE_OPENPGM
+ if (protocol == "pgm" || protocol == "epgm") {
+ struct pgm_addrinfo_t *res = NULL;
+ uint16_t port_number = 0;
+ int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number);
+ if (res != NULL)
+ pgm_freeaddrinfo (res);
+ if (rc != 0 || port_number == 0)
+ return -1;
+ }
+#endif
+
// Create session.
connect_session_t *session = new (std::nothrow) connect_session_t (
io_thread, this, options, protocol.c_str (), address.c_str ());
@@ -642,6 +655,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
+ // Mark the socket as dead.
+ tag = 0xdeadbeef;
+
// Transfer the ownership of the socket from this application thread
// to the reaper thread which will take care of the rest of shutdown
// process.
diff --git a/src/0mq/tcp_connecter.cpp b/src/0mq/tcp_connecter.cpp
index 6bc1b2d..22cb92a 100644
--- a/src/0mq/tcp_connecter.cpp
+++ b/src/0mq/tcp_connecter.cpp
@@ -85,7 +85,7 @@ int zmq::tcp_connecter_t::open ()
// Asynchronous connect was launched.
if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS ||
WSAGetLastError () == WSAEWOULDBLOCK)) {
- errno = EAGAIN;
+ errno = EINPROGRESS;
return -1;
}
@@ -216,11 +216,9 @@ int zmq::tcp_connecter_t::open ()
if (rc == 0)
return 0;
- // Asynchronous connect was launched.
- if (rc == -1 && errno == EINPROGRESS) {
- errno = EAGAIN;
+ // Connection still in progress.
+ if (errno == EINPROGRESS)
return -1;
- }
// Error occured.
int err = errno;
@@ -252,6 +250,10 @@ int zmq::tcp_connecter_t::open ()
if (rc == 0)
return 0;
+ // Connection still in progress.
+ if (errno == EINPROGRESS)
+ return -1;
+
// Error occured.
int err = errno;
close ();
diff --git a/src/0mq/tcp_listener.cpp b/src/0mq/tcp_listener.cpp
index 4bfaa85..4016241 100644
--- a/src/0mq/tcp_listener.cpp
+++ b/src/0mq/tcp_listener.cpp
@@ -65,6 +65,10 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
return -1;
}
+ // On Windows, preventing sockets to be inherited by child processes.
+ BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
+ win_assert (brc);
+
// Allow reusing of the address.
int flag = 1;
rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
@@ -120,6 +124,10 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
zmq_assert (sock != INVALID_SOCKET);
+ // On Windows, preventing sockets to be inherited by child processes.
+ BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
+ win_assert (brc);
+
// Set to non-blocking mode.
unsigned long argp = 1;
int rc = ioctlsocket (sock, FIONBIO, &argp);
diff --git a/src/0mq/tcp_socket.cpp b/src/0mq/tcp_socket.cpp
index 775a7a6..e1d7941 100644
--- a/src/0mq/tcp_socket.cpp
+++ b/src/0mq/tcp_socket.cpp
@@ -79,7 +79,7 @@ int zmq::tcp_socket_t::write (const void *data, int size)
// we'll get an error (this may happen during the speculative write).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
return 0;
-
+
// Signalise peer failure.
if (nbytes == -1 && (
WSAGetLastError () == WSAENETDOWN ||
@@ -119,7 +119,7 @@ int zmq::tcp_socket_t::read (void *data, int size)
// Orderly shutdown by the other peer.
if (nbytes == 0)
- return -1;
+ return -1;
return (size_t) nbytes;
}
@@ -147,7 +147,7 @@ zmq::tcp_socket_t::~tcp_socket_t ()
int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_)
{
- assert (s == retired_fd);
+ zmq_assert (s == retired_fd);
s = fd_;
if (sndbuf_) {
@@ -197,10 +197,20 @@ int zmq::tcp_socket_t::write (const void *data, int size)
return 0;
// Signalise peer failure.
- if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
+ if (nbytes == -1) {
+ errno_assert (errno != EACCES
+ && errno != EBADF
+ && errno != EDESTADDRREQ
+ && errno != EFAULT
+ && errno != EINVAL
+ && errno != EISCONN
+ && errno != EMSGSIZE
+ && errno != ENOMEM
+ && errno != ENOTSOCK
+ && errno != EOPNOTSUPP);
return -1;
+ }
- errno_assert (nbytes != -1);
return (size_t) nbytes;
}
@@ -218,15 +228,14 @@ int zmq::tcp_socket_t::read (void *data, int size)
return 0;
// Signal peer failure.
- if (nbytes == -1
- && (errno == ECONNRESET
- || errno == ECONNREFUSED
- || errno == ETIMEDOUT
- || errno == EHOSTUNREACH
- || errno == ENOTCONN))
+ if (nbytes == -1) {
+ errno_assert (errno != EBADF
+ && errno != EFAULT
+ && errno != EINVAL
+ && errno != ENOMEM
+ && errno != ENOTSOCK);
return -1;
-
- errno_assert (nbytes != -1);
+ }
// Orderly shutdown by the other peer.
if (nbytes == 0)
@@ -236,4 +245,3 @@ int zmq::tcp_socket_t::read (void *data, int size)
}
#endif
-
diff --git a/src/0mq/uuid.cpp b/src/0mq/uuid.cpp
index 35d854f..c1466c8 100644
--- a/src/0mq/uuid.cpp
+++ b/src/0mq/uuid.cpp
@@ -39,10 +39,19 @@ zmq::uuid_t::~uuid_t ()
RpcStringFree (&string_buf);
}
+#ifdef UNICODE
+const short *zmq::uuid_t::to_string ()
+{
+ return (short*) string_buf;
+}
+#else
const char *zmq::uuid_t::to_string ()
{
return (char*) string_buf;
}
+#endif
+
+
#elif defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_NETBSD || (defined ZMQ_HAVE_HPUX && defined HAVE_LIBDCEKT)
@@ -184,7 +193,7 @@ const unsigned char *zmq::uuid_t::to_blob ()
return blob_buf;
}
-unsigned char zmq::uuid_t::convert_byte (const char *hexa_)
+unsigned char zmq::uuid_t::convert_byte (char *hexa_)
{
unsigned char byte;
@@ -201,7 +210,12 @@ unsigned char zmq::uuid_t::convert_byte (const char *hexa_)
byte *= 16;
+#ifdef UNICODE
+ hexa_++; // since hexa_ really refers to a short... lets skip one.
+#endif
hexa_++;
+
+
if (*hexa_ >= '0' && *hexa_ <= '9')
byte += *hexa_ - '0';
else if (*hexa_ >= 'A' && *hexa_ <= 'F')
@@ -216,9 +230,31 @@ unsigned char zmq::uuid_t::convert_byte (const char *hexa_)
void zmq::uuid_t::create_blob ()
{
- const char *buf = (const char*) string_buf;
+ char *buf = reinterpret_cast<char*>(string_buf);
+
+#ifdef UNICODE // we need to jump twice as far.
+ blob_buf [0] = convert_byte (buf + 0);
+ blob_buf [1] = convert_byte (buf + 4);
+ blob_buf [2] = convert_byte (buf + 8);
+ blob_buf [3] = convert_byte (buf + 12);
+
+ blob_buf [4] = convert_byte (buf + 18);
+ blob_buf [5] = convert_byte (buf + 22);
+
+ blob_buf [6] = convert_byte (buf + 28);
+ blob_buf [7] = convert_byte (buf + 32);
- blob_buf [0] = convert_byte (buf + 0);
+ blob_buf [8] = convert_byte (buf + 38);
+ blob_buf [9] = convert_byte (buf + 42);
+
+ blob_buf [10] = convert_byte (buf + 48);
+ blob_buf [11] = convert_byte (buf + 52);
+ blob_buf [12] = convert_byte (buf + 56);
+ blob_buf [13] = convert_byte (buf + 60);
+ blob_buf [14] = convert_byte (buf + 64);
+ blob_buf [15] = convert_byte (buf + 68);
+#else
+ blob_buf [0] = convert_byte (buf + 0);
blob_buf [1] = convert_byte (buf + 2);
blob_buf [2] = convert_byte (buf + 4);
blob_buf [3] = convert_byte (buf + 6);
@@ -238,4 +274,7 @@ void zmq::uuid_t::create_blob ()
blob_buf [13] = convert_byte (buf + 30);
blob_buf [14] = convert_byte (buf + 32);
blob_buf [15] = convert_byte (buf + 34);
+#endif
+
+ buf = NULL; delete buf;
}
diff --git a/src/0mq/uuid.hpp b/src/0mq/uuid.hpp
index 525dbcb..7014946 100644
--- a/src/0mq/uuid.hpp
+++ b/src/0mq/uuid.hpp
@@ -63,7 +63,13 @@ namespace zmq
// Returns a pointer to buffer containing the textual
// representation of the UUID. The callee is reponsible to
// free the allocated memory.
- const char *to_string ();
+
+ #ifdef UNICODE
+ const short *to_string ();
+ #else
+ const char *to_string ();
+ #endif
+
// The length of binary representation of UUID.
enum { uuid_blob_len = 16 };
@@ -73,7 +79,7 @@ namespace zmq
private:
// Converts one byte from hexa representation to binary.
- unsigned char convert_byte (const char *hexa_);
+ unsigned char convert_byte (char *hexa_);
// Converts string representation of UUID into standardised BLOB.
// The function is endianness agnostic.
@@ -83,8 +89,20 @@ namespace zmq
#ifdef ZMQ_HAVE_MINGW32
typedef unsigned char* RPC_CSTR;
#endif
+
+
::UUID uuid;
- RPC_CSTR string_buf;
+
+#ifdef UNICODE
+ RPC_WSTR string_buf;
+#else
+ RPC_CSTR string_buf;
+#endif
+
+
+
+
+
#elif defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_NETBSD
::uuid_t uuid;
char *string_buf;
diff --git a/src/0mq/zmq.cpp b/src/0mq/zmq.cpp
index d6363b9..39b0783 100644
--- a/src/0mq/zmq.cpp
+++ b/src/0mq/zmq.cpp
@@ -218,6 +218,18 @@ void *zmq_msg_data (zmq_msg_t *msg_)
return ((zmq::msg_content_t*) msg_->content)->data;
}
+const void *zmq_msg_cdata (const zmq_msg_t *msg_)
+{
+ zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
+
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
+ return msg_->vsm_data;
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
+ return NULL;
+
+ return ((zmq::msg_content_t*) msg_->content)->data;
+}
+
size_t zmq_msg_size (zmq_msg_t *msg_)
{
zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
@@ -230,6 +242,18 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
return ((zmq::msg_content_t*) msg_->content)->size;
}
+size_t zmq_msg_csize (const zmq_msg_t *msg_)
+{
+ zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
+
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
+ return msg_->vsm_size;
+ if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
+ return 0;
+
+ return ((zmq::msg_content_t*) msg_->content)->size;
+}
+
void *zmq_init (int io_threads_)
{
if (io_threads_ < 0) {
@@ -292,17 +316,20 @@ int zmq_term (void *ctx_)
int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int en = errno;
+ // Shut down only if termination was not interrupted by a signal.
+ if (!rc || en != EINTR) {
#ifdef ZMQ_HAVE_WINDOWS
- // On Windows, uninitialise socket layer.
- rc = WSACleanup ();
- wsa_assert (rc != SOCKET_ERROR);
+ // On Windows, uninitialise socket layer.
+ rc = WSACleanup ();
+ wsa_assert (rc != SOCKET_ERROR);
#endif
#if defined ZMQ_HAVE_OPENPGM
- // Shut down the OpenPGM library.
- if (pgm_shutdown () != TRUE)
- zmq_assert (false);
+ // Shut down the OpenPGM library.
+ if (pgm_shutdown () != TRUE)
+ zmq_assert (false);
#endif
+ }
errno = en;
return rc;
diff --git a/src/0mq/zmq.hpp b/src/0mq/zmq.hpp
index bfba0f6..d137d10 100644
--- a/src/0mq/zmq.hpp
+++ b/src/0mq/zmq.hpp
@@ -23,6 +23,7 @@
#include "zmq.h"
+#include <algorithm>
#include <cassert>
#include <cstring>
#include <exception>
@@ -42,7 +43,6 @@
namespace zmq
{
-
typedef zmq_free_fn free_fn;
typedef zmq_pollitem_t pollitem_t;
@@ -118,7 +118,8 @@ namespace zmq
inline ~message_t ()
{
int rc = zmq_msg_close (this);
- assert (rc == 0);
+ if (rc != 0)
+ assert(0);
}
inline void rebuild ()
@@ -171,11 +172,21 @@ namespace zmq
return zmq_msg_data (this);
}
+ inline const void *cdata () const
+ {
+ return zmq_msg_cdata (this);
+ }
+
inline size_t size ()
{
return zmq_msg_size (this);
}
+ inline size_t csize () const
+ {
+ return zmq_msg_csize (this);
+ }
+
private:
// Disable implicit message copying, so that users won't use shared
@@ -214,7 +225,8 @@ namespace zmq
if (ptr == NULL)
return;
int rc = zmq_term (ptr);
- assert (rc == 0);
+ if (rc != 0)
+ assert(0);
}
// Be careful with this, it's probably only useful for
diff --git a/src/0mq/zmq_connecter.cpp b/src/0mq/zmq_connecter.cpp
index fb77cdc..3d784ed 100644
--- a/src/0mq/zmq_connecter.cpp
+++ b/src/0mq/zmq_connecter.cpp
@@ -121,8 +121,8 @@ void zmq::zmq_connecter_t::start_connecting ()
return;
}
- // Connection establishment may be dealyed. Poll for its completion.
- else if (rc == -1 && errno == EAGAIN) {
+ // Connection establishment may be delayed. Poll for its completion.
+ else if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (tcp_connecter.get_fd ());
handle_valid = true;
set_pollout (handle);
@@ -130,6 +130,8 @@ void zmq::zmq_connecter_t::start_connecting ()
}
// Handle any other error condition by eventual reconnect.
+ if (tcp_connecter.get_fd() != retired_fd)
+ tcp_connecter.close ();
wait = true;
add_reconnect_timer();
}
diff --git a/src/0mq/zmq_engine.cpp b/src/0mq/zmq_engine.cpp
index 57a8baf..b43607f 100644
--- a/src/0mq/zmq_engine.cpp
+++ b/src/0mq/zmq_engine.cpp
@@ -43,7 +43,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
inout (NULL),
ephemeral_inout (NULL),
options (options_),
- plugged (false)
+ plugged (false),
+ input_error (false)
{
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
@@ -152,8 +153,18 @@ void zmq::zmq_engine_t::in_event ()
inout->flush ();
}
- if (inout && disconnection)
- error ();
+ // Input error has occurred. If the last decoded
+ // message has already been accepted, we terminate
+ // the engine immediately. Otherwise, we stop
+ // waiting for input events and postpone the termination
+ // until after the session has accepted the message.
+ if (inout && disconnection) {
+ input_error = true;
+ if (decoder.stalled ())
+ reset_pollin (handle);
+ else
+ error ();
+ }
}
void zmq::zmq_engine_t::out_event ()
@@ -182,9 +193,11 @@ void zmq::zmq_engine_t::out_event ()
// possible to the socket.
int nbytes = tcp_socket.write (outpos, outsize);
- // Handle problems with the connection.
+ // IO error has occurred. We stop waiting for output events.
+ // The engine is not terminated until we detect input error;
+ // this is necessary to prevent losing incomming messages.
if (nbytes == -1) {
- error ();
+ reset_pollout (handle);
return;
}
@@ -205,6 +218,17 @@ void zmq::zmq_engine_t::activate_out ()
void zmq::zmq_engine_t::activate_in ()
{
+ if (input_error) {
+ // There was an input error but the engine could not
+ // be terminated (due to the stalled decoder).
+ // Flush the pending message and terminate the engine now.
+ decoder.process_buffer (inpos, 0);
+ zmq_assert (!decoder.stalled ());
+ inout->flush ();
+ error ();
+ return;
+ }
+
set_pollin (handle);
// Speculative read.
diff --git a/src/0mq/zmq_engine.hpp b/src/0mq/zmq_engine.hpp
index 47073cc..a2b753c 100644
--- a/src/0mq/zmq_engine.hpp
+++ b/src/0mq/zmq_engine.hpp
@@ -78,6 +78,9 @@ namespace zmq
bool plugged;
+ // True if input error has occurred.
+ bool input_error;
+
zmq_engine_t (const zmq_engine_t&);
const zmq_engine_t &operator = (const zmq_engine_t&);
};