转载自(http://www.cnblogs.com/lidabo/p/3906055.html)
1、实现多线程方法:
其实就是多个线程同时调用io_service::run
1 for (int i = 0; i != m_nThreads; ++i)
2 {
3 boost::shared_ptr<boost::thread> pTh(new boost::thread(
4 boost::bind(&boost::asio::io_service::run,&m_ioService)));
5 m_listThread.push_back(pTh);
6 }
2、多线程调度情况:
asio规定:只能在调用io_service::run的线程中才能调用事件完成处理器。
注:事件完成处理器就是你async_accept、async_write等注册的句柄,类似于回调的东西。
单线程:
如果只有一个线程调用io_service::run,根据asio的规定,事件完成处理器也只能在这个线程中执行。也就是说,你所有代码都在同一个线程中运行,因此变量的访问是安全的。
多线程:
如果有多个线程同时调用io_service::run以实现多线程并发处理。对于asio来说,这些线程都是平等的,没有主次之分。如果你投递的一个请求比如async_write完成时,asio将随机的激活调用io_service::run的线程。并在这个线程中调用事件完成处理器(async_write当时注册的句柄)。如果你的代码耗时较长,这个时候你投递的另一个async_write请求完成时,asio将不等待你的代码处理完成,它将在另外的一个调用io_service::run线程中,调用async_write当时注册的句柄。也就是说,你注册的事件完成处理器有可能同时在多个线程中调用。
当然你可以使用 boost::asio::io_service::strand让完成事件处理器的调用,在同一时间只有一个, 比如下面的的代码:
socket_.async_read_some(boost::asio::buffer(buffer_),
strand_.wrap(
boost::bind(&connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
...
boost::asio::io_service::strand strand_;
此时async_read_som完成后掉用handle_read时,必须等待其它handle_read调用完成时才能被执行(async_read_som引起的handle_read调用)。
多线程调用时,还有一个重要的问题,那就是无序化。比如说,你短时间内投递多个async_write,那么完成处理器的调用并不是按照你投递async_write的顺序调用的。asio第一次调用完成事件处理器,有可能是第二次async_write返回的结果,也有可能是第3次的。使用strand也是这样的。strand只是保证同一时间只运行一个完成处理器,但它并不保证顺序。
代码测试:
服务器:
将下面的代码编译以后,使用cmd命令提示符下传人参数<IP> <port> <threads>调用
比如:test.exe 0.0.0.0 3005 10
客服端 使用windows自带的telnet
cmd命令提示符:
telnet 127.0.0.1 3005
原理:客户端连接成功后,同一时间调用100次boost::asio::async_write给客户端发送数据,并且在完成事件处理器中打印调用序号,和线程ID。
核心代码:
1 void start()
2 {
3 for (int i = 0; i != 100; ++i)
4 {
5 boost::shared_ptr<string> pStr(new string);
6 *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
7 *pStr += "\r\n";
8 boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
9 boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
10 boost::asio::placeholders::error,
11 boost::asio::placeholders::bytes_transferred,
12 pStr,i)
13 );
14 }
15 }
16
17 //去掉 boost::mutex::scoped_lock lock(m_ioMutex); 效果更明显。
18
19 void HandleWrite(const boost::system::error_code& error
20 ,std::size_t bytes_transferred
21 ,boost::shared_ptr<string> pStr,int nIndex)
22 {
23 if (!error)
24 {
25 boost::mutex::scoped_lock lock(m_ioMutex);
26 cout << "发送序号=" << nIndex << ",线程id=" << boost::this_thread::get_id() << endl;
27 }
28 else
29 {
30 cout << "连接断开" << endl;
31 }
32 }
完整代码:
1 #include <boost/bind.hpp>
2 #include <boost/shared_ptr.hpp>
3 #include <boost/enable_shared_from_this.hpp>
4 #include <boost/asio.hpp>
5 #include <boost/lexical_cast.hpp>
6 #include <boost/thread.hpp>
7 #include <boost/thread/mutex.hpp>
8 #include <string>
9 #include <iostream>
10
11
12 using std::cout;
13 using std::endl;
14 using std::string;
15 using boost::asio::ip::tcp;
16
17 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
18
19
20 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
21
22 class CMyTcpConnection : public boost::enable_shared_from_this<CMyTcpConnection>
23 {
24 public:
25 CMyTcpConnection(boost::asio::io_service &ser)
26 :m_nSocket(ser)
27 {
28 }
29
30 typedef boost::shared_ptr<CMyTcpConnection> CPMyTcpCon;
31
32
33 static CPMyTcpCon CreateNew(boost::asio::io_service& io_service) {
34 return CPMyTcpCon(new CMyTcpConnection(io_service));
35 }
36
37
38
39 public:
40 void start()
41 {
42 for (int i = 0; i != 100; ++i) {
43 boost::shared_ptr<string> pStr(new string);
44 *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
45 *pStr += "\r\n";
46 boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
47 boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, pStr,i)
48 );
49 }
50 }
51
52
53 tcp::socket& socket()
54 {
55 return m_nSocket;
56 }
57
58
59 private:
60 void HandleWrite(const boost::system::error_code& error ,std::size_t bytes_transferred ,boost::shared_ptr<string> pStr,int nIndex)
61 {
62 if (!error) {
63 boost::mutex::scoped_lock lock(m_ioMutex);
64 cout << "发送序号=" << nIndex << ",线程id=" << boost::this_thread::get_id() << endl;
65 } else {
66 cout << "连接断开" << endl;
67 }
68 }
69
70 public:
71
72 private:
73 tcp::socket m_nSocket;
74 boost::mutex m_ioMutex;
75 };
76
77 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
78
79
80 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
81
82 class CMyService : private boost::noncopyable
83 {
84 public:
85 CMyService(string const &strIP,string const &strPort,int nThreads)
86 :m_tcpAcceptor(m_ioService)
87 ,m_nThreads(nThreads)
88 {
89 tcp::resolver resolver(m_ioService);
90 tcp::resolver::query query(strIP,strPort);
91 tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
92 boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
93 m_tcpAcceptor.open(endpoint.protocol());
94 m_tcpAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
95 m_tcpAcceptor.bind(endpoint);
96 m_tcpAcceptor.listen();
97
98
99 StartAccept();
100 }
101
102 ~CMyService(){Stop();}
103
104 public:
105 void Stop()
106 {
107 m_ioService.stop();
108 for (std::vector<boost::shared_ptr<boost::thread>>::const_iterator it = m_listThread.cbegin();
109 it != m_listThread.cend(); ++ it)
110 {
111 (*it)->join();
112 }
113 }
114 void Start() {
115 for (int i = 0; i != m_nThreads; ++i) {
116 boost::shared_ptr<boost::thread> pTh( new boost::thread( boost::bind(&boost::asio::io_service::run, &m_ioService) ) );
117 m_listThread.push_back(pTh);
118 }
119 }
120 private:
121 void HandleAccept(const boost::system::error_code& error ,boost::shared_ptr<CMyTcpConnection> newConnect)
122 {
123 if (!error) {
124 newConnect->start();
125 }
126 StartAccept();
127 }
128
129
130 void StartAccept()
131 {
132 CMyTcpConnection::CPMyTcpCon newConnect = CMyTcpConnection::CreateNew(m_tcpAcceptor.get_io_service());
133 m_tcpAcceptor.async_accept( newConnect->socket(), vboost::bind(&CMyService::HandleAccept, this, boost::asio::placeholders::error, newConnect) );
134 }
135
136
137 private:
138 boost::asio::io_service m_ioService;
139 boost::asio::ip::tcp::acceptor m_tcpAcceptor;
140 std::vector<boost::shared_ptr<boost::thread>> m_listThread;
141 std::size_t m_nThreads;
142 };
143
144 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
145
146
147 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
148
149 int main(int argc, char* argv[])
150 {
151 try
152 {
153 if (argc != 4)
154 {
155 std::cerr << "<IP> <port> <threads>\n";
156 return 1;
157 }
158 int nThreads = boost::lexical_cast<int>(argv[3]);
159 CMyService mySer(argv[1], argv[2], nThreads);
160 mySer.Start();
161 getchar();
162 mySer.Stop();
163 } catch (std::exception& e) {
164 std::cerr << "Exception: " << e.what() << "\n";
165 }
166 return 0;
167 }
## 相关函数介绍
### boost::asio::ip::tcp::resolver
iterator resolve(
const endpoint_type & e,
boost::system::error_code & ec);
A forward-only iterator that can be
used to traverse the list of endpoint entries. Returns a default
constructed iterator if an error occurs.
A default constructed iterator represents the end of the list.
A successful call to this function is guaranteed to return at least one entry.
tcp::resolver一般和tcp::resolver::query结合用,通过query这个词顾名思义就知道它是用来查询socket的相应信息,一般而言我们关心socket的东东有address,port而已,通过
tcp::resolver很容易实现设置和查询,它通过query把字符串格式的ip如192.168.0.200或主机名
http://localhost,端口“8080”等转化成socket内部表示格式,这样我们应用的时候可以直接使用字符串的形式,而且不用再担心
socket的字节顺序转化问题。示例如下:
1 boost::asio::io_service io_service ;
2 boost::asio::ip::tcp::resolver resolver(io_service);
3 boost::asio::ip::tcp::resolver::query query("localhost", "9000");
还
有要说明的是, boost::asio把通讯双方(server,
client)都用endpoint的表示,所以endpoint中的address, port
分别封装了ip和端口。貌似resolver和endpoint不相干,于是乎出现tcp::resolver::iterator了,它是
resolver的迭代器,其实就是endpoint的指针,那么就可以这样: 1 boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
2 boost::asio::ip::tcp::resolver::iterator end; //默认构造的resolver::iterator,用来代表失败
3 boost::system::error_code error = boost::asio::error::host_not_found;
4 boost::asio::ip::tcp::endpoint endpoint;
5 while (error && endpoint_iterator != end) {
6 endpoint = *endpoint_iterator ; //返回的是迭代器,需要解引用成endpoint
7 socket.close(); //先close,再connect
8 socket.connect(endpoint, error);
9 endpoint_iterator++ ;
10 }
得到endpoint后就好说啦,endpoint.address().to_string()就能够返回string格式的ip地址,endpoint.port()返回端口。
其实endpoint 完全可以自己构造,方法也是很简单的,tcp::endpoint(tcp::v4(), (unsigned short)9000) 这个是server端的用法,tcp::v4()直接返回自己的address,如果用于client那么需要设置server的ip ,实现如下:
1
2 boost::system::error_code error = boost::asio::error::host_not_found;
3 boost::asio::ip::address add;
4 add.from_string("127.0.0.1");
5 tcp::endpoint endpoint(add, short(9000));
6 socket.connect(endpoint, error);
7
这样不使用resolver也是可以的。
还有更神奇的:
1
2 boost::asio::io_service ioservice ;
3 boost::asio::io_service my_io_service ;
4 boost::asio::ip::tcp::resolver resolver(my_io_service);
5 boost::asio::ip::tcp::resolver::query query("www.google.com", "http");
6 boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query);
7 boost::asio::ip::tcp::resolver::iterator end; // End marker.
8
9 while (iter != end) {
10 boost::asio::ip::tcp::endpoint endpoint = *iter++;
11 std::cout << endpoint << std::endl;
12 }
13
这样有发现一个新的用途,通过resolver迭代可以得到多个节点endpoint,比如google 就有好几个ip。
上面这个例子的运行结果:
1 74.125.128.106:80
2 74.125.128.147:80
3 74.125.128.99:80
4 74.125.128.103:80
5 74.125.128.104:80
6 74.125.128.105:80
### boost::mutex::scoped_lock
boost::mutex::scoped_lock lock(m_ioMutex);
其依赖RAII机制, 在过了作用域之后对锁进行自动释放和回收.其实现代码如图所示
boost_1_60_0/boost/asio/detail/scoped_lock.hpp:
1 //
2 // detail/scoped_lock.hpp
3 // ~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #ifndef BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
12 #define BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
13
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18 #include <boost/asio/detail/noncopyable.hpp>
19
20 #include <boost/asio/detail/push_options.hpp>
21
22 namespace boost {
23 namespace asio {
24 namespace detail {
25
26 // Helper class to lock and unlock a mutex automatically.
27 template <typename Mutex>
28 class scoped_lock
29 : private noncopyable
30 {
31 public:
32 // Tag type used to distinguish constructors.
33 enum adopt_lock_t { adopt_lock };
34
35 // Constructor adopts a lock that is already held.
36 scoped_lock(Mutex& m, adopt_lock_t)
37 : mutex_(m),
38 locked_(true)
39 {
40 }
41
42 // Constructor acquires the lock.
43 explicit scoped_lock(Mutex& m)
44 : mutex_(m)
45 {
46 mutex_.lock();
47 locked_ = true;
48 }
49
50 // Destructor releases the lock.
51 ~scoped_lock()
52 {
53 if (locked_)
54 mutex_.unlock();
55 }
56
57 // Explicitly acquire the lock.
58 void lock()
59 {
60 if (!locked_)
61 {
62 mutex_.lock();
63 locked_ = true;
64 }
65 }
66
67 // Explicitly release the lock.
68 void unlock()
69 {
70 if (locked_)
71 {
72 mutex_.unlock();
73 locked_ = false;
74 }
75 }
76
77 // Test whether the lock is held.
78 bool locked() const
79 {
80 return locked_;
81 }
82
83 // Get the underlying mutex.
84 Mutex& mutex()
85 {
86 return mutex_;
87 }
88
89 private:
90 // The underlying mutex.
91 Mutex& mutex_;
92
93 // Whether the mutex is currently locked or unlocked.
94 bool locked_;
95 };
96
97 } // namespace detail
98 } // namespace asio
99 } // namespace boost
100
101 #include <boost/asio/detail/pop_options.hpp>
102
103 #endif // BOOST_ASIO_DETAIL_SCOPED_LOCK_HPP
104
### lexical_cast
lexical_cast是boost中一个非常有用,常用,好用的库,我现在的小数据转换用的都是lexical_cast。
lexical_cast最大的特点是安全,包括长度安全,类型安全。
使用方式:
1 #include <boost/lexical_cast.hpp>
2
3 const double PI=3.1415926535;
4 string str;
5 str = lexical_cast<string>(PI);
6
7 string str="3.1415926535";
8 double PI=lexical_cast<double>(str);
9
如果转换失败, 抛出bad_lexical_cast异常
其实现方式如下:
boost_1_60_0/boost/lexical_cast.hpp
1 // Copyright Kevlin Henney, 2000-2005.
2 // Copyright Alexander Nasonov, 2006-2010.
3 // Copyright Antony Polukhin, 2011-2014.
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See
6 // accompanying file LICENSE_1_0.txt or copy at
7 // http://www.boost.org/LICENSE_1_0.txt)
8 //
9 // what: lexical_cast custom keyword cast
10 // who: contributed by Kevlin Henney,
11 // enhanced with contributions from Terje Slettebo,
12 // with additional fixes and suggestions from Gennaro Prota,
13 // Beman Dawes, Dave Abrahams, Daryle Walker, Peter Dimov,
14 // Alexander Nasonov, Antony Polukhin, Justin Viiret, Michael Hofmann,
15 // Cheng Yang, Matthew Bradbury, David W. Birdsall, Pavel Korzh and other Boosters
16 // when: November 2000, March 2003, June 2005, June 2006, March 2011 - 2014
17
18 #ifndef BOOST_LEXICAL_CAST_INCLUDED
19 #define BOOST_LEXICAL_CAST_INCLUDED
20
21 #include <boost/config.hpp>
22 #ifdef BOOST_HAS_PRAGMA_ONCE
23 # pragma once
24 #endif
25
26 #if defined(BOOST_NO_STRINGSTREAM) || defined(BOOST_NO_STD_WSTRING)
27 #define BOOST_LCAST_NO_WCHAR_T
28 #endif
29
30 #include <boost/range/iterator_range_core.hpp>
31 #include <boost/lexical_cast/bad_lexical_cast.hpp>
32 #include <boost/lexical_cast/try_lexical_convert.hpp>
33
34 namespace boost
35 {
36 template <typename Target, typename Source>
37 inline Target lexical_cast(const Source &arg)
38 {
39 Target result;
40
41 if (!boost::conversion::detail::try_lexical_convert(arg, result)) {
42 boost::conversion::detail::throw_bad_cast<Source, Target>();
43 }
44
45 return result;
46 }
47
48 template <typename Target>
49 inline Target lexical_cast(const char* chars, std::size_t count)
50 {
51 return ::boost::lexical_cast<Target>(
52 ::boost::iterator_range<const char*>(chars, chars + count)
53 );
54 }
55
56 template <typename Target>
57 inline Target lexical_cast(const unsigned char* chars, std::size_t count)
58 {
59 return ::boost::lexical_cast<Target>(
60 ::boost::iterator_range<const unsigned char*>(chars, chars + count)
61 );
62 }
63
64 template <typename Target>
65 inline Target lexical_cast(const signed char* chars, std::size_t count)
66 {
67 return ::boost::lexical_cast<Target>(
68 ::boost::iterator_range<const signed char*>(chars, chars + count)
69 );
70 }
71
72 #ifndef BOOST_LCAST_NO_WCHAR_T
73 template <typename Target>
74 inline Target lexical_cast(const wchar_t* chars, std::size_t count)
75 {
76 return ::boost::lexical_cast<Target>(
77 ::boost::iterator_range<const wchar_t*>(chars, chars + count)
78 );
79 }
80 #endif
81 #ifndef BOOST_NO_CXX11_CHAR16_T
82 template <typename Target>
83 inline Target lexical_cast(const char16_t* chars, std::size_t count)
84 {
85 return ::boost::lexical_cast<Target>(
86 ::boost::iterator_range<const char16_t*>(chars, chars + count)
87 );
88 }
89 #endif
90 #ifndef BOOST_NO_CXX11_CHAR32_T
91 template <typename Target>
92 inline Target lexical_cast(const char32_t* chars, std::size_t count)
93 {
94 return ::boost::lexical_cast<Target>(
95 ::boost::iterator_range<const char32_t*>(chars, chars + count)
96 );
97 }
98 #endif
99
100 } // namespace boost
101
102 #undef BOOST_LCAST_NO_WCHAR_T
103
104 #endif // BOOST_LEXICAL_CAST_INCLUDED
105
106