summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJaromil <jaromil@dyne.org>2012-05-19 14:47:16 (GMT)
committer Jaromil <jaromil@dyne.org>2012-05-19 14:47:16 (GMT)
commitbb9db29e1a547e2b6ef524c6e459aaaeec1bd94f (patch)
tree89ff0969c2cf4dd3d389f3a2ef7d27778b2e35b2
parentfbf002e38b2993276e39972affd1df5009555675 (diff)
0mq updated to 2.2.0
-rw-r--r--src/0mq/blob.hpp4
-rw-r--r--src/0mq/clock.cpp19
-rw-r--r--src/0mq/command.hpp4
-rw-r--r--src/0mq/ctx.hpp2
-rw-r--r--src/0mq/encoder.cpp4
-rw-r--r--src/0mq/gcc_421_char_traits.hpp119
-rw-r--r--src/0mq/options.cpp50
-rw-r--r--src/0mq/options.hpp4
-rw-r--r--src/0mq/socket_base.cpp8
-rw-r--r--src/0mq/trie.cpp31
-rw-r--r--src/0mq/trie.hpp3
-rw-r--r--src/include/zmq.h8
-rw-r--r--src/include/zmq.hpp349
13 files changed, 583 insertions, 22 deletions
diff --git a/src/0mq/blob.hpp b/src/0mq/blob.hpp
index 3c54ac3..8f5ee8d 100644
--- a/src/0mq/blob.hpp
+++ b/src/0mq/blob.hpp
@@ -23,6 +23,10 @@
#include <string>
+#if (defined(__GNUC__) && (__GNUC__ == 3))
+#include "gcc_421_char_traits.hpp"
+#endif
+
namespace zmq
{
diff --git a/src/0mq/clock.cpp b/src/0mq/clock.cpp
index f1da091..96dc40c 100644
--- a/src/0mq/clock.cpp
+++ b/src/0mq/clock.cpp
@@ -34,7 +34,7 @@
#include <sys/time.h>
#endif
-#if defined HAVE_CLOCK_GETTIME
+#if defined HAVE_CLOCK_GETTIME || defined HAVE_GETHRTIME
#include <time.h>
#endif
@@ -65,14 +65,27 @@ uint64_t zmq::clock_t::now_us ()
double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000);
return (uint64_t) (tick.QuadPart / ticks_div);
-#elif defined HAVE_CLOCK_GETTIME
+#elif defined HAVE_CLOCK_GETTIME && defined CLOCK_MONOTONIC
// Use POSIX clock_gettime function to get precise monotonic time.
struct timespec tv;
int rc = clock_gettime (CLOCK_MONOTONIC, &tv);
- errno_assert (rc == 0);
+ // Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported.
+ // Done at runtime because a ./configure check is bad for
+ // cross-compiling.
+ if( rc != 0) {
+ // Use POSIX gettimeofday function to get precise time.
+ struct timeval tv;
+ int rc = gettimeofday (&tv, NULL);
+ errno_assert (rc == 0);
+ return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
+ }
return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_nsec / 1000);
+#elif defined HAVE_GETHRTIME
+
+ return (gethrtime () / 1000);
+
#else
// Use POSIX gettimeofday function to get precise time.
diff --git a/src/0mq/command.hpp b/src/0mq/command.hpp
index 35aed0f..49f5642 100644
--- a/src/0mq/command.hpp
+++ b/src/0mq/command.hpp
@@ -72,7 +72,7 @@ namespace zmq
// session that the connection have failed.
struct {
struct i_engine *engine;
- unsigned char peer_identity_size;
+ size_t peer_identity_size;
unsigned char *peer_identity;
} attach;
@@ -81,7 +81,7 @@ namespace zmq
struct {
class reader_t *in_pipe;
class writer_t *out_pipe;
- unsigned char peer_identity_size;
+ size_t peer_identity_size;
unsigned char *peer_identity;
} bind;
diff --git a/src/0mq/ctx.hpp b/src/0mq/ctx.hpp
index 33d5dad..b44d569 100644
--- a/src/0mq/ctx.hpp
+++ b/src/0mq/ctx.hpp
@@ -97,9 +97,9 @@ namespace zmq
reaper_tid = 1
};
+ ~ctx_t ();
private:
- ~ctx_t ();
// Used to check whether the object is a context.
uint32_t tag;
diff --git a/src/0mq/encoder.cpp b/src/0mq/encoder.cpp
index 88e1dff..8f7b4b6 100644
--- a/src/0mq/encoder.cpp
+++ b/src/0mq/encoder.cpp
@@ -75,14 +75,14 @@ bool zmq::encoder_t::message_ready ()
// message size. In both cases 'flags' field follows.
if (size < 255) {
tmpbuf [0] = (unsigned char) size;
- tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED);
+ tmpbuf [1] = (in_progress.flags & ZMQ_MSG_MORE);
next_step (tmpbuf, 2, &encoder_t::size_ready,
!(in_progress.flags & ZMQ_MSG_MORE));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
- tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED);
+ tmpbuf [9] = (in_progress.flags & ZMQ_MSG_MORE);
next_step (tmpbuf, 10, &encoder_t::size_ready,
!(in_progress.flags & ZMQ_MSG_MORE));
}
diff --git a/src/0mq/gcc_421_char_traits.hpp b/src/0mq/gcc_421_char_traits.hpp
new file mode 100644
index 0000000..e8f457d
--- /dev/null
+++ b/src/0mq/gcc_421_char_traits.hpp
@@ -0,0 +1,119 @@
+// Character Traits for use by standard string and iostream -*- C++ -*-
+
+// Copyright (C) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005
+// Free Software Foundation, Inc.
+//
+// This file is part of the GNU ISO C++ Library. This library is free
+// software; you can redistribute it and/or modify it under the
+// terms of the GNU General Public License as published by the
+// Free Software Foundation; either version 2, or (at your option)
+// any later version.
+
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License along
+// with this library; see the file COPYING. If not, write to the Free
+// Software Foundation, 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+// USA.
+
+// As a special exception, you may use this file as part of a free software
+// library without restriction. Specifically, if other files instantiate
+// templates or use macros or inline functions from this file, or you compile
+// this file and link it with other files to produce an executable, this
+// file does not by itself cause the resulting executable to be covered by
+// the GNU General Public License. This exception does not however
+// invalidate any other reasons why the executable file might be covered by
+// the GNU General Public License.
+
+/** @file char_traits.h
+ * This is an internal header file, included by other library headers.
+ * You should not attempt to use it directly.
+ */
+
+//
+// ISO C++ 14882: 21 Strings library
+//
+
+// This file has been snipped for 0mq to provide a definition for
+// struct char_traits<unsigned char>
+// Which is not implemented in GNU ISO C++ Library as of version 3.2.3
+// And is required by blob.hpp
+
+#ifndef _ZMQ_CHAR_TRAITS_H
+#define _ZMQ_CHAR_TRAITS_H 1
+
+namespace std
+{
+ /// @brief 21.1.3.1 char_traits specializations
+ template<>
+ struct char_traits<unsigned char>
+ {
+ typedef unsigned char char_type;
+ typedef int int_type;
+ typedef streampos pos_type;
+ typedef streamoff off_type;
+ typedef mbstate_t state_type;
+
+ static void
+ assign(char_type& __c1, const char_type& __c2)
+ { __c1 = __c2; }
+
+ static bool
+ eq(const char_type& __c1, const char_type& __c2)
+ { return __c1 == __c2; }
+
+ static bool
+ lt(const char_type& __c1, const char_type& __c2)
+ { return __c1 < __c2; }
+
+ static int
+ compare(const char_type* __s1, const char_type* __s2, size_t __n)
+ { return memcmp(__s1, __s2, __n); }
+
+ static size_t
+ length(const char_type* __s)
+ { return strlen((const char*)__s); }
+
+ static const char_type*
+ find(const char_type* __s, size_t __n, const char_type& __a)
+ { return static_cast<const char_type*>(memchr(__s, __a, __n)); }
+
+ static char_type*
+ move(char_type* __s1, const char_type* __s2, size_t __n)
+ { return static_cast<char_type*>(memmove(__s1, __s2, __n)); }
+
+ static char_type*
+ copy(char_type* __s1, const char_type* __s2, size_t __n)
+ { return static_cast<char_type*>(memcpy(__s1, __s2, __n)); }
+
+ static char_type*
+ assign(char_type* __s, size_t __n, char_type __a)
+ { return static_cast<char_type*>(memset(__s, __a, __n)); }
+
+ static char_type
+ to_char_type(const int_type& __c)
+ { return static_cast<char_type>(__c); }
+
+ // To keep both the byte 0xff and the eof symbol 0xffffffff
+ // from ending up as 0xffffffff.
+ static int_type
+ to_int_type(const char_type& __c)
+ { return static_cast<int_type>(static_cast<unsigned char>(__c)); }
+
+ static bool
+ eq_int_type(const int_type& __c1, const int_type& __c2)
+ { return __c1 == __c2; }
+
+ static int_type
+ eof() { return static_cast<int_type>(EOF); }
+
+ static int_type
+ not_eof(const int_type& __c)
+ { return (__c == eof()) ? 0 : __c; }
+ };
+}
+
+#endif
diff --git a/src/0mq/options.cpp b/src/0mq/options.cpp
index 952907b..fc6bd06 100644
--- a/src/0mq/options.cpp
+++ b/src/0mq/options.cpp
@@ -18,6 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stdio.h>
#include <string.h>
#ifndef ZMQ_HAVE_WINDOWS
#include <sys/stat.h>
@@ -44,7 +45,9 @@ zmq::options_t::options_t () :
backlog (100),
requires_in (false),
requires_out (false),
- immediate_connect (true)
+ immediate_connect (true),
+ rcvtimeo (-1),
+ sndtimeo (-1)
{
}
@@ -68,7 +71,12 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
// Check that SWAP directory (.) is writable
struct stat stat_buf;
- if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWRITE) == 0)) {
+// #if (ZMQ_HAVE_ANDROID | ZMQ_HAVE_LINUX)
+#ifdef ZMQ_HAVE_LINUX
+ if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWUSR) == 0)) {
+#else
+ if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWRITE) == 0)) {
+#endif
errno = EACCES;
return -1;
}
@@ -103,7 +111,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
rate = (uint32_t) *((int64_t*) optval_);
return 0;
-
+
case ZMQ_RECOVERY_IVL:
if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) {
errno = EINVAL;
@@ -191,6 +199,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
backlog = *((int*) optval_);
return 0;
+ case ZMQ_RCVTIMEO:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ rcvtimeo = *((int*) optval_);
+ return 0;
+
+ case ZMQ_SNDTIMEO:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ sndtimeo = *((int*) optval_);
+ return 0;
+
}
errno = EINVAL;
@@ -246,7 +270,7 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*((int64_t*) optval_) = rate;
*optvallen_ = sizeof (int64_t);
return 0;
-
+
case ZMQ_RECOVERY_IVL:
if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL;
@@ -337,6 +361,24 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case ZMQ_RCVTIMEO:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvtimeo;
+ *optvallen_ = sizeof (int);
+ return 0;
+
+ case ZMQ_SNDTIMEO:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = sndtimeo;
+ *optvallen_ = sizeof (int);
+ return 0;
+
}
errno = EINVAL;
diff --git a/src/0mq/options.hpp b/src/0mq/options.hpp
index b2e29d0..537ea06 100644
--- a/src/0mq/options.hpp
+++ b/src/0mq/options.hpp
@@ -80,6 +80,10 @@ namespace zmq
// is not aware of the peer's identity, however, it is able to send
// messages straight away.
bool immediate_connect;
+
+ // The timeout for send/recv operations for this socket.
+ int rcvtimeo;
+ int sndtimeo;
};
}
diff --git a/src/0mq/socket_base.cpp b/src/0mq/socket_base.cpp
index 335a858..cd4152c 100644
--- a/src/0mq/socket_base.cpp
+++ b/src/0mq/socket_base.cpp
@@ -510,13 +510,13 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - upwards.
- if (flags_ & ZMQ_NOBLOCK)
+ if (flags_ & ZMQ_NOBLOCK || options.sndtimeo == 0)
return -1;
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
clock_t clock ;
- int timeout = -1;
+ int timeout = options.sndtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
// Oops, we couldn't send the message. Wait for the next
@@ -589,7 +589,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// For non-blocking recv, commands are processed in case there's an
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
- if (flags_ & ZMQ_NOBLOCK) {
+ if (flags_ & ZMQ_NOBLOCK || options.rcvtimeo == 0) {
if (errno != EAGAIN)
return -1;
if (unlikely (process_commands (0, false) != 0))
@@ -608,7 +608,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
clock_t clock ;
- int timeout = -1;
+ int timeout = options.rcvtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
// In blocking scenario, commands are processed over and over again until
diff --git a/src/0mq/trie.cpp b/src/0mq/trie.cpp
index 4198ff3..883b23e 100644
--- a/src/0mq/trie.cpp
+++ b/src/0mq/trie.cpp
@@ -1,5 +1,6 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011-2012 Spotify AB
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -34,14 +35,18 @@
zmq::trie_t::trie_t () :
refcnt (0),
min (0),
- count (0)
+ count (0),
+ live_nodes (0)
{
}
zmq::trie_t::~trie_t ()
{
- if (count == 1)
+ if (count == 1) {
+ zmq_assert (next.node);
delete next.node;
+ next.node = 0;
+ }
else if (count > 1) {
for (unsigned short i = 0; i != count; ++i)
if (next.table [i])
@@ -112,6 +117,7 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_)
if (!next.node) {
next.node = new (std::nothrow) trie_t;
alloc_assert (next.node);
+ ++live_nodes;
}
next.node->add (prefix_ + 1, size_ - 1);
}
@@ -119,6 +125,7 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_)
if (!next.table [c - min]) {
next.table [c - min] = new (std::nothrow) trie_t;
alloc_assert (next.table [c - min]);
+ ++live_nodes;
}
next.table [c - min]->add (prefix_ + 1, size_ - 1);
}
@@ -143,7 +150,20 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
if (!next_node)
return false;
- return next_node->rm (prefix_ + 1, size_ - 1);
+ bool ret = next_node->rm (prefix_ + 1, size_ - 1);
+
+ if (next_node->is_redundant ()) {
+ delete next_node;
+ if (count == 1) {
+ next.node = 0;
+ count = 0;
+ }
+ else
+ next.table [c - min] = 0;
+ --live_nodes;
+ }
+
+ return ret;
}
bool zmq::trie_t::check (unsigned char *data_, size_t size_)
@@ -179,3 +199,8 @@ bool zmq::trie_t::check (unsigned char *data_, size_t size_)
size_--;
}
}
+
+bool zmq::trie_t::is_redundant() const
+{
+ return refcnt == 0 && live_nodes == 0;
+}
diff --git a/src/0mq/trie.hpp b/src/0mq/trie.hpp
index dbf1cb1..7b1b62c 100644
--- a/src/0mq/trie.hpp
+++ b/src/0mq/trie.hpp
@@ -1,5 +1,6 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2011-2012 Spotify AB
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -40,10 +41,12 @@ namespace zmq
bool check (unsigned char *data_, size_t size_);
private:
+ bool is_redundant () const;
uint32_t refcnt;
unsigned char min;
unsigned short count;
+ unsigned short live_nodes;
union {
class trie_t *node;
class trie_t **table;
diff --git a/src/include/zmq.h b/src/include/zmq.h
index a507156..3352b26 100644
--- a/src/include/zmq.h
+++ b/src/include/zmq.h
@@ -54,8 +54,8 @@ extern "C" {
/* Version macros for compile-time API version detection */
#define ZMQ_VERSION_MAJOR 2
-#define ZMQ_VERSION_MINOR 1
-#define ZMQ_VERSION_PATCH 11
+#define ZMQ_VERSION_MINOR 2
+#define ZMQ_VERSION_PATCH 0
#define ZMQ_MAKE_VERSION(major, minor, patch) \
((major) * 10000 + (minor) * 100 + (patch))
@@ -211,7 +211,9 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_BACKLOG 19
#define ZMQ_RECOVERY_IVL_MSEC 20 /* opt. recovery time, reconcile in 3.x */
#define ZMQ_RECONNECT_IVL_MAX 21
-
+#define ZMQ_RCVTIMEO 27
+#define ZMQ_SNDTIMEO 28
+
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
#define ZMQ_SNDMORE 2
diff --git a/src/include/zmq.hpp b/src/include/zmq.hpp
new file mode 100644
index 0000000..0db6738
--- /dev/null
+++ b/src/include/zmq.hpp
@@ -0,0 +1,349 @@
+/*
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_HPP_INCLUDED__
+#define __ZMQ_HPP_INCLUDED__
+
+#include "zmq.h"
+
+#include <algorithm>
+#include <cassert>
+#include <cstring>
+#include <exception>
+
+// Detect whether the compiler supports C++11 rvalue references.
+#if (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && defined(__GXX_EXPERIMENTAL_CXX0X__))
+# define ZMQ_HAS_RVALUE_REFS
+#endif
+#if (defined(__clang__))
+# if __has_feature(cxx_rvalue_references)
+# define ZMQ_HAS_RVALUE_REFS
+# endif
+#endif
+#if (defined(_MSC_VER) && (_MSC_VER >= 1600))
+# define ZMQ_HAS_RVALUE_REFS
+#endif
+
+// In order to prevent unused variable warnings when building in non-debug
+// mode use this macro to make assertions.
+#ifndef NDEBUG
+# define ZMQ_ASSERT(expression) assert(expression)
+#else
+# define ZMQ_ASSERT(expression) (expression)
+#endif
+
+namespace zmq
+{
+
+ typedef zmq_free_fn free_fn;
+ typedef zmq_pollitem_t pollitem_t;
+
+ class error_t : public std::exception
+ {
+ public:
+
+ error_t () : errnum (zmq_errno ()) {}
+
+ virtual const char *what () const throw ()
+ {
+ return zmq_strerror (errnum);
+ }
+
+ int num () const
+ {
+ return errnum;
+ }
+
+ private:
+
+ int errnum;
+ };
+
+ inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
+ {
+ int rc = zmq_poll (items_, nitems_, timeout_);
+ if (rc < 0)
+ throw error_t ();
+ return rc;
+ }
+
+ inline void device (int device_, void * insocket_, void* outsocket_)
+ {
+ int rc = zmq_device (device_, insocket_, outsocket_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void version (int *major_, int *minor_, int *patch_)
+ {
+ zmq_version (major_, minor_, patch_);
+ }
+
+ class message_t : private zmq_msg_t
+ {
+ friend class socket_t;
+
+ public:
+
+ inline message_t ()
+ {
+ int rc = zmq_msg_init (this);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline message_t (size_t size_)
+ {
+ int rc = zmq_msg_init_size (this, size_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline message_t (void *data_, size_t size_, free_fn *ffn_,
+ void *hint_ = NULL)
+ {
+ int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline ~message_t ()
+ {
+ int rc = zmq_msg_close (this);
+ ZMQ_ASSERT (rc == 0);
+ }
+
+ inline void rebuild ()
+ {
+ int rc = zmq_msg_close (this);
+ if (rc != 0)
+ throw error_t ();
+ rc = zmq_msg_init (this);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void rebuild (size_t size_)
+ {
+ int rc = zmq_msg_close (this);
+ if (rc != 0)
+ throw error_t ();
+ rc = zmq_msg_init_size (this, size_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void rebuild (void *data_, size_t size_, free_fn *ffn_,
+ void *hint_ = NULL)
+ {
+ int rc = zmq_msg_close (this);
+ if (rc != 0)
+ throw error_t ();
+ rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void move (message_t *msg_)
+ {
+ int rc = zmq_msg_move (this, (zmq_msg_t*) msg_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void copy (message_t *msg_)
+ {
+ int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void *data ()
+ {
+ return zmq_msg_data (this);
+ }
+
+ inline size_t size ()
+ {
+ return zmq_msg_size (this);
+ }
+
+ private:
+
+ // Disable implicit message copying, so that users won't use shared
+ // messages (less efficient) without being aware of the fact.
+ message_t (const message_t&);
+ void operator = (const message_t&);
+ };
+
+ class context_t
+ {
+ friend class socket_t;
+
+ public:
+
+ inline context_t (int io_threads_)
+ {
+ ptr = zmq_init (io_threads_);
+ if (ptr == NULL)
+ throw error_t ();
+ }
+
+#ifdef ZMQ_HAS_RVALUE_REFS
+ inline context_t(context_t&& rhs) : ptr(rhs.ptr)
+ {
+ rhs.ptr = NULL;
+ }
+ inline context_t& operator=(context_t&& rhs)
+ {
+ std::swap(ptr, rhs.ptr);
+ return *this;
+ }
+#endif
+
+ inline ~context_t ()
+ {
+ if (ptr == NULL)
+ return;
+ int rc = zmq_term (ptr);
+ ZMQ_ASSERT (rc == 0);
+ }
+
+ // Be careful with this, it's probably only useful for
+ // using the C api together with an existing C++ api.
+ // Normally you should never need to use this.
+ inline operator void* ()
+ {
+ return ptr;
+ }
+
+ private:
+
+ void *ptr;
+
+ context_t (const context_t&);
+ void operator = (const context_t&);
+ };
+
+ class socket_t
+ {
+ public:
+
+ inline socket_t (context_t &context_, int type_)
+ {
+ ptr = zmq_socket (context_.ptr, type_);
+ if (ptr == NULL)
+ throw error_t ();
+ }
+
+#ifdef ZMQ_HAS_RVALUE_REFS
+ inline socket_t(socket_t&& rhs) : ptr(rhs.ptr)
+ {
+ rhs.ptr = NULL;
+ }
+ inline socket_t& operator=(socket_t&& rhs)
+ {
+ std::swap(ptr, rhs.ptr);
+ return *this;
+ }
+#endif
+
+ inline ~socket_t ()
+ {
+ close();
+ }
+
+ inline operator void* ()
+ {
+ return ptr;
+ }
+
+ inline void close()
+ {
+ if(ptr == NULL)
+ // already closed
+ return ;
+ int rc = zmq_close (ptr);
+ if (rc != 0)
+ throw error_t ();
+ ptr = 0 ;
+ }
+
+ inline void setsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+ {
+ int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void getsockopt (int option_, void *optval_,
+ size_t *optvallen_)
+ {
+ int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void bind (const char *addr_)
+ {
+ int rc = zmq_bind (ptr, addr_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline void connect (const char *addr_)
+ {
+ int rc = zmq_connect (ptr, addr_);
+ if (rc != 0)
+ throw error_t ();
+ }
+
+ inline bool send (message_t &msg_, int flags_ = 0)
+ {
+ int rc = zmq_send (ptr, &msg_, flags_);
+ if (rc == 0)
+ return true;
+ if (rc == -1 && zmq_errno () == EAGAIN)
+ return false;
+ throw error_t ();
+ }
+
+ inline bool recv (message_t *msg_, int flags_ = 0)
+ {
+ int rc = zmq_recv (ptr, msg_, flags_);
+ if (rc == 0)
+ return true;
+ if (rc == -1 && zmq_errno () == EAGAIN)
+ return false;
+ throw error_t ();
+ }
+
+ private:
+
+ void *ptr;
+
+ socket_t (const socket_t&);
+ void operator = (const socket_t&);
+ };
+
+}
+
+#endif