summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAmir Taaki <genjix@riseup.net>2014-03-23 11:33:31 (GMT)
committer Amir Taaki <genjix@riseup.net>2014-03-24 19:25:53 (GMT)
commit0b8dd13452c79142b79f106734636b1e064b3c1e (patch)
tree5bebe4d2d43e7b76511fb16289154036f68835c4
parent91d1c5c0376365bb64b7f392ab832e1d28f7e89c (diff)
switch protocol to async_strand so std::bind(...) cruft isn't needed everywhere.
-rw-r--r--include/bitcoin/network/protocol.hpp9
-rw-r--r--include/bitcoin/threadpool.hpp18
-rw-r--r--src/network/protocol.cpp101
3 files changed, 56 insertions, 72 deletions
diff --git a/include/bitcoin/network/protocol.hpp b/include/bitcoin/network/protocol.hpp
index 03af44a..070a581 100644
--- a/include/bitcoin/network/protocol.hpp
+++ b/include/bitcoin/network/protocol.hpp
@@ -170,9 +170,8 @@ public:
template <typename Message>
void broadcast(const Message& packet, broadcast_handler handle_send)
{
- strand_.post(
- std::bind(&protocol::do_broadcast<Message>,
- this, packet, handle_send));
+ strand_.queue(
+ &protocol::do_broadcast<Message>, this, packet, handle_send);
}
private:
@@ -235,7 +234,7 @@ private:
bool finished_;
// From parent
- io_service::strand& strand_;
+ async_strand& strand_;
hosts& hosts_;
handshake& handshake_;
network& network_;
@@ -311,7 +310,7 @@ private:
node->send(packet, send_handler);
}
- io_service::strand strand_;
+ async_strand strand_;
std::string hosts_filename_ = "hosts.p2p";
hosts& hosts_;
diff --git a/include/bitcoin/threadpool.hpp b/include/bitcoin/threadpool.hpp
index 9c9620a..c194c1f 100644
--- a/include/bitcoin/threadpool.hpp
+++ b/include/bitcoin/threadpool.hpp
@@ -147,11 +147,11 @@ struct wrapped_handler_impl
* Convenience class for objects wishing to synchronize operations around
* shared data.
*
- * push() guarantees that any handlers passed to it will never execute
- * at the same time.
+ * queue() guarantees that any handlers passed to it will never
+ * execute at the same time, and they will be called in sequential order.
*
- * queue() guarantees that any handlers passed to it will never execute
- * at the same time, and they will be called in sequential order.
+ * randomly_queue() guarantees that any handlers passed to it will never
+ * execute at the same time.
*/
class async_strand
{
@@ -163,12 +163,14 @@ public:
* encapsulates will never execute at the same time as another handler
* passing through this class.
*/
- template <typename... Args>
- auto wrap(Args&&... args)
+ template <typename Function, typename... Args>
+ auto wrap(Function&& func, Args&&... args)
-> wrapped_handler_impl<
- decltype(std::bind(std::forward<Args>(args)...))>
+ decltype(std::bind(
+ std::forward<Function>(func), std::forward<Args>(args)...))>
{
- auto handler = std::bind(std::forward<Args>(args)...);
+ auto handler = std::bind(
+ std::forward<Function>(func), std::forward<Args>(args)...);
return {handler, strand_};
}
diff --git a/src/network/protocol.cpp b/src/network/protocol.cpp
index af4cdfb..32374a6 100644
--- a/src/network/protocol.cpp
+++ b/src/network/protocol.cpp
@@ -43,7 +43,7 @@ static std::string pretty(const ip_address_type& ip)
protocol::protocol(threadpool& pool, hosts& hsts,
handshake& shake, network& net)
- : strand_(pool.service()), hosts_(hsts), handshake_(shake), network_(net),
+ : strand_(pool), hosts_(hsts), handshake_(shake), network_(net),
watermark_timer_(pool.service())
{
channel_subscribe_ = std::make_shared<channel_subscriber_type>(pool);
@@ -66,7 +66,7 @@ void protocol::start(completion_handler handle_complete)
{
// Bootstrap from seeds if neccessary.
bootstrap(strand_.wrap(
- std::bind(&protocol::handle_bootstrap, this, _1, handle_complete)));
+ &protocol::handle_bootstrap, this, _1, handle_complete));
// Start handshake service, but if it fails to start
// then it's not a critical error.
auto handshake_service_started = [](const std::error_code& ec)
@@ -92,9 +92,8 @@ void protocol::handle_bootstrap(
void protocol::stop(completion_handler handle_complete)
{
- hosts_.save(hosts_filename_,
- strand_.wrap(std::bind(&protocol::handle_save,
- this, _1, handle_complete)));
+ hosts_.save(hosts_filename_, strand_.wrap(
+ &protocol::handle_save, this, _1, handle_complete));
}
void protocol::handle_save(const std::error_code& ec,
completion_handler handle_complete)
@@ -112,9 +111,8 @@ void protocol::handle_save(const std::error_code& ec,
void protocol::bootstrap(completion_handler handle_complete)
{
- hosts_.load(hosts_filename_,
- strand_.wrap(std::bind(&protocol::load_hosts,
- this, _1, handle_complete)));
+ hosts_.load(hosts_filename_, strand_.wrap(
+ &protocol::load_hosts, this, _1, handle_complete));
}
void protocol::load_hosts(const std::error_code& ec,
completion_handler handle_complete)
@@ -126,9 +124,8 @@ void protocol::load_hosts(const std::error_code& ec,
handle_complete(ec);
return;
}
- hosts_.fetch_count(
- strand_.wrap(std::bind(&protocol::if_0_seed,
- this, _1, _2, handle_complete)));
+ hosts_.fetch_count(strand_.wrap(
+ &protocol::if_0_seed, this, _1, _2, handle_complete));
}
void protocol::if_0_seed(const std::error_code& ec, size_t hosts_count,
@@ -200,9 +197,8 @@ void protocol::seeds::error_case(const std::error_code& ec)
void protocol::seeds::connect_dns_seed(const std::string& hostname)
{
- connect(handshake_, network_, hostname, protocol_port,
- strand_.wrap(std::bind(&protocol::seeds::request_addresses,
- shared_from_this(), _1, _2)));
+ connect(handshake_, network_, hostname, protocol_port, strand_.wrap(
+ &protocol::seeds::request_addresses, shared_from_this(), _1, _2));
}
void protocol::seeds::request_addresses(
const std::error_code& ec, channel_ptr dns_seed_node)
@@ -216,11 +212,11 @@ void protocol::seeds::request_addresses(
else
{
dns_seed_node->send(get_address_type(),
- strand_.wrap(std::bind(&protocol::seeds::handle_send_get_address,
- shared_from_this(), _1)));
+ strand_.wrap(&protocol::seeds::handle_send_get_address,
+ shared_from_this(), _1));
dns_seed_node->subscribe_address(
- strand_.wrap(std::bind(&protocol::seeds::save_addresses,
- shared_from_this(), _1, _2, dns_seed_node)));
+ strand_.wrap(&protocol::seeds::save_addresses,
+ shared_from_this(), _1, _2, dns_seed_node));
}
}
@@ -248,9 +244,8 @@ void protocol::seeds::save_addresses(const std::error_code& ec,
{
log_debug(LOG_PROTOCOL) << "Storing seeded addresses.";
for (const network_address_type& net_address: packet.addresses)
- hosts_.store(net_address,
- strand_.wrap(std::bind(&protocol::seeds::handle_store,
- shared_from_this(), _1)));
+ hosts_.store(net_address, strand_.wrap(
+ &protocol::seeds::handle_store, shared_from_this(), _1));
if (!finished_)
{
@@ -270,10 +265,10 @@ void protocol::seeds::handle_store(const std::error_code& ec)
void protocol::run()
{
- strand_.dispatch(std::bind(&protocol::start_connecting, this));
+ strand_.queue(&protocol::start_connecting, this);
if (listen_is_enabled_)
network_.listen(protocol_port,
- strand_.wrap(std::bind(&protocol::handle_listen, this, _1, _2)));
+ strand_.wrap(&protocol::handle_listen, this, _1, _2));
}
void protocol::start_connecting()
{
@@ -303,9 +298,8 @@ void protocol::try_connect_once(slot_index slot)
// Begin connection flow: finding_peer -> connecting -> established.
// Failures end with connect_state::stopped and loop back here again.
connect_states_[slot] = connect_state::finding_peer;
- hosts_.fetch_address(
- strand_.wrap(std::bind(&protocol::attempt_connect,
- this, _1, _2, slot)));
+ hosts_.fetch_address(strand_.wrap(
+ &protocol::attempt_connect, this, _1, _2, slot));
}
void protocol::start_watermark_reset_timer()
@@ -328,7 +322,7 @@ void protocol::start_watermark_reset_timer()
start_watermark_reset_timer();
};
watermark_timer_.expires_from_now(watermark_reset_interval);
- watermark_timer_.async_wait(strand_.wrap(reset_watermark));
+ watermark_timer_.async_wait(strand_.wrap(reset_watermark, _1));
}
template <typename ConnectionList>
@@ -372,8 +366,7 @@ void protocol::attempt_connect(const std::error_code& ec,
log_debug(LOG_PROTOCOL) << "Trying "
<< pretty(address.ip) << ":" << address.port;
connect(handshake_, network_, pretty(address.ip), address.port,
- strand_.wrap(std::bind(&protocol::handle_connect,
- this, _1, _2, address, slot)));
+ strand_.wrap(&protocol::handle_connect, this, _1, _2, address, slot));
}
void protocol::handle_connect(
const std::error_code& ec, channel_ptr node,
@@ -398,17 +391,15 @@ void protocol::handle_connect(
<< pretty(address.ip) << ":" << address.port
<< " (" << connections_.size() << " connections)";
// Remove channel from list of connections
- node->subscribe_stop(
- strand_.wrap(std::bind(&protocol::outbound_channel_stopped,
- this, _1, node, slot)));
+ node->subscribe_stop(strand_.wrap(
+ &protocol::outbound_channel_stopped, this, _1, node, slot));
setup_new_channel(node);
}
void protocol::maintain_connection(const std::string& hostname, uint16_t port)
{
- connect(handshake_, network_, hostname, port,
- strand_.wrap(std::bind(&protocol::handle_manual_connect,
- this, _1, _2, hostname, port)));
+ connect(handshake_, network_, hostname, port, strand_.wrap(
+ &protocol::handle_manual_connect, this, _1, _2, hostname, port));
}
void protocol::handle_manual_connect(
const std::error_code& ec, channel_ptr node,
@@ -426,9 +417,8 @@ void protocol::handle_manual_connect(
log_info(LOG_PROTOCOL) << "Manual connection established: "
<< hostname << ":" << port;
// Remove channel from list of connections
- node->subscribe_stop(
- strand_.wrap(std::bind(&protocol::manual_channel_stopped,
- this, _1, node, hostname, port)));
+ node->subscribe_stop(strand_.wrap(
+ &protocol::manual_channel_stopped, this, _1, node, hostname, port));
setup_new_channel(node);
}
@@ -441,17 +431,15 @@ void protocol::handle_listen(const std::error_code& ec, acceptor_ptr accept)
}
else
{
- accept->accept(
- strand_.wrap(std::bind(&protocol::handle_accept,
- this, _1, _2, accept)));
+ accept->accept(strand_.wrap(
+ &protocol::handle_accept, this, _1, _2, accept));
}
}
void protocol::handle_accept(const std::error_code& ec, channel_ptr node,
acceptor_ptr accept)
{
- accept->accept(
- strand_.wrap(std::bind(&protocol::handle_accept,
- this, _1, _2, accept)));
+ accept->accept(strand_.wrap(
+ &protocol::handle_accept, this, _1, _2, accept));
if (ec)
{
log_error(LOG_PROTOCOL)
@@ -470,9 +458,8 @@ void protocol::handle_accept(const std::error_code& ec, channel_ptr node,
return;
}
// Remove channel from list of connections
- node->subscribe_stop(
- strand_.wrap(std::bind(&protocol::inbound_channel_stopped,
- this, _1, node)));
+ node->subscribe_stop(strand_.wrap(
+ &protocol::inbound_channel_stopped, this, _1, node));
setup_new_channel(node);
};
handshake_.ready(node, handshake_complete);
@@ -560,9 +547,8 @@ void protocol::inbound_channel_stopped(
void protocol::subscribe_address(channel_ptr node)
{
- node->subscribe_address(
- strand_.wrap(std::bind(&protocol::receive_address_message,
- this, _1, _2, node)));
+ node->subscribe_address(strand_.wrap(
+ &protocol::receive_address_message, this, _1, _2, node));
}
void protocol::receive_address_message(const std::error_code& ec,
const address_type& packet, channel_ptr node)
@@ -575,12 +561,10 @@ void protocol::receive_address_message(const std::error_code& ec,
}
log_debug(LOG_PROTOCOL) << "Storing addresses.";
for (const network_address_type& net_address: packet.addresses)
- hosts_.store(net_address,
- strand_.wrap(std::bind(&protocol::handle_store_address,
- this, _1)));
- node->subscribe_address(
- strand_.wrap(std::bind(&protocol::receive_address_message,
- this, _1, _2, node)));
+ hosts_.store(net_address, strand_.wrap(
+ &protocol::handle_store_address, this, _1));
+ node->subscribe_address(strand_.wrap(
+ &protocol::receive_address_message, this, _1, _2, node));
}
void protocol::handle_store_address(const std::error_code& ec)
{
@@ -591,9 +575,8 @@ void protocol::handle_store_address(const std::error_code& ec)
void protocol::fetch_connection_count(
fetch_connection_count_handler handle_fetch)
{
- strand_.post(
- std::bind(&protocol::do_fetch_connection_count,
- this, handle_fetch));
+ strand_.queue(
+ &protocol::do_fetch_connection_count, this, handle_fetch);
}
void protocol::do_fetch_connection_count(
fetch_connection_count_handler handle_fetch)