使用protobuf设计消息协议(C++asio网络库相关)

Fiorenza ·
更新时间:2024-11-14
· 503 次阅读

protobuf是目前应用最广泛的消息协议设计工具之一

具有以下一些特点:

1、proto2只支持python,c++,JAVA,proto3可以支持更多语言
2、定义协议以proto作为文件后缀名,在第一行用syntax指定proto版本
3、C++使用protobuf的官方文档:http://developers.google.com/protocol-buffers/docs/cpptutorial ,当然需要翻墙后才能访问
4、在proto2中required表示必填字段,optional表示可选字段;proto3取消了required,字段修饰符可以省略,省略时默认都是可选字段
5、用protoc生成工具可以直接将proto协议生成对应语言可使用的代码,生成命令比如:protoc --cpp_out=. Protocal.proto

使用protobuf时应注意以下几点:

1、使用google::protobuf::ShutdownProtobufLibrary();进行protobuf内存释放,理论上不调用此函数也行但为了防止第三方内存检测工具误报内存泄漏还是加上比较好
2、在main里加上GOOGLE_PROTOBUF_VERIFY_VERSION进行版本检测,防止开发版本和部署版本不一致的问题发生
3、在linux下编译带protobuf的makefile文件时要加上-lprotobuf链接库
4、发消息的时候通过SerializeToString可以将消息序列化成字符串,以便发送给客户端或服务器;可以通过返回值判断是否出错,出错的可能性有两种:pb本身的出错,或者内存不足
5、接收消息的时候通过ParseFromString将收到的string转化成protobuf结构体且可以通过返回值判断成功还是失败

优点:
1、效率和可读性的结合
2、安全性提高了许多

以下是用protobuf作为通信协议的asio聊天室系统协议部分:

//Protocal.proto
syntax = 'proto3';

// Body of an MT_BIND_NAME message: the name a client wants to use.
message PBindName {
  string name = 1;
}

// Body of an MT_CHAT_INFO message: one line of chat text from a client.
message PChat {
  string information = 1;
}

// Body of an MT_ROOM_INFO message: built by the server from a session's
// bound name and its latest chat text, then delivered to the room.
message PRoomInformation {
  string name = 1;
  string information = 2;
}

编写好该文件后执行protoc --cpp_out=. Protocal.proto,可生成.h头文件和.cc代码文件
其中需要注意部分:
1、mutable_name()返回指向string类型字段name的可修改指针;release_name()则让消息放弃对该字段的所有权并返回其指针,之后由调用者负责释放
2、用的最多的是ParseFromString,用来处理字符串数据
3、用ParseFromIstream可进行数据流的处理

以下是asio聊天室用protobuf的完整例子:
parseMessage4就是用protobuf处理协议

