summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorevoskuil <eric@voskuil.org>2014-03-18 10:48:36 (GMT)
committer evoskuil <eric@voskuil.org>2014-03-18 10:48:36 (GMT)
commitf9243ed96b6e65effc57cca1a7f48d90bbf4462f (patch)
treeeb5417ba816de0e7c5f693dbc859f0d42e71b98d
parent26c4702836c1294212981d546a07abc65cd44129 (diff)
parent3b0f408c804304de5f357dad137c817f154b8186 (diff)
Merge branch 'evoskuil/master'
-rw-r--r--include/obelisk/client/interface.hpp5
-rw-r--r--src/client/interface/interface.cpp2
-rw-r--r--src/worker/lockless_queue.hpp138
-rw-r--r--src/worker/worker.cpp15
-rw-r--r--src/worker/worker.hpp12
5 files changed, 16 insertions, 156 deletions
diff --git a/include/obelisk/client/interface.hpp b/include/obelisk/client/interface.hpp
index d009827..c150e85 100644
--- a/include/obelisk/client/interface.hpp
+++ b/include/obelisk/client/interface.hpp
@@ -30,7 +30,7 @@ public:
private:
bool setup_socket(
- const std::string& connection, czmqpp::socket socket);
+ const std::string& connection, czmqpp::socket& socket);
void recv_tx();
void recv_block();
@@ -106,7 +106,8 @@ class fullnode_interface
{
public:
fullnode_interface(bc::threadpool& pool, const std::string& connection,
- const std::string& cert_filename, const std::string& server_pubkey);
+ const std::string& cert_filename="",
+ const std::string& server_pubkey="");
// Non-copyable
fullnode_interface(const fullnode_interface&) = delete;
diff --git a/src/client/interface/interface.cpp b/src/client/interface/interface.cpp
index 457eafd..607203a 100644
--- a/src/client/interface/interface.cpp
+++ b/src/client/interface/interface.cpp
@@ -23,7 +23,7 @@ subscriber_part::subscriber_part(czmqpp::context& context)
}
bool subscriber_part::setup_socket(
- const std::string& connection, czmqpp::socket socket)
+ const std::string& connection, czmqpp::socket& socket)
{
if (!socket.connect(connection))
{
diff --git a/src/worker/lockless_queue.hpp b/src/worker/lockless_queue.hpp
deleted file mode 100644
index d6b5b0e..0000000
--- a/src/worker/lockless_queue.hpp
+++ /dev/null
@@ -1,138 +0,0 @@
-#ifndef OBELISK_WORKER_LOCKLESS_QUEUE_HPP
-#define OBELISK_WORKER_LOCKLESS_QUEUE_HPP
-
-#include <atomic>
-
-namespace obelisk {
-
-/**
- * Lockless queue. Multiple producers, single consumer.
- *
- * @code
- * lockless_queue<int> s;
- * s.produce(1);
- * s.produce(2);
- * s.produce(3);
- * for (auto h: lockless_iterable(s))
- * {
- * std::cout << h << std::endl;
- * }
- * @endcode
- */
-template<typename T>
-class lockless_queue
-{
-public:
- template<typename NodeDataType>
- struct node
- {
- // Enables people to access data type for this node externally.
- typedef NodeDataType DataType;
- node(const DataType& data)
- : data(data), next(nullptr) {}
- DataType data;
- node* next;
- };
-
- typedef node<T> NodeType;
-
- lockless_queue()
- : lock_(false), head_(nullptr) {}
-
- void produce(const T &data)
- {
- NodeType* new_node = new NodeType(data);
- while (lock_.exchange(true)) {}
- new_node->next = head_;
- head_ = new_node;
- lock_ = false;
- }
-
- NodeType* consume_all()
- {
- while (lock_.exchange(true)) {}
- NodeType* ret_node = head_;
- head_ = nullptr;
- lock_ = false;
- return ret_node;
- }
-private:
- std::atomic<bool> lock_;
- NodeType* head_;
-};
-
-// Classes to allow iteration over the consumed items.
-// These mutate as they're iterated, so they only support
-// basic iteration from beginning to end.
-
-/**
- * Iterator for lockless queue which deletes consumed items
- * as they iterated over.
- * Once iteration is complete, memory should be cleared.
- */
-template <typename NodeType>
-class lockless_iterator
-{
-public:
- lockless_iterator(NodeType* ptr)
- : ptr_(ptr) {}
-
- template <typename LockLessIterator>
- bool operator!=(const LockLessIterator& other)
- {
- return ptr_ != other.ptr_;
- }
-
- typename NodeType::DataType operator*() const
- {
- return ptr_->data;
- }
-
- void operator++()
- {
- NodeType* tmp = ptr_->next;
- delete ptr_;
- ptr_ = tmp;
- }
-private:
- NodeType* ptr_;
-};
-
-template <typename LockLessQueue>
-class lockless_iterable_dispatch
-{
-public:
- typedef typename LockLessQueue::NodeType NodeType;
-
- lockless_iterable_dispatch(LockLessQueue& queue)
- : head_(queue.consume_all()) {}
-
- lockless_iterator<NodeType> begin()
- {
- return lockless_iterator<NodeType>(head_);
- }
-
- lockless_iterator<NodeType> end()
- {
- return lockless_iterator<NodeType>(nullptr);
- }
-private:
- NodeType* head_;
-};
-
-// This is needed so that when creating the iterable object,
-// we don't need to specify the template arguments.
-// Functions are able to deduce the template args from the param list.
-template <typename LockLessQueue>
-lockless_iterable_dispatch<
- LockLessQueue
->
-lockless_iterable(LockLessQueue& queue)
-{
- return lockless_iterable_dispatch<LockLessQueue>(queue);
-}
-
-} // namespace obelisk
-
-#endif
-
diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp
index 19f3a2c..0ba8a77 100644
--- a/src/worker/worker.cpp
+++ b/src/worker/worker.cpp
@@ -25,9 +25,12 @@ send_worker::send_worker(czmqpp::context& context)
}
void send_worker::queue_send(const outgoing_message& message)
{
- czmqpp::socket queue_socket(context_, ZMQ_PUSH);
- queue_socket.connect("inproc://trigger-send");
- message.send(queue_socket);
+ czmqpp::socket socket(context_, ZMQ_PUSH);
+ BITCOIN_ASSERT(socket.self());
+ int rc = socket.connect("inproc://trigger-send");
+ BITCOIN_ASSERT(rc == 0);
+ message.send(socket);
+ socket.destroy(context_);
}
request_worker::request_worker()
@@ -37,7 +40,11 @@ request_worker::request_worker()
wakeup_socket_(context_, ZMQ_PULL),
heartbeat_socket_(context_, ZMQ_PUB)
{
- wakeup_socket_.bind("inproc://trigger-send");
+ BITCOIN_ASSERT(socket_.self());
+ BITCOIN_ASSERT(wakeup_socket_.self());
+ BITCOIN_ASSERT(heartbeat_socket_.self());
+ int rc = wakeup_socket_.bind("inproc://trigger-send");
+ BITCOIN_ASSERT(rc != -1);
}
bool request_worker::start(config_type& config)
{
diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp
index d8bd3cc..f9dc904 100644
--- a/src/worker/worker.hpp
+++ b/src/worker/worker.hpp
@@ -9,7 +9,6 @@
#include <czmq++/czmq.hpp>
#include <obelisk/message.hpp>
#include "config.hpp"
-#include "lockless_queue.hpp"
#include "service/util.hpp"
namespace obelisk {
@@ -19,14 +18,9 @@ namespace obelisk {
* as that would slow down requests if they all have to sync access
* to a single socket.
*
- * Instead we have a lockless queue where send requests are pushed,
+ * Instead we have a queue (push socket) where send requests are pushed,
* and then the send_worker is notified. The worker wakes up and pushes
* all pending requests to the socket.
- *
- * We have to manage reconnects, so there's still the need to sync access
- * to the socket which is shared with the receiving request_worker.
- * This is however only for set_socket() and queue_send() so thread
- * contention is kept to the minimum and sending is (mostly) lockfree.
*/
class send_worker
{
@@ -35,11 +29,7 @@ public:
void queue_send(const outgoing_message& message);
private:
- typedef lockless_queue<outgoing_message> send_message_queue;
-
czmqpp::context& context_;
- // When the send is ready, then the sending thread is woken up.
- send_message_queue send_queue_;
};
class request_worker