summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAmir Taaki <genjix@riseup.net>2014-03-17 01:21:36 (GMT)
committer Amir Taaki <genjix@riseup.net>2014-03-17 01:21:36 (GMT)
commit789ca3cccbba7f1367aeea58d107f9d480f641b8 (patch)
tree6cd327974ff356567ed5c213b5c7c5e99bf413f2
parente626e1c1ca44fb2c5dfa5a0285e82b7504827624 (diff)
avoid creating socket repeatedly and trim old fluff from code.
-rw-r--r--src/worker/lockless_queue.hpp138
-rw-r--r--src/worker/worker.cpp15
-rw-r--r--src/worker/worker.hpp14
3 files changed, 12 insertions, 155 deletions
diff --git a/src/worker/lockless_queue.hpp b/src/worker/lockless_queue.hpp
deleted file mode 100644
index d6b5b0e..0000000
--- a/src/worker/lockless_queue.hpp
+++ /dev/null
@@ -1,138 +0,0 @@
-#ifndef OBELISK_WORKER_LOCKLESS_QUEUE_HPP
-#define OBELISK_WORKER_LOCKLESS_QUEUE_HPP
-
-#include <atomic>
-
-namespace obelisk {
-
-/**
- * Lockless queue. Multiple producers, single consumer.
- *
- * @code
- * lockless_queue<int> s;
- * s.produce(1);
- * s.produce(2);
- * s.produce(3);
- * for (auto h: lockless_iterable(s))
- * {
- * std::cout << h << std::endl;
- * }
- * @endcode
- */
-template<typename T>
-class lockless_queue
-{
-public:
- template<typename NodeDataType>
- struct node
- {
- // Enables people to access data type for this node externally.
- typedef NodeDataType DataType;
- node(const DataType& data)
- : data(data), next(nullptr) {}
- DataType data;
- node* next;
- };
-
- typedef node<T> NodeType;
-
- lockless_queue()
- : lock_(false), head_(nullptr) {}
-
- void produce(const T &data)
- {
- NodeType* new_node = new NodeType(data);
- while (lock_.exchange(true)) {}
- new_node->next = head_;
- head_ = new_node;
- lock_ = false;
- }
-
- NodeType* consume_all()
- {
- while (lock_.exchange(true)) {}
- NodeType* ret_node = head_;
- head_ = nullptr;
- lock_ = false;
- return ret_node;
- }
-private:
- std::atomic<bool> lock_;
- NodeType* head_;
-};
-
-// Classes to allow iteration over the consumed items.
-// These mutate as they're iterated, so they only support
-// basic iteration from beginning to end.
-
-/**
- * Iterator for lockless queue which deletes consumed items
- * as they iterated over.
- * Once iteration is complete, memory should be cleared.
- */
-template <typename NodeType>
-class lockless_iterator
-{
-public:
- lockless_iterator(NodeType* ptr)
- : ptr_(ptr) {}
-
- template <typename LockLessIterator>
- bool operator!=(const LockLessIterator& other)
- {
- return ptr_ != other.ptr_;
- }
-
- typename NodeType::DataType operator*() const
- {
- return ptr_->data;
- }
-
- void operator++()
- {
- NodeType* tmp = ptr_->next;
- delete ptr_;
- ptr_ = tmp;
- }
-private:
- NodeType* ptr_;
-};
-
-template <typename LockLessQueue>
-class lockless_iterable_dispatch
-{
-public:
- typedef typename LockLessQueue::NodeType NodeType;
-
- lockless_iterable_dispatch(LockLessQueue& queue)
- : head_(queue.consume_all()) {}
-
- lockless_iterator<NodeType> begin()
- {
- return lockless_iterator<NodeType>(head_);
- }
-
- lockless_iterator<NodeType> end()
- {
- return lockless_iterator<NodeType>(nullptr);
- }
-private:
- NodeType* head_;
-};
-
-// This is needed so that when creating the iterable object,
-// we don't need to specify the template arguments.
-// Functions are able to deduce the template args from the param list.
-template <typename LockLessQueue>
-lockless_iterable_dispatch<
- LockLessQueue
->
-lockless_iterable(LockLessQueue& queue)
-{
- return lockless_iterable_dispatch<LockLessQueue>(queue);
-}
-
-} // namespace obelisk
-
-#endif
-
diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp
index 19f3a2c..143dce5 100644
--- a/src/worker/worker.cpp
+++ b/src/worker/worker.cpp
@@ -20,14 +20,15 @@ constexpr long poll_sleep_interval = 1000;
auto now = []() { return microsec_clock::universal_time(); };
send_worker::send_worker(czmqpp::context& context)
- : context_(context)
+ : socket_(context, ZMQ_PUSH)
{
+ BITCOIN_ASSERT(socket_.self());
+ int rc = socket_.connect("inproc://trigger-send");
+ BITCOIN_ASSERT(rc == 0);
}
void send_worker::queue_send(const outgoing_message& message)
{
- czmqpp::socket queue_socket(context_, ZMQ_PUSH);
- queue_socket.connect("inproc://trigger-send");
- message.send(queue_socket);
+ message.send(socket_);
}
request_worker::request_worker()
@@ -37,7 +38,11 @@ request_worker::request_worker()
wakeup_socket_(context_, ZMQ_PULL),
heartbeat_socket_(context_, ZMQ_PUB)
{
- wakeup_socket_.bind("inproc://trigger-send");
+ BITCOIN_ASSERT(socket_.self());
+ BITCOIN_ASSERT(wakeup_socket_.self());
+ BITCOIN_ASSERT(heartbeat_socket_.self());
+ int rc = wakeup_socket_.bind("inproc://trigger-send");
+ BITCOIN_ASSERT(rc != -1);
}
bool request_worker::start(config_type& config)
{
diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp
index d8bd3cc..cfe0b47 100644
--- a/src/worker/worker.hpp
+++ b/src/worker/worker.hpp
@@ -9,7 +9,6 @@
#include <czmq++/czmq.hpp>
#include <obelisk/message.hpp>
#include "config.hpp"
-#include "lockless_queue.hpp"
#include "service/util.hpp"
namespace obelisk {
@@ -19,14 +18,9 @@ namespace obelisk {
* as that would slow down requests if they all have to sync access
* to a single socket.
*
- * Instead we have a lockless queue where send requests are pushed,
+ * Instead we have a queue (push socket) where send requests are pushed,
* and then the send_worker is notified. The worker wakes up and pushes
* all pending requests to the socket.
- *
- * We have to manage reconnects, so there's still the need to sync access
- * to the socket which is shared with the receiving request_worker.
- * This is however only for set_socket() and queue_send() so thread
- * contention is kept to the minimum and sending is (mostly) lockfree.
*/
class send_worker
{
@@ -35,11 +29,7 @@ public:
void queue_send(const outgoing_message& message);
private:
- typedef lockless_queue<outgoing_message> send_message_queue;
-
- czmqpp::context& context_;
- // When the send is ready, then the sending thread is woken up.
- send_message_queue send_queue_;
+ czmqpp::socket socket_;
};
class request_worker