协议处理部分: //structHeader.h #ifndef FND_STRUCT_HEADER_H #define FND_STRUCT_HEADER_H #include struct Header { int bodySize; int type; }; enum MessageType { MT_BIND_NAME = 1, // {"name" : "abc"} MT_CHAT_INFO = 2, // {"information" "what i say"} MT_ROOM_INFO = 3, // {"name" : "abc", "information" : "what i say"} }; // client send struct BindName { char name[32]; int nameLen; }; // client send struct ChatInformation { char information[256]; int infoLen; }; // serversend struct RoomInformation { BindName name; ChatInformation chat; }; bool parseMessage(const std::string &input, int *type, std::string &outbuffer); bool parseMessage2(const std::string &input, int *type, std::string &outbuffer); bool parseMessage3(const std::string &input, int *type, std::string &outbuffer); bool parseMessage4(const std::string &input, int *type, std::string &outbuffer); #endif //structHeader.cpp #include "structHeader.h" #include "SerilizationObject.h" #include "JsonObject.h" #include "Protocal.pb.h" #include #include #include template std::string seriliaze(const T &obj) { std::stringstream ss; boost::archive::text_oarchive oa(ss); oa & obj; return ss.str(); } bool parseMessage4(const std::string &input, int *type, std::string &outbuffer) { auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; // "BindName ok" -> substr -> BindName auto command = input.substr(0, pos); if (command == "BindName") { // we try to bind name std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; PBindName bindName; bindName.set_name(name); //auto oldname = bindName.name(); auto ok = bindName.SerializeToString(&outbuffer); return ok; } else if (command == "Chat") { // we try to chat std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; PChat pchat; pchat.set_information(chat); auto ok = pchat.SerializeToString(&outbuffer); if (type) *type = MT_CHAT_INFO; return ok; } return false; } 
bool parseMessage3(const std::string &input, int *type, std::string &outbuffer) { auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; // "BindName ok" -> substr -> BindName auto command = input.substr(0, pos); if (command == "BindName") { // we try to bind name std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; ptree tree; tree.put("name", name); outbuffer = ptreeToJsonString(tree); //outbuffer = seriliaze(SBindName(std::move(name))); return true; } else if (command == "Chat") { // we try to chat std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; ptree tree; tree.put("information", chat); outbuffer = ptreeToJsonString(tree); //outbuffer = seriliaze(SChatInfo(std::move(chat))); if (type) *type = MT_CHAT_INFO; return true; } return false; } bool parseMessage2(const std::string &input, int *type, std::string &outbuffer) { auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; // "BindName ok" -> substr -> BindName auto command = input.substr(0, pos); if (command == "BindName") { // we try to bind name std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; //SBindName bindInfo(std::move(name)); outbuffer = seriliaze(SBindName(std::move(name))); return true; } else if (command == "Chat") { // we try to chat std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; outbuffer = seriliaze(SChatInfo(std::move(chat))); // ChatInformation info; // info.infoLen = chat.size(); // std::memcpy(&(info.information), chat.data(), chat.size()); // auto buffer = reinterpret_cast(&info); // outbuffer.assign(buffer, buffer + sizeof(info)); if (type) *type = MT_CHAT_INFO; return true; } return false; } // cmd messagebody bool parseMessage(const std::string &input, int *type, std::string &outbuffer) { // input should be 
cmd body auto pos = input.find_first_of(" "); if (pos == std::string::npos) return false; if (pos == 0) return false; // "BindName ok" -> substr -> BindName auto command = input.substr(0, pos); if (command == "BindName") { // we try to bind name std::string name = input.substr(pos + 1); if (name.size() > 32) return false; if (type) *type = MT_BIND_NAME; BindName bindInfo; bindInfo.nameLen = name.size(); std::memcpy(&(bindInfo.name), name.data(), name.size()); auto buffer = reinterpret_cast(&bindInfo); outbuffer.assign(buffer, buffer + sizeof(bindInfo)); return true; } else if (command == "Chat") { // we try to chat std::string chat = input.substr(pos + 1); if (chat.size() > 256) return false; ChatInformation info; info.infoLen = chat.size(); std::memcpy(&(info.information), chat.data(), chat.size()); auto buffer = reinterpret_cast(&info); outbuffer.assign(buffer, buffer + sizeof(info)); if (type) *type = MT_CHAT_INFO; return true; } return false; } 聊天消息处理: //chat_message.h #ifndef CHAT_MESSAGE_HPP #define CHAT_MESSAGE_HPP #include "structHeader.h" #include "SerilizationObject.h" #include #include #include #include #include // s -> c c -> s message { header, body } // header length class chat_message { public: enum { header_length = sizeof(Header) }; enum { max_body_length = 512 }; chat_message(){} const char *data() const { return data_; } char *data() { return data_; } std::size_t length() const { return header_length + m_header.bodySize; } const char *body() const { return data_ + header_length; } char *body() { return data_ + header_length; } int type() const { return m_header.type; } std::size_t body_length() const { return m_header.bodySize; } void setMessage(int messageType, const void *buffer, size_t bufferSize) { assert(bufferSize max_body_length) { std::cout << "body size " << m_header.bodySize << " " << m_header.type << std::endl; return false; } return true; } private: char data_[header_length + max_body_length]; Header m_header; }; #endif // 
CHAT_MESSAGE_HPP 服务器代码部分: //main.cpp #include "chat_message.h" #include "JsonObject.h" #include "Protocal.pb.h" #include #include #include #include #include #include #include #include using boost::asio::ip::tcp; //---------------------------------------------------------------------- using chat_message_queue = std::deque; using chat_message_queue2 = std::list; //---------------------------------------------------------------------- //---------------------------------------------------------------------- class chat_session; using chat_session_ptr = std::shared_ptr; class chat_room { public: public: void join(chat_session_ptr); void leave(chat_session_ptr); void deliver(const chat_message&); private: std::set participants_; enum { max_recent_msgs = 100 }; chat_message_queue recent_msgs_; }; //---------------------------------------------------------------------- class chat_session : public std::enable_shared_from_this { public: chat_session(tcp::socket socket, chat_room &room) : socket_(std::move(socket)), room_(room) {} void start() { room_.join(shared_from_this()); do_read_header(); } void deliver(const chat_message &msg) { // first false bool write_in_progress = !write_msgs_.empty(); write_msgs_.push_back(msg); if (!write_in_progress) { // first do_write(); } } private: void do_read_header() { auto self(shared_from_this()); boost::asio::async_read( socket_, boost::asio::buffer(read_msg_.data(), chat_message::header_length), [this, self](boost::system::error_code ec, std::size_t /*length*/) { if (!ec && read_msg_.decode_header()) { do_read_body(); } else { std::cout << "Player leave the room\n"; room_.leave(shared_from_this()); } }); } void do_read_body() { auto self(shared_from_this()); boost::asio::async_read( socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()), [this, self](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { //room_.deliver(read_msg_); handleMessage(); do_read_header(); } else { room_.leave(shared_from_this()); 
} }); } template T toObject() { T obj; std::stringstream ss(std::string( read_msg_.body(), read_msg_.body() + read_msg_.body_length())); boost::archive::text_iarchive oa(ss); oa &obj; return obj; } bool fillProtobuf(::google::protobuf::Message* msg) { std::string ss( read_msg_.body(), read_msg_.body() + read_msg_.body_length()); auto ok = msg->ParseFromString(ss); return ok; } ptree toPtree() { ptree obj; std::stringstream ss( std::string(read_msg_.body(), read_msg_.body() + read_msg_.body_length())); boost::property_tree::read_json(ss, obj); return obj; } void handleMessage() { if (read_msg_.type() == MT_BIND_NAME) { PBindName bindName; if(fillProtobuf(&bindName)) m_name = bindName.name(); } else if (read_msg_.type() == MT_CHAT_INFO) { PChat chat; if(!fillProtobuf(&chat)) return; m_chatInformation = chat.information(); auto rinfo = buildRoomInfo(); chat_message msg; msg.setMessage(MT_ROOM_INFO, rinfo); room_.deliver(msg); } else { // not valid msg do nothing } } void do_write() { auto self(shared_from_this()); boost::asio::async_write( socket_, boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()), [this, self](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { write_msgs_.pop_front(); if (!write_msgs_.empty()) { do_write(); } } else { room_.leave(shared_from_this()); } }); } tcp::socket socket_; chat_room &room_; chat_message read_msg_; chat_message_queue write_msgs_; std::string m_name; std::string m_chatInformation; std::string buildRoomInfo() const { PRoomInformation roomInfo; roomInfo.set_name(m_name); roomInfo.set_information(m_chatInformation); std::string out; auto ok = roomInfo.SerializeToString(&out); assert(ok); return out; } // RoomInformation buildRoomInfo() const { // RoomInformation info; // info.name.nameLen = m_name.size(); // std::memcpy(info.name.name, m_name.data(), m_name.size()); // info.chat.infoLen = m_chatInformation.size(); // std::memcpy(info.chat.information, m_chatInformation.data(), // 
m_chatInformation.size()); // return info; // } }; void chat_room::join(chat_session_ptr participant) { participants_.insert(participant); for (const auto& msg : recent_msgs_) participant->deliver(msg); } void chat_room::leave(chat_session_ptr participant) { participants_.erase(participant); } void chat_room::deliver(const chat_message &msg) { recent_msgs_.push_back(msg); while (recent_msgs_.size() > max_recent_msgs) recent_msgs_.pop_front(); for (auto& participant : participants_) participant->deliver(msg); } //---------------------------------------------------------------------- class chat_server { public: chat_server(boost::asio::io_service &io_service, const tcp::endpoint &endpoint) : acceptor_(io_service, endpoint), socket_(io_service) { do_accept(); } private: void do_accept() { acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { if (!ec) { auto session = std::make_shared(std::move(socket_), room_); session->start(); } do_accept(); }); } tcp::acceptor acceptor_; tcp::socket socket_; chat_room room_; }; //---------------------------------------------------------------------- int main(int argc, char *argv[]) { try { GOOGLE_PROTOBUF_VERIFY_VERSION; if (argc < 2) { std::cerr << "Usage: chat_server [ ...]\n"; return 1; } boost::asio::io_service io_service; std::list servers; for (int i = 1; i < argc; ++i) { tcp::endpoint endpoint(tcp::v4(), std::atoi(argv[i])); servers.emplace_back(io_service, endpoint); } io_service.run(); } catch (std::exception &e) { std::cerr << "Exception: " << e.what() << "\n"; } google::protobuf::ShutdownProtobufLibrary(); return 0; } 客户端代码部分: //client.cpp #include "chat_message.h" #include "structHeader.h" #include "JsonObject.h" #include "SerilizationObject.h" #include "Protocal.pb.h" #include #include #include #include #include #include using boost::asio::ip::tcp; using chat_message_queue = std::deque; class chat_client { public: chat_client(boost::asio::io_service &io_service, tcp::resolver::iterator 
endpoint_iterator) : io_service_(io_service), socket_(io_service) { do_connect(endpoint_iterator); } void write(const chat_message &msg) { io_service_.post([this, msg]() { bool write_in_progress = !write_msgs_.empty(); write_msgs_.push_back(msg); if (!write_in_progress) { do_write(); } }); } void close() { io_service_.post([this]() { socket_.close(); }); } private: void do_connect(tcp::resolver::iterator endpoint_iterator) { boost::asio::async_connect( socket_, endpoint_iterator, [this](boost::system::error_code ec, tcp::resolver::iterator) { if (!ec) { do_read_header(); } }); } void do_read_header() { boost::asio::async_read( socket_, boost::asio::buffer(read_msg_.data(), chat_message::header_length), [this](boost::system::error_code ec, std::size_t /*length*/) { if (!ec && read_msg_.decode_header()) { do_read_body(); } else { socket_.close(); } }); } void do_read_body() { boost::asio::async_read( socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()), [this](boost::system::error_code ec, std::size_t /*length*/) { if (!ec) { if (read_msg_.type() == MT_ROOM_INFO) { //SRoomInfo info; std::string buffer(read_msg_.body(), read_msg_.body() + read_msg_.body_length()); PRoomInformation roomInfo; auto ok = roomInfo.ParseFromString(buffer); //if(!ok) throw std::runtime_error("not valid message"); //std::stringstream ss(buffer); //ptree tree; //boost::property_tree::read_json(ss, tree); if (ok) { std::cout << "client: '"; std::cout << roomInfo.name(); std::cout << "' says '"; std::cout << roomInfo.information(); std::cout << "'\n"; } // boost::archive::text_iarchive ia(ss); // ia & info; // std::cout << "client: '"; // std::cout << info.name(); // std::cout << "' says '"; // std::cout << info.information(); // std::cout << "'\n"; } do_read_header(); } else { socket_.close(); } }); } void do_write() { boost::asio::async_write( socket_, boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()), [this](boost::system::error_code ec, 
std::size_t /*length*/) { if (!ec) { write_msgs_.pop_front(); if (!write_msgs_.empty()) { do_write(); } } else { socket_.close(); } }); } private: boost::asio::io_service &io_service_; tcp::socket socket_; chat_message read_msg_; chat_message_queue write_msgs_; }; int main(int argc, char *argv[]) { try { GOOGLE_PROTOBUF_VERIFY_VERSION; if (argc != 3) { std::cerr << "Usage: chat_client \n"; return 1; } boost::asio::io_service io_service; tcp::resolver resolver(io_service); auto endpoint_iterator = resolver.resolve({argv[1], argv[2]}); chat_client c(io_service, endpoint_iterator); std::thread t([&io_service]() { io_service.run(); }); char line[chat_message::max_body_length + 1]; // ctrl-d while (std::cin.getline(line, chat_message::max_body_length + 1)) { chat_message msg; auto type = 0; std::string input(line, line + std::strlen(line)); std::string output; if(parseMessage4(input, &type, output)) { msg.setMessage(type, output.data(), output.size()); c.write(msg); std::cout << "write message for server " << output.size() << std::endl; } } c.close(); t.join(); } catch (std::exception &e) { std::cerr << "Exception: " << e.what() << "\n"; } google::protobuf::ShutdownProtobufLibrary(); return 0; }
作者:昔拉天使



asio c++ 协议 protobuf

需要 登录 后方可回复, 如果你还没有账号请 注册新账号