summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorevoskuil <eric@voskuil.org>2014-03-16 22:12:35 (GMT)
committer evoskuil <eric@voskuil.org>2014-03-16 22:12:35 (GMT)
commit26c4702836c1294212981d546a07abc65cd44129 (patch)
tree341ec9459c7f261baa40c8931e882f81ba6b5518
parentd0f3b35ce103b0917dd2e13f5cf8859129bf6da0 (diff)
parente626e1c1ca44fb2c5dfa5a0285e82b7504827624 (diff)
Merge branch 'origin/master'refs/pull/9/head
-rw-r--r--configure.ac3
-rw-r--r--doc/prot-sphinx/api.rst10
-rw-r--r--doc/prot-sphinx/connecting.rst6
-rw-r--r--doc/prot-sphinx/messages.rst44
-rw-r--r--include/obelisk/Makefile.am3
-rw-r--r--include/obelisk/client/backend.hpp11
-rw-r--r--include/obelisk/client/blockchain.hpp3
-rw-r--r--include/obelisk/client/interface.hpp16
-rw-r--r--include/obelisk/message.hpp11
-rw-r--r--include/obelisk/obelisk.hpp2
-rw-r--r--include/obelisk/zmq_message.hpp26
-rw-r--r--libobelisk.pc.in2
-rw-r--r--src/Makefile.am14
-rw-r--r--src/balancer/Makefile10
-rw-r--r--src/balancer/balancer.cfg5
-rw-r--r--src/balancer/balancer.cpp322
-rw-r--r--src/balancer/config.cpp37
-rw-r--r--src/balancer/config.hpp15
-rw-r--r--src/client/backend.cpp30
-rw-r--r--src/client/interface/blockchain.cpp45
-rw-r--r--src/client/interface/fetch_x.cpp2
-rw-r--r--src/client/interface/interface.cpp57
-rw-r--r--src/message.cpp64
-rw-r--r--src/worker/config.cpp23
-rw-r--r--src/worker/config.hpp7
-rw-r--r--src/worker/main.cpp35
-rw-r--r--src/worker/node_impl.cpp2
-rw-r--r--src/worker/publisher.cpp35
-rw-r--r--src/worker/publisher.hpp12
-rw-r--r--src/worker/service/blockchain.cpp44
-rw-r--r--src/worker/service/blockchain.hpp3
-rw-r--r--src/worker/service/fetch_x.cpp2
-rw-r--r--src/worker/worker.cfg19
-rw-r--r--src/worker/worker.cpp184
-rw-r--r--src/worker/worker.hpp35
-rw-r--r--src/zmq_message.cpp72
36 files changed, 370 insertions, 841 deletions
diff --git a/configure.ac b/configure.ac
index 93d87fe..1eb6019 100644
--- a/configure.ac
+++ b/configure.ac
@@ -13,6 +13,9 @@ m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
PKG_PROG_PKG_CONFIG
PKG_CHECK_MODULES([libbitcoin], [libbitcoin])
PKG_CHECK_MODULES([libzmq], [libzmq])
+PKG_CHECK_MODULES([libsodium], [libsodium])
+PKG_CHECK_MODULES([libczmq], [libczmq])
+PKG_CHECK_MODULES([libczmqpp], [libczmq++])
PKG_CHECK_MODULES([libconfigxx], [libconfig++])
AC_ARG_WITH([pkgconfigdir], AS_HELP_STRING([--with-pkgconfigdir=PATH],
diff --git a/doc/prot-sphinx/api.rst b/doc/prot-sphinx/api.rst
index 521b419..eebd4a9 100644
--- a/doc/prot-sphinx/api.rst
+++ b/doc/prot-sphinx/api.rst
@@ -84,6 +84,16 @@ Request block_hash(32)
Reply ec(4) + block_height(4)
================== =======================
+Fetch stealth information.
+
+============= ==============================================================
+fetch_stealth
+============= ==============================================================
+============= ==============================================================
+Request prefix(number_bits(1) + bitfield(4)) + from_height(4)
+Reply ec(4) + stealth_list(ephemkey(33) + address(21) + tx_hash(32))
+============= ==============================================================
+
Transaction pool
================
diff --git a/doc/prot-sphinx/connecting.rst b/doc/prot-sphinx/connecting.rst
index 7972ae3..2aa286a 100644
--- a/doc/prot-sphinx/connecting.rst
+++ b/doc/prot-sphinx/connecting.rst
@@ -4,10 +4,8 @@
Connecting
**********
-We use the ZeroMQ ROUTER-DEALER combination with the load balancer to perform
-asynchronous request-reply pairs. If the balancer server does not respond in
-time then the client can resend the request to a different worker. A good
-timeout value to use is 30 seconds.
+We use the ZeroMQ ROUTER-DEALER combination with the backend worker to perform
+asynchronous request-reply pairs.
To connect to the server using ZeroMQ in C++, we use:
::
diff --git a/doc/prot-sphinx/messages.rst b/doc/prot-sphinx/messages.rst
index 6da7999..9767809 100644
--- a/doc/prot-sphinx/messages.rst
+++ b/doc/prot-sphinx/messages.rst
@@ -43,8 +43,6 @@ Imagine we send 5 messages:
socket.send(msg1, ZMQ_SNDMORE)
socket.send(msg2, ZMQ_SNDMORE)
- socket.send(msg3, ZMQ_SNDMORE)
- socket.send(msg4, ZMQ_SNDMORE)
socket.send(msg5, 0)
When the message 5 arrives to the server, then message 1-4 will be available
@@ -53,22 +51,22 @@ too for the server to receive. ZeroMQ calls this a *frame*.
Different ZeroMQ socket types can modify the frame by popping off beginning
messages before passing them to the API.
-In this document, we will imagine an abstract message type called
-:class:`obelisk_message` with the folowing methods.
+In this document, we use the class:`czmqpp::message` class
+which implements the folowing methods.
-.. cpp:function:: void obelisk_message::append(part)
+.. cpp:function:: void czmqpp::append(part)
Add a new field to the message.
-.. cpp:function:: parts_list obelisk_message::parts()
+.. cpp:function:: parts_list czmqpp::parts()
Return array of all appended parts.
-.. cpp:function:: bool obelisk_message::send(socket)
+.. cpp:function:: bool czmqpp::send(socket)
Send entire frame to socket ending on last appended part.
-.. cpp:function:: bool obelisk_message::recv(socket)
+.. cpp:function:: bool czmqpp::receive(socket)
Receive from the socket until no more messages are available in this frame.
@@ -93,25 +91,11 @@ The format of request and reply fields are very similar.
============== ====================
Request Fields Type(Size)
============== ====================
-destination worker_uuid(0 or 17)
command string
id uint32(4)
data data
-checksum uint32(4)
============== ====================
-`destination` describes which backend worker the load balancer should direct
-the message to. If empty, then the load balancer picks a random backend
-worker. This should only be set in specific conditions where you want to
-avoid race conditions. In general it's better to write more resilient code
-that is able to handle asynchronity without demanding total consistency.
-The worker_uuid usually should be 17 bytes if specifying a destination.
-If not then it is 0 bytes (load balancer selects a worker).
-
-Note: that there is a feature to name the workers. If so, then this field
-size can vary depending on the number of bytes needed for the custom
-worker_uuid.
-
`command` is the remote method invoked on the worker.
`id` is a random value chosen by the client for corralating server replies with
@@ -119,27 +103,11 @@ requests the client sent.
`data` is the remote method parameter serialized as binary data.
-`checksum` is a 4 byte checksum of `data`, calculated using the Bitcoin
-checksum method. Checksum = first 4 bytes of `sha256(sha256(data))`.
-
============== ====================
Reply Fields Type(Size)
============== ====================
-origin worker_uuid(0 or 17)
command string
id uint32(4)
data data
-checksum uint32(4)
============== ====================
-The only difference with replies, is the first field indicates which worker
-responded back. This is useful for if we want to batch a series of requests
-to the same worker. An example might be subscribing to an address, and fetching
-the history for the same address. Such an operation should be called on the
-same worker to guarantee against a race condition.
-
-But this feature should not be abused. Taking the example futher, if we are
-iterating a list of addresses in a wallet, we should **not** be sending all
-requests to the same worker, overloading the same worker with operations that
-aren't interlinked.
-
diff --git a/include/obelisk/Makefile.am b/include/obelisk/Makefile.am
index 17b205a..9cd1fb4 100644
--- a/include/obelisk/Makefile.am
+++ b/include/obelisk/Makefile.am
@@ -1,8 +1,7 @@
obelisk_includedir = $(includedir)/obelisk
obelisk_include_HEADERS = \
obelisk.hpp \
- message.hpp \
- zmq_message.hpp
+ message.hpp
obelisk_client_includedir = $(includedir)/obelisk/client
obelisk_client_include_HEADERS = \
diff --git a/include/obelisk/client/backend.hpp b/include/obelisk/client/backend.hpp
index ee23d50..4e88d8b 100644
--- a/include/obelisk/client/backend.hpp
+++ b/include/obelisk/client/backend.hpp
@@ -5,7 +5,6 @@
#include <unordered_map>
#include <system_error>
#include <boost/date_time/posix_time/posix_time.hpp>
-#include <zmq.hpp>
#include <obelisk/message.hpp>
#include <bitcoin/threadpool.hpp>
@@ -20,7 +19,8 @@ public:
const bc::data_chunk&, const worker_uuid&)> response_handler;
backend_cluster(bc::threadpool& pool,
- zmq::context_t& context, const std::string& connection);
+ czmqpp::context& context, const std::string& connection,
+ const std::string& cert_filename, const std::string& server_pubkey);
void request(
const std::string& command, const bc::data_chunk& data,
@@ -45,6 +45,8 @@ private:
typedef std::unordered_map<std::string, response_handler> filter_map;
+ void enable_crypto(
+ const std::string& cert_filename, const std::string& server_pubkey);
void send(const outgoing_message& message);
void receive_incoming();
@@ -54,8 +56,9 @@ private:
void resend_expired();
- zmq::context_t& context_;
- zmq::socket_t socket_;
+ czmqpp::context& context_;
+ czmqpp::socket socket_;
+ czmqpp::certificate cert_;
// Requests
bc::async_strand strand_;
response_handler_map handlers_;
diff --git a/include/obelisk/client/blockchain.hpp b/include/obelisk/client/blockchain.hpp
index b69ea38..125cb8c 100644
--- a/include/obelisk/client/blockchain.hpp
+++ b/include/obelisk/client/blockchain.hpp
@@ -23,6 +23,9 @@ public:
bc::blockchain::fetch_handler_block_header handle_fetch);
void fetch_transaction_index(const bc::hash_digest& tx_hash,
bc::blockchain::fetch_handler_transaction_index handle_fetch);
+ void fetch_stealth(const bc::stealth_prefix& prefix,
+ bc::blockchain::fetch_handler_stealth handle_fetch,
+ size_t from_height=0);
private:
backend_cluster& backend_;
};
diff --git a/include/obelisk/client/interface.hpp b/include/obelisk/client/interface.hpp
index 9042489..d009827 100644
--- a/include/obelisk/client/interface.hpp
+++ b/include/obelisk/client/interface.hpp
@@ -16,7 +16,7 @@ public:
typedef std::function<void (const bc::transaction_type&)>
transaction_notify_callback;
- subscriber_part(zmq::context_t& context);
+ subscriber_part(czmqpp::context& context);
// Non-copyable
subscriber_part(const subscriber_part&) = delete;
@@ -29,16 +29,13 @@ public:
void update();
private:
- typedef std::unique_ptr<zmq::socket_t> zmq_socket_uniqptr;
-
- bool setup_socket(const std::string& connection,
- zmq_socket_uniqptr& socket);
+ bool setup_socket(
+ const std::string& connection, czmqpp::socket socket);
void recv_tx();
void recv_block();
- zmq::context_t& context_;
- zmq_socket_uniqptr socket_block_, socket_tx_;
+ czmqpp::socket socket_block_, socket_tx_;
block_notify_callback notify_block_;
transaction_notify_callback notify_tx_;
};
@@ -108,7 +105,8 @@ private:
class fullnode_interface
{
public:
- fullnode_interface(bc::threadpool& pool, const std::string& connection);
+ fullnode_interface(bc::threadpool& pool, const std::string& connection,
+ const std::string& cert_filename, const std::string& server_pubkey);
// Non-copyable
fullnode_interface(const fullnode_interface&) = delete;
@@ -122,7 +120,7 @@ public:
subscriber_part::transaction_notify_callback notify_tx);
private:
- zmq::context_t context_;
+ czmqpp::context context_;
backend_cluster backend_;
subscriber_part subscriber_;
diff --git a/include/obelisk/message.hpp b/include/obelisk/message.hpp
index 96ef6ba..45a01ed 100644
--- a/include/obelisk/message.hpp
+++ b/include/obelisk/message.hpp
@@ -1,20 +1,17 @@
#ifndef OBELISK_CLIENT_MESSAGE
#define OBELISK_CLIENT_MESSAGE
-#include <zmq.hpp>
+#include <czmq++/czmq.hpp>
#include <bitcoin/types.hpp>
namespace obelisk {
namespace bc = libbitcoin;
-typedef std::shared_ptr<zmq::socket_t> zmq_socket_ptr;
-
class incoming_message
{
public:
- bool recv(zmq::socket_t& socket);
- bool is_signal() const;
+ bool recv(czmqpp::socket& socket);
const bc::data_chunk origin() const;
const std::string& command() const;
const uint32_t id() const;
@@ -36,12 +33,10 @@ public:
const bc::data_chunk& data);
outgoing_message(
const incoming_message& request, const bc::data_chunk& data);
- // Control messages sent to the bouncer.
- outgoing_message(const std::string& command);
// Default constructor provided for containers and copying.
outgoing_message();
- void send(zmq::socket_t& socket) const;
+ void send(czmqpp::socket& socket) const;
const uint32_t id() const;
private:
diff --git a/include/obelisk/obelisk.hpp b/include/obelisk/obelisk.hpp
index 61d84fd..9e6efd2 100644
--- a/include/obelisk/obelisk.hpp
+++ b/include/obelisk/obelisk.hpp
@@ -3,8 +3,6 @@
// Convenience header that includes everything
// Not to be used internally. For API users.
-#include <obelisk/zmq_message.hpp>
-#include <obelisk/obelisk.hpp>
#include <obelisk/client/transaction_pool.hpp>
#include <obelisk/client/backend.hpp>
#include <obelisk/client/blockchain.hpp>
diff --git a/include/obelisk/zmq_message.hpp b/include/obelisk/zmq_message.hpp
deleted file mode 100644
index d4e828a..0000000
--- a/include/obelisk/zmq_message.hpp
+++ /dev/null
@@ -1,26 +0,0 @@
-#ifndef OBELISK_ZMQ_MESSAGE
-#define OBELISK_ZMQ_MESSAGE
-
-#include <zmq.hpp>
-#include <bitcoin/types.hpp>
-
-namespace obelisk {
-
-namespace bc = libbitcoin;
-
-class zmq_message
-{
-public:
- void append(const bc::data_chunk& part);
- bool send(zmq::socket_t& socket) const;
- bool recv(zmq::socket_t& socket);
- const bc::data_stack& parts() const;
-
-private:
- bc::data_stack parts_;
-};
-
-} // namespace obelisk
-
-#endif
-
diff --git a/libobelisk.pc.in b/libobelisk.pc.in
index 9dac6eb..8386a89 100644
--- a/libobelisk.pc.in
+++ b/libobelisk.pc.in
@@ -6,7 +6,7 @@ includedir=@includedir@
Name: libobelisk
Description: Obelisk network interface for scalable asynchronous blockchain queries.
Version: @PACKAGE_VERSION@
-Requires: libzmq libconfig++ libbitcoin
+Requires: libczmq++ libconfig++ libbitcoin
Cflags: -I${includedir} -std=c++11
Libs: -L${libdir} -lobelisk
diff --git a/src/Makefile.am b/src/Makefile.am
index c89d619..04b7dcd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1,11 +1,10 @@
AUTOMAKE_OPTIONS = subdir-objects
-AM_CPPFLAGS = -I$(srcdir)/../include ${libbitcoin_CFLAGS} -DSYSCONFDIR=\"${sysconfdir}\"
+AM_CPPFLAGS = -I$(srcdir)/../include ${libbitcoin_CFLAGS} ${libczmqpp_LIBS} ${libconfigxx_CFLAGS} -DSYSCONFDIR=\"${sysconfdir}\"
lib_LTLIBRARIES = libobelisk.la
libobelisk_la_SOURCES = \
message.cpp \
- zmq_message.cpp \
client/backend.cpp \
client/interface/interface.cpp \
client/interface/fetch_x.cpp \
@@ -13,19 +12,12 @@ libobelisk_la_SOURCES = \
client/interface/protocol.cpp \
client/interface/transaction_pool.cpp
-bin_PROGRAMS = obbalancer obworker
+bin_PROGRAMS = obworker
confdir=$(sysconfdir)/obelisk
conf_DATA = \
- balancer/balancer.cfg \
worker/worker.cfg
-obbalancer_SOURCES = \
- balancer/balancer.cpp \
- balancer/config.cpp
-
-obbalancer_LDADD = -lobelisk ${libbitcoin_LIBS} ${libzmq_LIBS} ${libconfigxx_LIBS}
-
obworker_SOURCES = \
worker/config.cpp \
worker/echo.cpp \
@@ -40,5 +32,5 @@ obworker_SOURCES = \
worker/service/protocol.cpp \
worker/service/transaction_pool.cpp
-obworker_LDADD = -lobelisk ${libbitcoin_LIBS} ${libzmq_LIBS} ${libconfigxx_LIBS}
+obworker_LDADD = -lobelisk ${libbitcoin_LIBS} ${libzmq_LIBS} ${libsodium_LIBS} ${libczmq_LIBS} ${libczmqpp_LIBS} ${libconfigxx_LIBS}
diff --git a/src/balancer/Makefile b/src/balancer/Makefile
deleted file mode 100644
index e3faa7b..0000000
--- a/src/balancer/Makefile
+++ /dev/null
@@ -1,10 +0,0 @@
-default: obbalancer
-
-config.o:
- g++ -c config.cpp -o config.o $(shell pkg-config --cflags libbitcoin)
-balancer.o:
- g++ -c balancer.cpp -o balancer.o $(shell pkg-config --cflags libbitcoin)
-
-obbalancer: config.o balancer.o
- g++ balancer.o config.o -lzmq -lconfig++ -o obbalancer $(shell pkg-config --libs libbitcoin)
-
diff --git a/src/balancer/balancer.cfg b/src/balancer/balancer.cfg
deleted file mode 100644
index a4dc28e..0000000
--- a/src/balancer/balancer.cfg
+++ /dev/null
@@ -1,5 +0,0 @@
-# Port which clients will connect on.
-frontend = "tcp://*:9091"
-# Port which backend workers will connect to.
-backend = "tcp://*:9092"
-
diff --git a/src/balancer/balancer.cpp b/src/balancer/balancer.cpp
deleted file mode 100644
index cd93d26..0000000
--- a/src/balancer/balancer.cpp
+++ /dev/null
@@ -1,322 +0,0 @@
-#include <vector>
-#include <boost/filesystem.hpp>
-#include <bitcoin/bitcoin.hpp>
-#include <obelisk/zmq_message.hpp>
-#include "config.hpp"
-
-#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
-#define HEARTBEAT_INTERVAL 1000 // msecs
-
-#define LOG_BALANCER "balancer"
-
-#if ZMQ_VERSION_MAJOR == 4
- constexpr size_t zmq_uuid_size = 5;
-#else
- constexpr size_t zmq_uuid_size = 17;
-#endif
-
-using namespace bc;
-using namespace obelisk;
-
-typedef data_chunk worker_uuid;
-
-static void s_version_assert(int want_major, int want_minor)
-{
- int major, minor, patch;
- zmq_version (&major, &minor, &patch);
- if (major < want_major
- || (major == want_major && minor < want_minor)) {
- std::cout << "Current 0MQ version is " << major << "." << minor << std::endl;
- std::cout << "Application needs at least " << want_major << "." << want_minor
- << " - cannot continue" << std::endl;
- exit (EXIT_FAILURE);
- }
-}
-
-// Return current system clock as milliseconds
-static int64_t s_clock(void)
-{
-#if (defined (__WINDOWS__))
- SYSTEMTIME st;
- GetSystemTime (&st);
- return (int64_t) st.wSecond * 1000 + st.wMilliseconds;
-#else
- struct timeval tv;
- gettimeofday (&tv, NULL);
- return (int64_t) (tv.tv_sec * 1000 + tv.tv_usec / 1000);
-#endif
-}
-
-// This defines one active worker in our worker queue
-
-typedef struct {
- worker_uuid identity; // Address of worker
- int64_t expiry; // Expires at this time
-} worker_t;
-
-// Insert worker at end of queue, reset expiry
-// Worker must not already be in queue
-void s_worker_append(std::vector<worker_t>& queue, const worker_uuid& identity)
-{
- bool found = false;
- for (auto it = queue.begin(); it < queue.end(); ++it)
- {
- if (it->identity == identity)
- {
- std::cout << "E: duplicate worker identity " << identity << std::endl;
- found = true;
- break;
- }
- }
- if (!found)
- {
- worker_t worker;
- worker.identity = identity;
- worker.expiry = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
- queue.push_back(worker);
- }
-}
-
-// Remove worker from queue, if present
-void s_worker_delete(std::vector<worker_t>& queue, const worker_uuid& identity)
-{
- for (auto it = queue.begin(); it < queue.end(); ++it)
- {
- if (it->identity == identity)
- {
- it = queue.erase(it);
- break;
- }
- }
-}
-
-// Reset worker expiry, worker must be present
-void s_worker_refresh(std::vector<worker_t>& queue, const worker_uuid& identity)
-{
- bool found = false;
- for (auto it = queue.begin(); it < queue.end(); ++it)
- {
- if (it->identity == identity)
- {
- it->expiry = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
- found = true;
- break;
- }
- }
- if (!found)
- {
- std::cout << "E: worker " << identity << " not ready" << std::endl;
- }
-}
-
-// Pop next available worker off queue, return identity
-const worker_uuid s_worker_dequeue(std::vector<worker_t>& queue)
-{
- assert(queue.size());
- const worker_uuid identity = queue[0].identity;
- queue.erase(queue.begin());
- return identity;
-}
-
-// Look for & kill expired workers
-void s_queue_purge(std::vector<worker_t>& queue)
-{
- int64_t clock = s_clock();
- for (auto it = queue.begin(); it < queue.end(); ++it)
- {
- if (clock > it->expiry)
- {
- it = queue.erase(it) - 1;
- }
- }
-}
-
-typedef std::vector<worker_t> worker_queue;
-
-void forward_request(zmq::socket_t& frontend, zmq::socket_t& backend,
- worker_queue& queue)
-{
- // Get client request.
- zmq_message msg_in;
- msg_in.recv(frontend);
- const data_stack& in_parts = msg_in.parts();
-
- if (in_parts.size() != 6)
- {
- log_warning(LOG_BALANCER) << "Wrong sized message";
- return;
- }
- // First item is client's identity.
- if (in_parts[0].size() != zmq_uuid_size)
- {
- log_warning(LOG_BALANCER) << "Client UUID malformed " << in_parts[0];
- return;
- }
- // Second item is worker identity or nothing.
- if (in_parts[1].size() > 255)
- {
- log_warning(LOG_BALANCER) << "Worker UUID malformed";
- return;
- }
-
- // We now deconstruct the request message to the backend
- // which looks like:
- // [CLIENT UUID]
- // [WORKER UUID]
- // ...
- // And create a new message that looks like:
- // [WORKER UUID]
- // [CLIENT UUID]
- // ...
- // Before sending it to the worker.
-
- // This is so the client can specify a worker to send their
- // request specifically to.
-
- zmq_message msg_out;
- // If second frame is nothing, then we select a random worker.
- if (in_parts[1].empty())
- {
- // Route to next worker
- worker_uuid worker_identity = s_worker_dequeue(queue);
- msg_out.append(worker_identity);
- }
- else
- {
- // Route to client's preferred worker.
- msg_out.append(in_parts[1]);
- }
- msg_out.append(in_parts[0]);
- // Copy the remaining data.
- for (auto it = in_parts.begin() + 2; it != in_parts.end(); ++it)
- msg_out.append(*it);
- msg_out.send(backend);
-}
-
-void handle_control_message(const data_stack& in_parts,
- worker_queue& queue, const worker_uuid& identity)
-{
- std::string command(in_parts[1].begin(), in_parts[1].end());
- log_info(LOG_BALANCER) << "command: " << command;
- if (command == "READY")
- {
- s_worker_delete(queue, identity);
- s_worker_append(queue, identity);
- }
- else if (command == "HEARTBEAT")
- {
- s_worker_refresh(queue, identity);
- }
- else
- log_error(LOG_BALANCER)
- << "Invalid command from " << identity;
-}
-
-void passback_response(zmq::socket_t& backend, zmq::socket_t& frontend,
- worker_queue& queue)
-{
- zmq_message msg_in;
- msg_in.recv(backend);
- const data_stack& in_parts = msg_in.parts();
- BITCOIN_ASSERT(in_parts.size() == 2 || in_parts.size() == 6);
- worker_uuid identity = in_parts[0];
-
- // Return reply to client if it's not a control message
- if (in_parts.size() == 2)
- return handle_control_message(in_parts, queue, identity);
-
- // We now deconstruct the request message to the frontend
- // which looks like:
- // [WORKER UUID]
- // [CLIENT UUID]
- // ...
- // And create a new message that looks like:
- // [CLIENT UUID]
- // [WORKER UUID]
- // ...
- // Before sending it to the backend.
-
- // This is so the client will know which worker
- // responded to their request.
-
- BITCOIN_ASSERT(in_parts.size() == 6);
- zmq_message msg_out;
- BITCOIN_ASSERT(in_parts[1].size() == zmq_uuid_size);
- msg_out.append(in_parts[1]);
- BITCOIN_ASSERT(in_parts[0].size() > 0 && in_parts[0].size() < 256);
- msg_out.append(in_parts[0]);
- for (auto it = in_parts.begin() + 2; it != in_parts.end(); ++it)
- msg_out.append(*it);
- BITCOIN_ASSERT(in_parts.size() == msg_out.parts().size());
- msg_out.send(frontend);
- // Add worker back to available pool of workers.
- s_worker_append(queue, identity);
-}
-
-int main(int argc, char** argv)
-{
- s_version_assert(2, 1);
- config_map_type config;
- if (argc == 2)
- load_config(config, argv[1]);
- else
- {
- using boost::filesystem::path;
- path conf_filename = path(SYSCONFDIR) / "obelisk" / "balancer.cfg";
- load_config(config, conf_filename.native());
- }
-
- // Prepare our context and sockets
- zmq::context_t context(1);
- zmq::socket_t frontend(context, ZMQ_ROUTER);
- zmq::socket_t backend(context, ZMQ_ROUTER);
- // For clients
- frontend.bind(config["frontend"].c_str());
- // For workers
- backend.bind(config["backend"].c_str());
-
- // Queue of available workers
- worker_queue queue;
- // Send out heartbeats at regular intervals
- int64_t heartbeat_at = s_clock() + HEARTBEAT_INTERVAL;
-
- while (1)
- {
- zmq::pollitem_t items[] = {
- { backend, 0, ZMQ_POLLIN, 0 },
- { frontend, 0, ZMQ_POLLIN, 0 }
- };
-
- // Poll frontend only if we have available workers
- if (queue.empty())
- zmq::poll(items, 1, HEARTBEAT_INTERVAL * 1000);
- else
- zmq::poll(items, 2, HEARTBEAT_INTERVAL * 1000);
-
- // Handle worker activity on backend
- if (items [0].revents & ZMQ_POLLIN)
- passback_response(backend, frontend, queue);
- if (items [1].revents & ZMQ_POLLIN)
- forward_request(frontend, backend, queue);
-
- // Send heartbeats to idle workers if it's time
- if (s_clock() > heartbeat_at)
- {
- for (auto it = queue.begin(); it < queue.end(); ++it)
- {
- zmq_message msg;
- msg.append(it->identity);
- std::string command = "HEARTBEAT";
- msg.append(data_chunk(command.begin(), command.end()));
- msg.send(backend);
- }
- heartbeat_at = s_clock() + HEARTBEAT_INTERVAL;
- }
- s_queue_purge(queue);
- }
- // We never exit the main loop
- // But pretend to do the right shutdown anyhow
- queue.clear();
- return 0;
-}
-
diff --git a/src/balancer/config.cpp b/src/balancer/config.cpp
deleted file mode 100644
index 201cb69..0000000
--- a/src/balancer/config.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-#include "config.hpp"
-
-#include <boost/lexical_cast.hpp>
-#include <libconfig.h++>
-
-namespace obelisk {
-
-template <typename T>
-void get_value(const libconfig::Setting& root, config_map_type& config,
- const std::string& key_name, const T& fallback_value)
-{
- T value;
- if (root.lookupValue(key_name, value))
- config[key_name] = boost::lexical_cast<std::string>(value);
- else
- config[key_name] = boost::lexical_cast<std::string>(fallback_value);
-}
-
-void load_config(config_map_type& config, const std::string& filename)
-{
- std::cout << "Using config file: " << filename << std::endl;
- libconfig::Config cfg;
- // Ignore error if unable to read config file.
- try
- {
- cfg.readFile(filename.c_str());
- }
- catch (const libconfig::FileIOException&) {}
- catch (const libconfig::ParseException&) {}
- // Read off values
- const libconfig::Setting& root = cfg.getRoot();
- get_value<std::string>(root, config, "frontend", "tcp://*:9091");
- get_value<std::string>(root, config, "backend", "tcp://*:9092");
-}
-
-} // namespace obelisk
-
diff --git a/src/balancer/config.hpp b/src/balancer/config.hpp
deleted file mode 100644
index b9f719e..0000000
--- a/src/balancer/config.hpp
+++ /dev/null
@@ -1,15 +0,0 @@
-#ifndef OBELISK_WORKER_CONFIG_HPP
-#define OBELISK_WORKER_CONFIG_HPP
-
-#include <map>
-#include <string>
-
-namespace obelisk {
-
-typedef std::map<std::string, std::string> config_map_type;
-void load_config(config_map_type& config, const std::string& config_path);
-
-} // namespace obelisk
-
-#endif
-
diff --git a/src/client/backend.cpp b/src/client/backend.cpp
index 6dd9548..8292c10 100644
--- a/src/client/backend.cpp
+++ b/src/client/backend.cpp
@@ -14,13 +14,26 @@ constexpr size_t request_retries = 3;
const posix_time::time_duration request_timeout_init = seconds(30);
backend_cluster::backend_cluster(threadpool& pool,
- zmq::context_t& context, const std::string& connection)
+ czmqpp::context& context, const std::string& connection,
+ const std::string& cert_filename, const std::string& server_pubkey)
: context_(context), socket_(context_, ZMQ_DEALER), strand_(pool)
{
- socket_.connect(connection.c_str());
+ BITCOIN_ASSERT(socket_.self());
+ if (!server_pubkey.empty())
+ enable_crypto(cert_filename, server_pubkey);
+ // Connect
+ socket_.connect(connection);
// Configure socket to not wait at close time.
- int linger = 0;
- socket_.setsockopt(ZMQ_LINGER, &linger, sizeof (linger));
+ socket_.set_linger(0);
+}
+
+void backend_cluster::enable_crypto(
+ const std::string& cert_filename, const std::string& server_pubkey)
+{
+ cert_.reset(czmqpp::load_cert(cert_filename));
+ BITCOIN_ASSERT(cert_.self());
+ cert_.apply(socket_);
+ socket_.set_curve_serverkey(server_pubkey);
}
void backend_cluster::request(
@@ -40,17 +53,18 @@ void backend_cluster::request(
}
void backend_cluster::send(const outgoing_message& message)
{
+ BITCOIN_ASSERT(socket_.self());
message.send(socket_);
}
void backend_cluster::update()
{
// Poll socket for a reply, with timeout
- zmq::pollitem_t items[] = { { socket_, 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, 0);
+ czmqpp::poller poller(socket_);
+ BITCOIN_ASSERT(poller.self());
+ czmqpp::socket which = poller.wait(0);
// If we got a reply, process it
- if (items[0].revents & ZMQ_POLLIN)
- receive_incoming();
+ receive_incoming();
// Finally resend any expired requests that we didn't get
// a response to yet.
strand_.randomly_queue(
diff --git a/src/client/interface/blockchain.cpp b/src/client/interface/blockchain.cpp
index 1504cce..7c390ff 100644
--- a/src/client/interface/blockchain.cpp
+++ b/src/client/interface/blockchain.cpp
@@ -115,5 +115,50 @@ void wrap_fetch_transaction_index(const data_chunk& data,
handle_fetch(ec, block_height, index);
}
+void wrap_fetch_stealth(const data_chunk& data,
+ blockchain::fetch_handler_stealth handle_fetch);
+void blockchain_interface::fetch_stealth(const stealth_prefix& prefix,
+ blockchain::fetch_handler_stealth handle_fetch,
+ size_t from_height)
+{
+ data_chunk data(9);
+ auto serial = make_serializer(data.begin());
+ serial.write_byte(prefix.number_bits);
+ serial.write_4_bytes(prefix.bitfield);
+ serial.write_4_bytes(from_height);
+ BITCOIN_ASSERT(serial.iterator() == data.end());
+ backend_.request("blockchain.fetch_stealth", data,
+ std::bind(wrap_fetch_stealth, _1, handle_fetch));
+}
+void wrap_fetch_stealth(const data_chunk& data,
+ blockchain::fetch_handler_stealth handle_fetch)
+{
+ BITCOIN_ASSERT(data.size() >= 4);
+ std::error_code ec;
+ auto deserial = make_deserializer(data.begin(), data.end());
+ if (!read_error_code(deserial, data.size(), ec))
+ return;
+ BITCOIN_ASSERT(deserial.iterator() == data.begin() + 4);
+ size_t row_size = 33 + 21 + 32;
+ if ((data.size() - 4) % row_size != 0)
+ {
+ log_error() << "Malformed response for blockchain.fetch_stealth";
+ return;
+ }
+ size_t number_rows = (data.size() - 4) / row_size;
+ blockchain::stealth_list stealth_results(number_rows);
+ for (size_t i = 0; i < stealth_results.size(); ++i)
+ {
+ blockchain::stealth_row& row = stealth_results[i];
+ row.ephemkey = deserial.read_data(33);
+ uint8_t address_version = deserial.read_byte();
+ const short_hash address_hash = deserial.read_short_hash();
+ row.address.set(address_version, address_hash);
+ row.transaction_hash = deserial.read_hash();
+ }
+ BITCOIN_ASSERT(deserial.iterator() == data.end());
+ handle_fetch(ec, stealth_results);
+}
+
} // namespace obelisk
diff --git a/src/client/interface/fetch_x.cpp b/src/client/interface/fetch_x.cpp
index 95f4576..8079098 100644
--- a/src/client/interface/fetch_x.cpp
+++ b/src/client/interface/fetch_x.cpp
@@ -20,13 +20,13 @@ void wrap_fetch_history_args(data_chunk& data,
void receive_history_result(const data_chunk& data,
blockchain::fetch_handler_history handle_fetch)
{
+ BITCOIN_ASSERT(data.size() >= 4);
std::error_code ec;
auto deserial = make_deserializer(data.begin(), data.end());
if (!read_error_code(deserial, data.size(), ec))
return;
BITCOIN_ASSERT(deserial.iterator() == data.begin() + 4);
size_t row_size = 36 + 4 + 8 + 36 + 4;
- BITCOIN_ASSERT(data.size() >= 4);
if ((data.size() - 4) % row_size != 0)
{
log_error() << "Malformed response for *.fetch_history";
diff --git a/src/client/interface/interface.cpp b/src/client/interface/interface.cpp
index e93c3f9..457eafd 100644
--- a/src/client/interface/interface.cpp
+++ b/src/client/interface/interface.cpp
@@ -1,7 +1,6 @@
#include <obelisk/client/interface.hpp>
#include <bitcoin/bitcoin.hpp>
-#include <obelisk/zmq_message.hpp>
#include "fetch_x.hpp"
#include "util.hpp"
@@ -18,28 +17,21 @@ using posix_time::second_clock;
const posix_time::time_duration sub_renew = minutes(2);
-subscriber_part::subscriber_part(zmq::context_t& context)
- : context_(context)
+subscriber_part::subscriber_part(czmqpp::context& context)
+ : socket_block_(context, ZMQ_SUB), socket_tx_(context, ZMQ_SUB)
{
}
-bool subscriber_part::setup_socket(const std::string& connection,
- zmq_socket_uniqptr& socket)
+bool subscriber_part::setup_socket(
+ const std::string& connection, czmqpp::socket socket)
{
- socket.reset(new zmq::socket_t(context_, ZMQ_SUB));
- try
+ if (!socket.connect(connection))
{
- socket->connect(connection.c_str());
- socket->setsockopt(ZMQ_SUBSCRIBE, 0, 0);
- }
- catch (zmq::error_t error)
- {
- socket.reset();
log_warning(LOG_SUBSCRIBER)
<< "Subscriber failed to connect: " << connection;
- BITCOIN_ASSERT(error.num() != 0);
return false;
}
+ socket.set_subscribe("");
return true;
}
@@ -63,21 +55,17 @@ bool subscriber_part::subscribe_transactions(const std::string& connection,
void subscriber_part::update()
{
+ czmqpp::poller poller;
+ if (socket_tx_.self())
+ poller.add(socket_tx_);
+ if (socket_block_.self())
+ poller.add(socket_block_);
+ czmqpp::socket which = poller.wait(0);
// Poll socket for a reply, with timeout
- if (socket_tx_)
- {
- zmq::pollitem_t items[] = { { *socket_tx_, 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, 0);
- if (items[0].revents & ZMQ_POLLIN)
- recv_tx();
- }
- if (socket_block_)
- {
- zmq::pollitem_t items[] = { { *socket_block_, 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, 0);
- if (items[1].revents & ZMQ_POLLIN)
- recv_block();
- }
+ if (socket_tx_.self() && which == socket_tx_)
+ recv_tx();
+ if (socket_block_.self() && which == socket_block_)
+ recv_block();
}
bool read_hash(hash_digest& hash, const data_chunk& raw_hash)
@@ -93,8 +81,8 @@ bool read_hash(hash_digest& hash, const data_chunk& raw_hash)
void subscriber_part::recv_tx()
{
- zmq_message message;
- bool success = message.recv(*socket_tx_);
+ czmqpp::message message;
+ bool success = message.receive(socket_tx_);
BITCOIN_ASSERT(success);
// [ tx hash ]
// [ raw tx ]
@@ -122,8 +110,8 @@ void subscriber_part::recv_tx()
void subscriber_part::recv_block()
{
- zmq_message message;
- bool success = message.recv(*socket_block_);
+ czmqpp::message message;
+ bool success = message.receive(socket_block_);
BITCOIN_ASSERT(success);
// [ block hash ]
// [ height ]
@@ -277,8 +265,9 @@ void address_subscriber::fetch_history(const payment_address& address,
}
fullnode_interface::fullnode_interface(
- threadpool& pool, const std::string& connection)
- : context_(1), backend_(pool, context_, connection),
+ threadpool& pool, const std::string& connection,
+ const std::string& cert_filename, const std::string& server_pubkey)
+ : backend_(pool, context_, connection, cert_filename, server_pubkey),
blockchain(backend_), transaction_pool(backend_),
protocol(backend_), address(pool, backend_),
subscriber_(context_)
diff --git a/src/message.cpp b/src/message.cpp
index c92fbf9..e722e38 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -4,31 +4,25 @@
#include <bitcoin/format.hpp>
#include <bitcoin/utility/assert.hpp>
#include <bitcoin/utility/sha256.hpp>
-#include <obelisk/zmq_message.hpp>
namespace obelisk {
using namespace bc;
-constexpr uint32_t control_id = std::numeric_limits<uint32_t>::max();
-
-bool incoming_message::recv(zmq::socket_t& socket)
+bool incoming_message::recv(czmqpp::socket& socket)
{
- zmq_message message;
- message.recv(socket);
+ czmqpp::message message;
+ message.receive(socket);
const data_stack& parts = message.parts();
- if (parts.size() == 1)
- {
- id_ = control_id;
- command_ = std::string(parts[0].begin(), parts[0].end());
- return true;
- }
- else if (parts.size() != 5)
+ if (parts.size() != 3 && parts.size() != 4)
return false;
auto it = parts.begin();
- // [ DESTINATION ]
- origin_ = *it;
- ++it;
+ // [ DESTINATION ] (optional - ROUTER sockets strip this)
+ if (parts.size() == 4)
+ {
+ origin_ = *it;
+ ++it;
+ }
// [ COMMAND ]
const data_chunk& raw_command = *it;
command_ = std::string(raw_command.begin(), raw_command.end());
@@ -42,21 +36,10 @@ bool incoming_message::recv(zmq::socket_t& socket)
// [ DATA ]
data_ = *it;
++it;
- // [ CHECKSUM ]
- const data_chunk& raw_checksum = *it;
- uint32_t checksum = cast_chunk<uint32_t>(raw_checksum);
- if (checksum != generate_sha256_checksum(data_))
- return false;
- ++it;
BITCOIN_ASSERT(it == parts.end());
return true;
}
-bool incoming_message::is_signal() const
-{
- return id_ == std::numeric_limits<uint32_t>::max();
-}
-
const bc::data_chunk incoming_message::origin() const
{
return origin_;
@@ -93,28 +76,17 @@ outgoing_message::outgoing_message(
{
}
-outgoing_message::outgoing_message(const std::string& command)
- : id_(control_id), command_(command)
-{
-}
-
-void append_str(zmq_message& message, const std::string& command)
+void append_str(czmqpp::message& message, const std::string& command)
{
message.append(data_chunk(command.begin(), command.end()));
}
-void outgoing_message::send(zmq::socket_t& socket) const
+void outgoing_message::send(czmqpp::socket& socket) const
{
- zmq_message message;
- if (id_ == control_id)
- {
- // Control message. Don't bother with other fields.
- append_str(message, command_);
- message.send(socket);
- return;
- }
- // [ DEST ]
- message.append(dest_);
+ czmqpp::message message;
+ // [ DESTINATION ] (optional - ROUTER sockets strip this)
+ if (!dest_.empty())
+ message.append(dest_);
// [ COMMAND ]
append_str(message, command_);
// [ ID ]
@@ -123,10 +95,6 @@ void outgoing_message::send(zmq::socket_t& socket) const
message.append(raw_id);
// [ DATA ]
message.append(data_);
- // [ CHECKSUM ]
- data_chunk raw_checksum = uncast_type(generate_sha256_checksum(data_));
- BITCOIN_ASSERT(raw_checksum.size() == 4);
- message.append(raw_checksum);
// Send.
message.send(socket);
}
diff --git a/src/worker/config.cpp b/src/worker/config.cpp
index b6ca66a..4cf2285 100644
--- a/src/worker/config.cpp
+++ b/src/worker/config.cpp
@@ -27,6 +27,25 @@ void load_nodes(const libconfig::Setting& root, config_type& config)
catch (const libconfig::SettingNotFoundException&) {}
}
+void load_whitelist(const libconfig::Setting& root, config_type& config)
+{
+ try
+ {
+ const libconfig::Setting& setting = root["whitelist"];
+ for (size_t i = 0; i < setting.getLength(); ++i)
+ {
+ std::string address = (const char*)setting[i];
+ config.whitelist.push_back(address);
+ }
+ }
+ catch (const libconfig::SettingTypeException)
+ {
+ std::cerr << "Incorrectly formed whitelist setting in config."
+ << std::endl;
+ }
+ catch (const libconfig::SettingNotFoundException&) {}
+}
+
void load_config(config_type& config, const std::string& filename)
{
// Load values from config file.
@@ -46,9 +65,13 @@ void load_config(config_type& config, const std::string& filename)
root.lookupValue("blockchain-path", config.blockchain_path);
root.lookupValue("hosts-file", config.hosts_file);
root.lookupValue("service", config.service);
+ root.lookupValue("heartbeat", config.heartbeat);
root.lookupValue("publisher_enabled", config.publisher_enabled);
root.lookupValue("block-publish", config.block_publish);
root.lookupValue("tx-publish", config.tx_publish);
+ root.lookupValue("certificate", config.certificate);
+ root.lookupValue("client-allowed-certs", config.client_allowed_certs);
+ load_whitelist(root, config);
root.lookupValue("name", config.name);
root.lookupValue("outgoing-connections", config.outgoing_connections);
root.lookupValue("listener_enabled", config.listener_enabled);
diff --git a/src/worker/config.hpp b/src/worker/config.hpp
index c3cc99c..2cc61a3 100644
--- a/src/worker/config.hpp
+++ b/src/worker/config.hpp
@@ -16,15 +16,20 @@ struct node_config_object
struct config_type
{
typedef std::vector<node_config_object> nodes_list;
+ typedef std::vector<std::string> ipaddress_list;
std::string output_file = "debug.log";
std::string error_file = "error.log";
std::string blockchain_path = "blockchain/";
std::string hosts_file = "hosts";
- std::string service = "tcp://localhost:9092";
+ std::string service = "tcp://*:9091";
+ std::string heartbeat = "tcp://*:9092";
bool publisher_enabled = false;
std::string block_publish;
std::string tx_publish;
+ std::string certificate = "";
+ ipaddress_list whitelist;
+ std::string client_allowed_certs = "ALLOW_ALL_CERTS";
std::string name;
unsigned int outgoing_connections = 8;
bool listener_enabled = true;
diff --git a/src/worker/main.cpp b/src/worker/main.cpp
index b83f22f..9f93a82 100644
--- a/src/worker/main.cpp
+++ b/src/worker/main.cpp
@@ -16,11 +16,11 @@ using namespace obelisk;
using std::placeholders::_1;
using std::placeholders::_2;
+bool stopped = false;
void interrupt_handler(int)
{
echo() << "Stopping... Please wait.";
- // ZeroMQ will catch this signal and propagate it
- // as an exception in the main runloop below.
+ stopped = true;
}
int main(int argc, char** argv)
@@ -44,7 +44,11 @@ int main(int argc, char** argv)
publisher publish(node);
if (config.publisher_enabled)
if (!publish.start(config))
+ {
+ std::cerr << "Failed to start publisher: "
+ << zmq_strerror(zmq_errno()) << std::endl;
return 1;
+ }
// Address subscriptions
subscribe_manager addr_sub(node);
// Attach commands
@@ -71,6 +75,7 @@ int main(int argc, char** argv)
blockchain_fetch_transaction_index);
attach("blockchain.fetch_spend", blockchain_fetch_spend);
attach("blockchain.fetch_block_height", blockchain_fetch_block_height);
+ attach("blockchain.fetch_stealth", blockchain_fetch_stealth);
attach("protocol.broadcast_transaction", protocol_broadcast_transaction);
attach("transaction_pool.validate", transaction_pool_validate);
attach("transaction_pool.fetch_transaction",
@@ -78,33 +83,21 @@ int main(int argc, char** argv)
// Start the node last so that all subscriptions to new blocks
// don't miss anything.
if (!node.start(config))
+ {
+ std::cerr << "Failed to start Bitcoin node." << std::endl;
return 1;
+ }
echo() << "Node started.";
signal(SIGINT, interrupt_handler);
- while (true)
- {
- try
- {
- worker.update();
- }
- catch (zmq::error_t error)
- {
- // SIGINT caught.
- if (error.num() == EINTR)
- break;
- else
- {
- log_error(LOG_WORKER) << "ZMQ: " << error.what();
- echo() << "Closing down because of error.";
- throw;
- }
- }
- }
+ // Main loop.
+ while (!stopped)
+ worker.update();
worker.stop();
if (config.publisher_enabled)
publish.stop();
if (!node.stop())
return -1;
+ echo() << "Node shutdown cleanly.";
return 0;
}
diff --git a/src/worker/node_impl.cpp b/src/worker/node_impl.cpp
index c140fe5..5670348 100644
--- a/src/worker/node_impl.cpp
+++ b/src/worker/node_impl.cpp
@@ -253,7 +253,7 @@ void node_impl::handle_mempool_store(
if (ec)
{
log_warning()
- << "Error storing memory pool transaction "
+ << "Failed to store transaction in mempool "
<< hash_transaction(tx) << ": " << ec.message();
return;
}
diff --git a/src/worker/publisher.cpp b/src/worker/publisher.cpp
index 99048a7..02ac145 100644
--- a/src/worker/publisher.cpp
+++ b/src/worker/publisher.cpp
@@ -1,6 +1,5 @@
#include "publisher.hpp"
-#include <obelisk/zmq_message.hpp>
#include "echo.hpp"
#define LOG_PUBLISHER LOG_WORKER
@@ -12,20 +11,18 @@ using std::placeholders::_1;
using std::placeholders::_2;
publisher::publisher(node_impl& node)
- : context_(1), node_(node)
+ : node_(node),
+ socket_block_(context_, ZMQ_PUB),
+ socket_tx_(context_, ZMQ_PUB)
{
}
-bool publisher::setup_socket(const std::string& connection,
- zmq_socket_uniqptr& socket)
+bool publisher::setup_socket(
+ const std::string& connection, czmqpp::socket& socket)
{
- if (connection != "")
- {
- socket.reset(new zmq::socket_t(context_, ZMQ_PUB));
- socket->bind(connection.c_str());
+ if (connection.empty())
return true;
- }
- return false;
+ return socket.bind(connection) != -1;
}
bool publisher::start(config_type& config)
@@ -48,15 +45,7 @@ bool publisher::stop()
return true;
}
-bool send_raw(const bc::data_chunk& raw,
- zmq::socket_t& socket, bool send_more=false)
-{
- zmq::message_t message(raw.size());
- memcpy(message.data(), raw.data(), raw.size());
- return socket.send(message, send_more ? ZMQ_SNDMORE : 0);
-}
-
-void append_hash(zmq_message& message, const hash_digest& hash)
+void append_hash(czmqpp::message& message, const hash_digest& hash)
{
message.append(data_chunk(hash.begin(), hash.end()));
}
@@ -74,7 +63,7 @@ bool publisher::send_blk(uint32_t height, const block_type& blk)
// hash [32 bytes]
// txs size [4 bytes]
// ... txs ...
- zmq_message message;
+ czmqpp::message message;
message.append(raw_height);
append_hash(message, hash_block_header(blk.header));
message.append(raw_blk_header);
@@ -85,7 +74,7 @@ bool publisher::send_blk(uint32_t height, const block_type& blk)
for (const bc::transaction_type& tx: blk.transactions)
append_hash(message, hash_transaction(tx));
// Finished. Send message.
- if (!message.send(*socket_block_))
+ if (!message.send(socket_block_))
{
log_warning(LOG_PUBLISHER) << "Problem publishing block data.";
return false;
@@ -97,10 +86,10 @@ bool publisher::send_tx(const transaction_type& tx)
{
data_chunk raw_tx(bc::satoshi_raw_size(tx));
satoshi_save(tx, raw_tx.begin());
- zmq_message message;
+ czmqpp::message message;
append_hash(message, hash_transaction(tx));
message.append(raw_tx);
- if (!message.send(*socket_tx_))
+ if (!message.send(socket_tx_))
{
log_warning(LOG_PUBLISHER) << "Problem publishing tx data.";
return false;
diff --git a/src/worker/publisher.hpp b/src/worker/publisher.hpp
index 7236b6e..ca18d45 100644
--- a/src/worker/publisher.hpp
+++ b/src/worker/publisher.hpp
@@ -1,7 +1,7 @@
#ifndef QUERY_PUBLISHER_HPP
#define QUERY_PUBLISHER_HPP
-#include <zmq.hpp>
+#include <czmq++/czmq.hpp>
#include <bitcoin/bitcoin.hpp>
#include "config.hpp"
@@ -17,17 +17,15 @@ public:
bool stop();
private:
- typedef std::unique_ptr<zmq::socket_t> zmq_socket_uniqptr;
-
- bool setup_socket(const std::string& connection,
- zmq_socket_uniqptr& socket);
+ bool setup_socket(
+ const std::string& connection, czmqpp::socket& socket);
bool send_blk(uint32_t height, const bc::block_type& blk);
bool send_tx(const bc::transaction_type& tx);
- zmq::context_t context_;
node_impl& node_;
- zmq_socket_uniqptr socket_block_, socket_tx_;
+ czmqpp::context context_;
+ czmqpp::socket socket_block_, socket_tx_;
};
} // namespace obelisk
diff --git a/src/worker/service/blockchain.cpp b/src/worker/service/blockchain.cpp
index 79598ff..597d97c 100644
--- a/src/worker/service/blockchain.cpp
+++ b/src/worker/service/blockchain.cpp
@@ -292,5 +292,49 @@ void block_height_fetched(const std::error_code& ec, size_t block_height,
queue_send(response);
}
+void stealth_fetched(const std::error_code& ec,
+ const blockchain::stealth_list& stealth_results,
+ const incoming_message& request, queue_send_callback queue_send);
+void blockchain_fetch_stealth(node_impl& node,
+ const incoming_message& request, queue_send_callback queue_send)
+{
+ const data_chunk& data = request.data();
+ if (data.size() != 9)
+ {
+ log_error(LOG_WORKER)
+ << "Incorrect data size for blockchain.fetch_stealth";
+ return;
+ }
+ auto deserial = make_deserializer(data.begin(), data.end());
+ stealth_prefix prefix;
+ prefix.number_bits = deserial.read_byte();
+ prefix.bitfield = deserial.read_4_bytes();
+ size_t from_height = deserial.read_4_bytes();
+ node.blockchain().fetch_stealth(prefix,
+ std::bind(stealth_fetched, _1, _2, request, queue_send), from_height);
+}
+void stealth_fetched(const std::error_code& ec,
+ const blockchain::stealth_list& stealth_results,
+ const incoming_message& request, queue_send_callback queue_send)
+{
+ // [ ephemkey:33 ][ address:21 ][ tx_hash:32 ]
+ constexpr size_t row_size = 33 + 21 + 32;
+ data_chunk result(4 + row_size * stealth_results.size());
+ auto serial = make_serializer(result.begin());
+ write_error_code(serial, ec);
+ BITCOIN_ASSERT(serial.iterator() == result.begin() + 4);
+ for (const blockchain::stealth_row row: stealth_results)
+ {
+ serial.write_data(row.ephemkey);
+ serial.write_byte(row.address.version());
+ serial.write_short_hash(row.address.hash());
+ serial.write_hash(row.transaction_hash);
+ }
+ log_debug(LOG_REQUEST)
+ << "blockchain.fetch_stealth() finished. Sending response.";
+ outgoing_message response(request, result);
+ queue_send(response);
+}
+
} // namespace obelisk
diff --git a/src/worker/service/blockchain.hpp b/src/worker/service/blockchain.hpp
index 2559b32..c5453ca 100644
--- a/src/worker/service/blockchain.hpp
+++ b/src/worker/service/blockchain.hpp
@@ -31,6 +31,9 @@ void blockchain_fetch_spend(node_impl& node,
void blockchain_fetch_block_height(node_impl& node,
const incoming_message& request, queue_send_callback queue_send);
+void blockchain_fetch_stealth(node_impl& node,
+ const incoming_message& request, queue_send_callback queue_send);
+
} // namespace obelisk
#endif
diff --git a/src/worker/service/fetch_x.cpp b/src/worker/service/fetch_x.cpp
index 6dff974..d733bac 100644
--- a/src/worker/service/fetch_x.cpp
+++ b/src/worker/service/fetch_x.cpp
@@ -32,7 +32,7 @@ bool send_history_result(const std::error_code& ec,
const blockchain::history_list& history,
const incoming_message& request, queue_send_callback queue_send)
{
- size_t row_size = 36 + 4 + 8 + 36 + 4;
+ constexpr size_t row_size = 36 + 4 + 8 + 36 + 4;
data_chunk result(4 + row_size * history.size());
auto serial = make_serializer(result.begin());
write_error_code(serial, ec);
diff --git a/src/worker/worker.cfg b/src/worker/worker.cfg
index e77cf49..478e62c 100644
--- a/src/worker/worker.cfg
+++ b/src/worker/worker.cfg
@@ -12,11 +12,24 @@ hosts-file = "hosts"
publisher_enabled = false
block-publish = "tcp://*:9093"
tx-publish = "tcp://*:9094"
-# Connection to the balancer.
-# Which port to connect to the load balancer on.
-service = "tcp://localhost:9092"
+# Which port for clients to connect to.
+service = "tcp://*:9091"
+# And the port where we send our heartbeats.
+heartbeat = "tcp://*:9092"
+# ----------------------
# Advanced features:
+# ----------------------
+
+#certificate = "server.cert"
+#whitelist = (
+# "127.0.0.1"
+#)
+# Directory containing allowed client public certificates.
+# Comment out to allow all clients to connect.
+# New certs can be added without needing to restart the server.
+#client-allowed-certs = "client-certs/"
+
# Uncomment to give this worker a named UUID. Must be unique.
#name = "ada"
# Number of outgoing network connections to p2p network.
diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp
index 84998c7..19f3a2c 100644
--- a/src/worker/worker.cpp
+++ b/src/worker/worker.cpp
@@ -2,12 +2,8 @@
#include <bitcoin/format.hpp>
#include <bitcoin/utility/logger.hpp>
-#include <obelisk/zmq_message.hpp>
#include "echo.hpp"
-// Needed for the ZMQ version macros below.
-#include <zmq.h>
-
namespace obelisk {
using namespace bc;
@@ -17,81 +13,83 @@ using posix_time::milliseconds;
using posix_time::seconds;
using posix_time::microsec_clock;
-const posix_time::time_duration heartbeat_interval = milliseconds(1000);
-constexpr size_t interval_init = 4, interval_max = 32;
-
-#if ZMQ_VERSION_MAJOR >= 3
- // Milliseconds
- constexpr long poll_sleep_interval = 500;
-#elif ZMQ_VERSION_MAJOR == 2
- // Microseconds
- constexpr long poll_sleep_interval = 500000;
-#else
- #error ZMQ_VERSION_MAJOR macro undefined.
-#endif
+const posix_time::time_duration heartbeat_interval = milliseconds(4000);
+// Milliseconds
+constexpr long poll_sleep_interval = 1000;
auto now = []() { return microsec_clock::universal_time(); };
-socket_factory::socket_factory()
- : context(1)
-{
-}
-zmq_socket_ptr socket_factory::spawn_socket()
-{
- zmq_socket_ptr socket =
- std::make_shared<zmq::socket_t>(context, ZMQ_DEALER);
- // Set the socket identity name.
- if (!name.empty())
- socket->setsockopt(ZMQ_IDENTITY, name.c_str(), name.size());
- // Connect...
- socket->connect(connection.c_str());
- // Configure socket to not wait at close time
- int linger = 0;
- socket->setsockopt(ZMQ_LINGER, &linger, sizeof (linger));
- return socket;
-}
-
-send_worker::send_worker(zmq::context_t& context)
+send_worker::send_worker(czmqpp::context& context)
: context_(context)
{
}
void send_worker::queue_send(const outgoing_message& message)
{
- zmq::socket_t queue_socket(context_, ZMQ_PUSH);
+ czmqpp::socket queue_socket(context_, ZMQ_PUSH);
queue_socket.connect("inproc://trigger-send");
message.send(queue_socket);
}
request_worker::request_worker()
- : sender_(factory_.context),
- wakeup_socket_(factory_.context, ZMQ_PULL)
+ : auth_(context_),
+ sender_(context_),
+ socket_(context_, ZMQ_ROUTER),
+ wakeup_socket_(context_, ZMQ_PULL),
+ heartbeat_socket_(context_, ZMQ_PUB)
{
wakeup_socket_.bind("inproc://trigger-send");
}
bool request_worker::start(config_type& config)
{
// Load config values.
- factory_.connection = config.service;
- factory_.name = config.name;
log_requests_ = config.log_requests;
- // Start ZeroMQ socket.
- create_new_socket();
+ if (log_requests_)
+ auth_.set_verbose(true);
+ if (!config.whitelist.empty())
+ whitelist(config.whitelist);
+ if (config.certificate.empty())
+ socket_.set_zap_domain("global");
+ else
+ enable_crypto(config);
+ // Start ZeroMQ sockets.
+ create_new_socket(config);
+ log_debug(LOG_WORKER) << "Heartbeat: " << config.heartbeat;
+ heartbeat_socket_.bind(config.heartbeat);
// Timer stuff
- last_heartbeat_ = now();
heartbeat_at_ = now() + heartbeat_interval;
- interval_ = interval_init;
+ return true;
}
void request_worker::stop()
{
}
-void request_worker::create_new_socket()
+void request_worker::whitelist(config_type::ipaddress_list& addrs)
{
- log_debug(LOG_WORKER) << "Connecting: " << factory_.connection;
- socket_ = factory_.spawn_socket();
+ for (const std::string& ip_address: addrs)
+ auth_.allow(ip_address);
+}
+void request_worker::enable_crypto(config_type& config)
+{
+ if (config.client_allowed_certs == "ALLOW_ALL_CERTS")
+ auth_.configure_curve("*", CURVE_ALLOW_ANY);
+ else
+ auth_.configure_curve("*", config.client_allowed_certs);
+ cert_.reset(czmqpp::load_cert(config.certificate));
+ cert_.apply(socket_);
+ socket_.set_curve_server(1);
+}
+void request_worker::create_new_socket(config_type& config)
+{
+ log_debug(LOG_WORKER) << "Listening: " << config.service;
+ // Set the socket identity name.
+ if (!config.name.empty())
+ socket_.set_identity(config.name.c_str());
+ // Connect...
+ socket_.bind(config.service);
+ // Configure socket to not wait at close time
+ socket_.set_linger(0);
// Tell queue we're ready for work
log_info(LOG_WORKER) << "worker ready";
- send_control_message("READY");
}
void request_worker::attach(
@@ -108,87 +106,59 @@ void request_worker::update()
void request_worker::poll()
{
// Poll for network updates.
- zmq::pollitem_t items [] = {
- { *socket_, 0, ZMQ_POLLIN, 0 },
- { wakeup_socket_, 0, ZMQ_POLLIN, 0 } };
- int rc = zmq::poll(items, 2, poll_sleep_interval);
- BITCOIN_ASSERT(rc >= 0);
-
- // TODO: refactor this code.
+ czmqpp::poller poller(socket_, wakeup_socket_);
+ BITCOIN_ASSERT(poller.self());
+ czmqpp::socket which = poller.wait(poll_sleep_interval);
- if (items[0].revents & ZMQ_POLLIN)
+ BITCOIN_ASSERT(socket_.self() && wakeup_socket_.self());
+ if (which == socket_)
{
// Get message
- // - 6-part envelope + content -> request
- // - 1-part "HEARTBEAT" -> heartbeat
+ // 6-part envelope + content -> request
incoming_message request;
- request.recv(*socket_);
+ request.recv(socket_);
- if (!request.is_signal())
- {
- last_heartbeat_ = now();
- auto it = handlers_.find(request.command());
- // Perform request if found.
- if (it != handlers_.end())
- {
- if (log_requests_)
- log_debug(LOG_REQUEST)
- << request.command() << " from " << request.origin();
- it->second(request,
- std::bind(&send_worker::queue_send, &sender_, _1));
- }
- else
- {
- log_warning(LOG_WORKER)
- << "Unhandled request: " << request.command()
- << " from " << request.origin();
- }
- }
- else if (request.command() == "HEARTBEAT")
+ auto it = handlers_.find(request.command());
+ // Perform request if found.
+ if (it != handlers_.end())
{
- log_debug(LOG_WORKER) << "Received heartbeat";
- last_heartbeat_ = now();
+ if (log_requests_)
+ log_debug(LOG_REQUEST)
+ << request.command() << " from " << request.origin();
+ it->second(request,
+ std::bind(&send_worker::queue_send, &sender_, _1));
}
else
{
- log_warning(LOG_WORKER) << "invalid message";
+ log_warning(LOG_WORKER)
+ << "Unhandled request: " << request.command()
+ << " from " << request.origin();
}
- interval_ = interval_init;
}
- else if (items[1].revents & ZMQ_POLLIN)
+ else if (which == wakeup_socket_)
{
// Send queued message.
- zmq_message message;
- message.recv(wakeup_socket_);
- message.send(*socket_);
- }
- else if (now() - last_heartbeat_ > seconds(interval_))
- {
- log_warning(LOG_WORKER) << "heartbeat failure, can't reach queue";
- log_warning(LOG_WORKER) << "reconnecting in "
- << interval_ << " seconds...";
-
- if (interval_ < interval_max)
- {
- interval_ *= 2;
- }
- create_new_socket();
- last_heartbeat_ = now();
+ czmqpp::message message;
+ message.receive(wakeup_socket_);
+ message.send(socket_);
}
- // Send heartbeat to queue if it's time
+ // Publish heartbeat.
if (now() > heartbeat_at_)
{
heartbeat_at_ = now() + heartbeat_interval;
log_debug(LOG_WORKER) << "Sending heartbeat";
- send_control_message("HEARTBEAT");
+ publish_heartbeat();
}
}
-void request_worker::send_control_message(const std::string& command)
+void request_worker::publish_heartbeat()
{
- outgoing_message message(command);
- sender_.queue_send(message);
+ static uint32_t counter = 0;
+ czmqpp::message message;
+ message.append(uncast_type(counter));
+ message.send(heartbeat_socket_);
+ ++counter;
}
} // namespace obelisk
diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp
index 70a0f88..d8bd3cc 100644
--- a/src/worker/worker.hpp
+++ b/src/worker/worker.hpp
@@ -5,8 +5,8 @@
#include <mutex>
#include <thread>
#include <unordered_map>
-#include <zmq.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <czmq++/czmq.hpp>
#include <obelisk/message.hpp>
#include "config.hpp"
#include "lockless_queue.hpp"
@@ -14,16 +14,6 @@
namespace obelisk {
-struct socket_factory
-{
- socket_factory();
- zmq_socket_ptr spawn_socket();
-
- zmq::context_t context;
- std::string connection;
- std::string name;
-};
-
/**
* We don't want to block the originating threads that execute a send
* as that would slow down requests if they all have to sync access
@@ -41,13 +31,13 @@ struct socket_factory
class send_worker
{
public:
- send_worker(zmq::context_t& context);
+ send_worker(czmqpp::context& context);
void queue_send(const outgoing_message& message);
private:
typedef lockless_queue<outgoing_message> send_message_queue;
- zmq::context_t& context_;
+ czmqpp::context& context_;
// When the send is ready, then the sending thread is woken up.
send_message_queue send_queue_;
};
@@ -67,20 +57,25 @@ public:
private:
typedef std::unordered_map<std::string, command_handler> command_map;
- void create_new_socket();
+ void whitelist(config_type::ipaddress_list& addrs);
+ void enable_crypto(config_type& config);
+ void create_new_socket(config_type& config);
void poll();
- void send_control_message(const std::string& command);
+ void publish_heartbeat();
- socket_factory factory_;
+ czmqpp::context context_;
// Main socket.
- zmq_socket_ptr socket_;
+ czmqpp::socket socket_;
+ czmqpp::certificate cert_;
+ czmqpp::authenticator auth_;
// Socket to trigger wakeup for send.
- zmq::socket_t wakeup_socket_;
+ czmqpp::socket wakeup_socket_;
+ // We publish a heartbeat every so often so clients
+ // can know our availability.
+ czmqpp::socket heartbeat_socket_;
- boost::posix_time::ptime last_heartbeat_;
// Send out heartbeats at regular intervals
boost::posix_time::ptime heartbeat_at_;
- size_t interval_;
command_map handlers_;
send_worker sender_;
diff --git a/src/zmq_message.cpp b/src/zmq_message.cpp
deleted file mode 100644
index ea219bd..0000000
--- a/src/zmq_message.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-#include <obelisk/zmq_message.hpp>
-
-#include <bitcoin/format.hpp>
-#include <bitcoin/utility/assert.hpp>
-
-namespace obelisk {
-
-using namespace bc;
-
-void zmq_message::append(const data_chunk& part)
-{
- parts_.push_back(part);
-}
-
-bool zmq_message::send(zmq::socket_t& socket) const
-{
- bool send_more = true;
- for (auto it = parts_.begin(); it != parts_.end(); ++it)
- {
- BITCOIN_ASSERT(send_more);
- const data_chunk& data = *it;
- zmq::message_t message(data.size());
- uint8_t* message_data = reinterpret_cast<uint8_t*>(message.data());
- std::copy(data.begin(), data.end(), message_data);
- if (it == parts_.end() - 1)
- send_more = false;
- try
- {
- if (!socket.send(message, send_more ? ZMQ_SNDMORE : 0))
- return false;
- }
- catch (zmq::error_t error)
- {
- BITCOIN_ASSERT(error.num() != 0);
- return false;
- }
- }
- return true;
-}
-
-bool zmq_message::recv(zmq::socket_t& socket)
-{
- int64_t more = 1;
- while (more)
- {
- zmq::message_t message(0);
- try
- {
- if (!socket.recv(&message, 0))
- return false;
- }
- catch (zmq::error_t error)
- {
- BITCOIN_ASSERT(error.num() != 0);
- return false;
- }
- uint8_t* msg_begin = reinterpret_cast<uint8_t*>(message.data());
- data_chunk data(msg_begin, msg_begin + message.size());
- parts_.push_back(data);
- size_t more_size = sizeof (more);
- socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
- }
- return true;
-}
-
-const data_stack& zmq_message::parts() const
-{
- return parts_;
-}
-
-} // namespace obelisk
-