网络基础

协议的概念

什么是协议

从应用角度出发,协议可理解为”规则”,是数据传输和数据解释的规则

假设A、B双方欲传输文件,规定:

  • 第一次:传输文件名,接收方接受到的文件名应答OK给传输方
  • 第二次:发送文件的尺寸,接收方接收到数据应答一个OK给传输方
  • 第三次:传输文件内容,接收方接收到数据完成后应答OK表示文件内容接受成功

由此,无论A、B之间传递何种文件,都是通过三次数据传输来完成。A、B之间形成了一个最简单的数据传输规则。双方都按此规则发送、接收数据。A、B之间达成的这个相互遵守的规则即为协议

这种仅在A、B之间被遵守的协议称之为原始协议。当此协议被更多的人采用,不断地增加、改进、维护、完善,最终形成了一个稳定的、完整的文件传输协议,被广泛应用于各种文件传输过程中。该协议就成为了一个标准协议。最早的FTP协议就是由此衍生而来

TCP协议注重数据的传输,HTTP协议注重数据的解释

典型协议

应用层:HTTP、FTP、SSH、NFS、TELNET、

传输层:TCP、UDP

网络层:IP、ARP、ICMP、IGMP

TCP:传输控制协议(Transmisssion Control Protocol)是一种面向连接的、可靠的、基于字节流的传输层通信协议

UDP:用户数据报协议(User Datagram Protocol)是OSI参考模型中一种无连接的传输层协议,提供面向事务的简单不可靠信息传送协议

HTTP:超文本传输协议(Hyper Text Transfer Protocol)是互联网上应用最为广泛的一种网络协议

FTP:文本传输协议(File Transfer Protocol)

IP:因特网互联协议(Internet Protocol)

ICMP:因特网控制报文协议(Internet Control Message Protocol)是TCP/IP协议族的一个子协议,用于在IP主机、路由器之间传递控制消息

IGMP:因特网组管理协议(Internet Group Management Protocol)是因特网协议家族的一个组播协议,运行在主机和组播路由器之间

ARP:正向地址解析协议(Address Resolution Protocol)通过已知IP地址寻找主机对应MAC地址

网络应用程序设计模式

C/S模式:传统的网络应用设计模式,客户机(client)/服务器(server)模式。需要在通讯两端各自部署客户机和服务器来完成数据通信

B/S模式:浏览器(Browser)/服务器(Server)模式。只需在一端部署服务器,而另外一端使用每台PC都默认配置的浏览器即可完成数据的传输

优缺点:

  • C/S:
    • 优点:缓存数据,调高传输效率;协议选择灵活,可定制、剪裁;
    • 缺点:开发工作量大;安全性差;不能跨平台
  • B/S:
    • 优点:开发工作量小;安全性强;跨平台
    • 缺点:数据无法缓存;只能使用固定协议

分层模型

OSI七层模型

物理层:主要定义物理设备标准,如网线的接口类型、光线的接口类型、各种传输介质的传输速率等。它的主要作用是传输比特流(就是由1、0转化为电流强弱来进行传输,到达目的地后再转化为1、0,也就是我们常说的数模转换与模数转化),这一层的数据叫做比特层

数据链路层:定义了如何让格式化数据以帧为单位进行传输,以及如何让控制对物理介质的访问。这一层通常还提供错误检测和纠正,以确保数据的可靠传输。如:串口通信中使用到的115200、8、N、1

网络层:在位于不同地理位置的网络中的两个主机系统之间提供连接和路由选择。Internet的发展使得从世界各站点访问信息的用户数大大增加,而网络层正是管理这种连接的层

传输层:定义了一些传输数据的协议和端口号(WWW端口号80等)如:TCP(传输控制协议,传输效率低,可靠性强,用于传输可靠性要求高、数据量大的数据),UDP(传输控制协议,传输效率高,可靠性弱,用于传输可靠性要求低、数据量小的数据)主要是将从下层接受的数据进行分段和传输,到达目的地址后再进行重组

会话层:通过传输层(端口号,传输端口与接收端口)建立数据传输的通路。主要在你的系统之间发起会话或者接受会话请求(设备之间需要相互认识可以是IP也可以是MAC或者是主机名)

表示层:可确保一个系统的应用层所发送的信息可以被另一个系统的应用层读取。例如:PC程序与另一台计算机进行通信,其中一台计算机使用扩展EBCDIC交换码,而另一台则使用美国信息交换标准码(ASCII)来表示相同字符。如果必要,表示层会通过使用一种通用格式来实验多种数据格式之间的转换

应用层:是最靠近用户的OSI层。这一层为用户的应用程序(例如电子邮件、文件传输和终端仿真)提供网路服务

TCP/IP四层模型

TCP/IP网络协议栈分为应用层(Application)、传输层(Transport)、网络层(Network)和链路层(Link)四层

TCP/IP模型

一般在应用开发过程中讨论最多的就是TCP/IP模型

通信过程

两台计算机通过TCP/IP协议通信的过程如下:

TCP/IP通信过程

上图对应两台计算机在同一网段中的情况,如果两天计算机在不同的网段中,那么是数据从一台计算机到另一台计算机传输过程要经过一个或过个路由器

数据没有封装之前是不能在网络中传递

  • 链路层有以太网、令牌环网等标准,链路层负责网卡设备的驱动、帧同步(即从网线上检测到什么信号算作新帧的开始)、冲突检测(如果检测到冲突就自动重发)、数据差错校验等工作。交换机是工作在链路层的网络设备,可以在不同的链路层网络之间转发数据帧(比如十兆以太网和百兆以太网之间、以太网和令牌环网之间),由于不同链路层的帧格式不同,交换机要将进来的数据包拆掉链路层首部重新封装之后再转发
  • 网络层的IP协议是构成Internet的基础。Internet上的主机通过IP地址来标识,Inter-net上有大量路由器负责根据IP地址选择合适的路径转发数据包,数据包从Internet上的源主机到目的主机往往要经过十多个路由器。路由器是工作在第三层的网络设备,同时兼有交换机的功能,可以在不同的链路层接口之间转发数据包,因此路由器需要将进来的数据包拆掉网络层和链路层两层首部并重新封装。IP协议不保证传输的可靠性,数据包在传输过程中可能丢失,可靠性可以在上层协议或应用程序中提供支持
  • 网络层负责点到点(ptop,point-to-point)的传输(这里的“点”指主机或路由器),而传输层负责端到端(etoe,end-to-end)的传输(这里的“端”指源主机和目的主机)。传输层可选择TCP或UDP协议
  • TCP是一种面向连接的、可靠的协议,有点像打电话,双方拿起电话互通身份之后就建立了连接,然后说话就行了,这边说的话那边保证听得到,并且是按说话的顺序听到的,说完话挂机断开连接。也就是说TCP传输的双方需要首先建立连接,之后由TCP协议保证数据收发的可靠性,丢失的数据包自动重发,上层应用程序收到的总是可靠的数据流,通讯之后关闭连接
  • UDP是无连接的传输协议,不保证可靠性,有点像寄信,信写好放到邮筒里,既不能保证信件在邮递过程中不会丢失,也不能保证信件寄送顺序。使用UDP协议的应用程序需要自己完成丢包重发、消息排序等工作

协议格式

数据包封装

传输层及其以下的机制由内核提供,应用层由用户进程提供(后面将介绍如何使用socket API编写应用程序),应用程序对通讯数据的含义进行解释,而传输层及其以下处理通讯的细节,将数据从一台计算机通过一定的路径发送到另一台计算机。应用层数据通过协议栈发到网络上时,每层协议都要加上一个数据首部(header),称为封装(Encapsulation)

不同的协议层对数据包有不同的称谓,在传输层叫做段(segment),在网络层叫做数据报(datagram),在链路层叫做帧(frame)。数据封装成帧后发到传输介质上,到达目的主机后每层协议再剥掉相应的首部,最后将应用层数据交给应用程序处理

以太网帧格式

以太网帧中的数据长度规定最小46字节,最大1500字节,ARP和RARP数据包的长度不够46字节,要在后面补填充位。最大值1500称为以太网的最大传输单元(MTU),不同的网络类型有不同的MTU,如果一个数据包从以太网路由到拨号链路上,数据包长度大于拨号链路的MTU,则需要对数据包进行分片(fragmentation)

ARP数据报格式

ARP协议是根据IP地址获取MAC地址

在网络通讯时,源主机的应用程序知道目的主机的IP地址和端口号,却不知道目的主机的硬件地址,而数据包首先是被网卡接收到再去处理上层协议的,如果接收到的数据包的硬件地址与本机不符,则直接丢弃。因此在通讯前必须获得目的主机的硬件地址。ARP协议就起到这个作用。源主机发出ARP请求,询问“IP地址是192.168.0.1的主机的硬件地址是多少”,并将这个请求广播到本地网段(以太网帧首部的硬件地址填FF:FF:FF:FF:FF:FF表示广播),目的主机接收到广播的ARP请求,发现其中的IP地址与本机相符,则发送一个ARP应答数据包给源主机,将自己的硬件地址填写在应答包中

每台主机都维护一个ARP缓存表,可以用arp -a命令查看。缓存表中的表项有过期时间(一般为20分钟),如果20分钟内没有再次使用某个表项,则该表项失效,下次还要发ARP请求来获得目的主机的硬件地址

IP段格式

IP数据报的首部长度和数据长度都是可变长的,但总是4字节的整数倍。对于IPv4,4位版本字段是4。4位首部长度的数值是以4字节为单位的,最小值为5,也就是说首部长度最小是4x5=20字节,也就是不带任何选项的IP首部,4位能表示的最大值是15,也就是说首部长度最大是60字节。8位TOS字段有3个位用来指定IP数据报的优先级(目前已经废弃不用),还有4个位表示可选的服务类型(最小延迟、最大吐量、最大可靠性、最小成本),还有一个位总是0。总长度是整个数据报(包括IP首部和IP层payload)的字节数。每传一个IP数据报,16位的标识加1,可用于分片和重新组装数据报。3位标志和13位片偏移用于分片。TTL(Time to live)是这样用的:源主机为数据包设定一个生存时间,比如64,每过一个路由器就把该值减1,如果减到0就表示路由已经太长了仍然找不到目的主机的网络,就丢弃该包,因此这个生存时间的单位不是秒,而是跳(hop)。协议字段指示上层协议是TCP、UDP、ICMP还是IGMP。然后是校验和,只校验IP首部,数据的校验由更高层协议负责。IPv4的IP地址长度为32位

UDP数据报格式

般的网络通信都是像TFTP协议这样,通信的双方分别是客户端和服务器,客户端主动发起请求(上面的例子就是客户端发起的请求帧),而服务器被动地等待、接收和应答请求。客户端的IP地址和端口号唯一标识了该主机上的TFTP客户端进程,服务器的IP地址和端口号唯一标识了该主机上的TFTP服务进程,由于客户端是主动发起请求的一方,它必须知道服务器的IP地址和TFTP服务进程的端口号,所以,一些常见的网络协议有默认的服务器端口,例如HTTP服务默认TCP协议的80端口,FTP服务默认TCP协议的21端口,TFTP服务默认UDP协议的69端口(如上例所示)。在使用客户端程序时,必须指定服务器的主机名或IP地址,如果不明确指定端口号则采用默认端口,请读者查阅ftp、tftp等程序的man page了解如何指定端口号。/etc/services中列出了所有well-known的服务端口和对应的传输层协议,这是由IANA(Internet Assigned Numbers Authority)规定的,其中有些服务既可以用TCP也可以用UDP,为了清晰,IANA规定这样的服务采用相同的TCP或UDP默认端口号,而另外一些TCP和UDP的相同端口号却对应不同的服务

使用UDP协议的应用程序必须考虑到这些可能的问题并实现适当的解决方案,例如等待应答、超时重发、为数据包编号、流量控制等。一般使用UDP协议的应用程序实现都比较简单,只是发送一些对可靠性要求不高的消息,而不发送大量的数据。例如,基于UDP的TFTP协议一般只用于传送小文件(所以才叫trivial的ftp),而基于TCP的FTP协议适用于 各种文件的传输

TCP数据报格式

与UDP协议一样也有源端口号和目的端口号,通讯的双方由IP地址和端口号标识。32位序号、32位确认序号、窗口大小稍后详细解释。4位首部长度和IP协议头类似,表示TCP协议头的长度,以4字节为单位,因此TCP协议头最长可以是4x15=60字节,如果没有选项字段,TCP协议头最短20字节。URG、ACK、PSH、RST、SYN、FIN是六个控制位,本节稍后将解释SYN、ACK、FIN、RST四个位,其它位的解释从略。16位检验和将TCP协议头和数据都计算在内

TCP协议

TCP通信时序

建立连接(三次握手)的过程:

  • 客户端发送一个带SYN标志的TCP报文到服务器。这是三次握手过程中的段1。客户端发出段1,SYN位表示连接请求。序号是1000,这个序号在网络通讯中用作临时的地址,每发一个数据字节,这个序号要加1,这样在接收端可以根据序号排出数据包的正确顺序,也可以发现丢包的情况,另外,规定SYN位和FIN位也要占一个序号,这次虽然没发数据,但是由于发了SYN位,因此下次再发送应该用序号1001。mss表示最大段尺寸,如果一个段太大,封装成帧后超过了链路层的最大帧长度,就必须在IP层分片,为了避免这种情况,客户端声明自己的最大段尺寸,建议服务器端发来的段不要超过这个长度
  • 服务器端回应客户端,是三次握手中的第2个报文段,同时带ACK标志和SYN标志。它表示对刚才客户端SYN的回应;同时又发送SYN给客户端,询问客户端是否准备好进行数据通讯。服务器发出段2,也带有SYN位,同时置ACK位表示确认,确认序号是1001,表示“我接收到序号1000及其以前所有的段,请你下次发送序号为1001的段”,也就是应答了客户端的连接请求,同时也给客户端发出一个连接请求,同时声明最大尺寸为1024
  • 客户必须再次回应服务器端一个ACK报文,这是报文段3。客户端发出段3,对服务器的连接请求进行应答,确认序号是8001。在这个过程中,客户端和服务器分别给对方发了连接请求,也应答了对方的连接请求,其中服务器的请求和应答在一个段中发出,因此一共有三个段用于建立连接,称为”三方握手(three-way-handshake)”。在建立连接的同时,双方协商了一些信息,例如双方发送序号的初始值、最大段尺寸等

关闭连接(四次握手)的过程:

由于TCP连接是全双工的,因此每个方向都必须单独进行关闭。这原则是当一方完成它的数据发送任务后就能发送一个FIN来终止这个方向的连接。收到一个 FIN只意味着这一方向上没有数据流动,一个TCP连接在收到一个FIN后仍能发送数据。首先进行关闭的一方将执行主动关闭,而另一方执行被动关闭

  • 客户端发出段7,FIN位表示关闭连接的请求
  • 服务器发出段8,应答客户端的关闭连接请求
  • 服务器发出段9,其中也包含FIN位,向客户端发送关闭连接请求
  • 客户端发出段10,应答服务器的关闭连接请求

TCP状态转换

TCP状态转换

Socket编程

套接字概念

Socket本身有”插座”的意思,在Linux环境下,用于表示进程间网络通信的特殊文件类型。本质为内核借助缓冲区形成的伪文件

既然是文件,那么理所应当的可以使用文件描述符引用套接字。与管道类似,Linux系统将其封装成文件的目的是为了统一接口,使得读写套接字和读写文件的操作一致。区别是管道主要应用于本地进程间通信,而套接字多应用于网络进程间数据的传递

套接字的内核实现较为复杂

在TCP/IP协议中,”IP地址+TCP或UDP端口号”唯一标识网络通信中的一个进程。”IP地址+端口号”就对应一个socket。欲建立连接的两个进程各自有一个socket来标识,那么这两个socket组成的socket pair就唯一标识一个连接。因此可以用socket来描述网络连接的一对一关系

在网络通信中,套接字一定是成对出现的。一端的发送缓冲区对应对端的接受缓冲区

TCP/IP协议最早在BSD UNIX上实现,为TCP/IP协议设计的应用层编程接口称为socket API

预备知识

网络字节序

我们知道,内存中的多字节数据相对于内存地址有大端和小端之分,磁盘文件中的多字节数据相对于文件中的偏移地址也有大端小端之分。网络数据流同样有大端小端之分,那么如何定义网络数据流的地址?发送主机通常将发送缓冲区中的数据按内存地址从低到高的顺序发出,接收主机把从网络上接到的字节依次保存在接收缓冲区中,也是按内存地址从低到高的顺序保存。因此,网络数据流的地址应规定:先发出的数据是低地址,后发出的数据是高地址

  • 小端法(pc本地存储):高位存高地址,低位存低地址
  • 大端法(网络存储):高位存低地址,低位存高地址

网络字节序和主机字节序转换函数:

  • uint32_t htonl(uint32_t hostlong);
  • uint16_t htons(uint16_t hostshort);
  • uint32_t ntohl(uint32_t netlong);
  • uint16_t ntohs(uint16_t netshort);

IP地址转换函数

#include <arpa/inet.h>
int inet_pton(int af, const char *src, void *dst); // 本地字节序转化为网络字节序
const char *inet_ntop(int af, const char *src, char *dst, socklen_t size); // 网络字节序转换为本地字节序

支持IPv4和IPv6:af(AF_INET、AF_INET6)

sockaddr数据结构

struct sockaddr很多网络编程函数诞生早于IPv4,那时候都是用的是sockaddr结构体,为了向前兼容,现在sockaddr退化成(void *)的作用,传递一个地址给函数,至于这个函数是sockaddr_in还是sockaddr_in6由地址族确定,然后函数内部再强制类型转化为所需的地址类型

struct sockaddr {
sa_family_t sa_family; /* address family, AF_xxx */
char sa_data[14]; /* 14 bytes of protocol address */
};

struct sockaddr_in {
__kernel_sa_family_t sin_family; /* Address family */ 地址结构类型
__be16 sin_port; /* Port number */ 端口号
struct in_addr sin_addr; /* Internet address */ IP地址
/* Pad to size of `struct sockaddr'. */
unsigned char __pad[__SOCK_SIZE__ - sizeof(short int) -
sizeof(unsigned short int) - sizeof(struct in_addr)];
};

struct in_addr { /* Internet address. */
__be32 s_addr;
};

struct sockaddr_in6 {
unsigned short int sin6_family; /* AF_INET6 */
__be16 sin6_port; /* Transport layer port # */
__be32 sin6_flowinfo; /* IPv6 flow information */
struct in6_addr sin6_addr; /* IPv6 address */
__u32 sin6_scope_id; /* scope id (new in RFC2553) */
};

struct in6_addr {
union {
__u8 u6_addr8[16];
__be16 u6_addr16[8];
__be32 u6_addr32[4];
} in6_u;
#define s6_addr in6_u.u6_addr8
#define s6_addr16 in6_u.u6_addr16
#define s6_addr32 in6_u.u6_addr32
};

#define UNIX_PATH_MAX 108
struct sockaddr_un {
__kernel_sa_family_t sun_family; /* AF_UNIX */
char sun_path[UNIX_PATH_MAX]; /* pathname */
};

网络套接字函数

socket模型创建流程图

socket API
  • bind():向套接字绑定IP地址和端口号
  • listen():设置监听上限
  • accept():阻塞监听客户端连接

socket函数

#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int socket(int domain, int type, int protocol);
  • domain:AF_INET、AF_INET6、AF_UNIX

  • type:SOCK_STREAM(流式TCP)、SOCK_DGRAM(报式UDP)

  • protocol:0(自动选择type对应的典型协议)

返回值:成功(新套接字对应的文件描述符)、失败(-1 errno)

bind函数

#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • sockfd:socket函数返回值

  • addr:(struct sockaddr *)&addr;

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    // 传入(struct sockaddr *)&addr;
  • addrlen:sizeof(addr)地址结构大小

返回值:成功(0)、失败(-1 errno)

listen函数

#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int listen(int sockfd, int backlog); // 设置同时与服务器建立连接的上线数(同时进行三次握手的客户端数量)
  • sockfd:socket函数返回值
  • backlog:上限数值

返回值:成功(0)、失败(-1 errno)

accept函数

#include <sys/types.h> 		/* See NOTES */
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen); // 阻塞等待客户端建立连接,成功返回一个与客户端成功连接的socket文件描述符
  • sockfd:socket函数返回值
  • addr:传出参数,成功与服务器建立连接的那个客户端的地址结构(IP+port)
  • addrlen:传入传出,传入addr大小,传出客户端addr实际大小

返回值:成功(能与服务器进行数据通信的socket文件描述符)、失败(-1 errno)

connect函数

#include <sys/types.h> 					/* See NOTES */
#include <sys/socket.h>
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen); // 使用现有的socket与服务器建立连接
  • sockfd:socket函数返回值
  • addr:传入参数,服务器的地址结构
  • addrlen:服务器地址结构的大小

返回值:成功(0)、失败(-1 errno)

如果不使用bind绑定客户端地址结构,采用”隐式绑定”

C/S模型

server

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

//server:
// socket() 创建socket
// bind() 绑定服务器地址结构
// listen() 设置监听上限
// accept() 阻塞监听客户端连接
// read(fd) 读socket获取客户端数据
// 大小写转换
// write(fd)
// close()

#define SERVER_PORT 9527

void sys_err(const char *str) {
perror(str);
exit(1);
}

int main() {
int lfd = 0, cfd = 0;
int ret;
char buf[BUFSIZ], clientIP[1024];
struct sockaddr_in serverAddr, clientAddr;
socklen_t clientAddrLen;

serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(SERVER_PORT);
serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);

// socket
lfd = socket(AF_INET, SOCK_STREAM, 0);
if (lfd == -1) {
sys_err("socket error");
}

// bind
int errno = bind(lfd, (struct sockaddr *) &serverAddr, sizeof(serverAddr));
if (errno == -1) {
sys_err("bind error");
}

// listen
errno = listen(lfd, 128);
if (errno == -1) {
sys_err("listen error");
}

clientAddrLen = sizeof(clientAddr);

while (1) {
// accept
cfd = accept(lfd, (struct sockaddr *) &clientAddr, &clientAddrLen);
if (cfd == -1) {
sys_err("accept error");
}

ret = read(cfd, buf, sizeof(buf));
if (ret == 0) {
close(cfd);
exit(1);
}

for (int i = 0; i < ret; i++) {
buf[i] = toupper(buf[i]);
}
write(cfd, buf, ret);
write(STDOUT_FILENO, buf, ret);

}
return 0;
}
// C++实现
#include <iostream>
#include <boost/asio.hpp>

int main() {
std::cout << "server start ..." << std::endl;
// io_service对象
boost::asio::io_service ios;
// 绑定端口
boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 8888));
// socket对象
boost::asio::ip::tcp::socket sock(ios);
// 阻塞等待socket连接
acceptor.accept(sock);

char msgFromClient[BUFSIZ];
char msgToClient[BUFSIZ];

while(true) {
boost::asio::read(sock, boost::asio::buffer(msgFromClient));
std::cout << "client: " << sock.remote_endpoint().address() << "; msgFromClient: " << msgFromClient << std::endl;

std::string tmp = std::string(msgFromClient);
std::transform(tmp.begin(), tmp.end(), tmp.begin(), ::toupper);
strcpy(msgToClient, tmp.c_str());

boost::asio::write(sock, boost::asio::buffer(msgToClient));
std::cout << "msgToClient: " << msgToClient << std::endl;
}

return 0;
}

client

#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <ctype.h>
#include <arpa/inet.h>

//client:
// socket() 创建socket
// connect() 与服务器建立连接
// write() 写数据到socket
// read() 读转换后的数据
// 显示结果
// close()

#define SERVER_PORT 9527

void sys_err(const char *str) {
perror(str);
exit(1);
}

int main() {
int cfd;
int count = 10;
char buf[BUFSIZ];
struct sockaddr_in serverAddr;

serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(SERVER_PORT);
inet_pton(AF_INET, "127.0.0.1", &serverAddr.sin_addr.s_addr);

cfd = socket(AF_INET, SOCK_STREAM, 0);
if(cfd == -1) {
sys_err("socket error");
}

int errno = connect(cfd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if(errno == -1) {
sys_err("connect error");
}

while(count--) {
write(cfd, "hello\n", 6);
int ret = read(cfd, buf, sizeof(buf));
write(STDOUT_FILENO, buf, ret);
sleep(1);
}

return 0;
}
// C++实现
#include <iostream>
#include <boost/asio.hpp>

int main() {
std::cout << "client start ..." << std::endl;
// io_service对象
boost::asio::io_service ios;
// socket对象
boost::asio::ip::tcp::socket sock(ios);
// 创建连接端
boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 8888);

sock.connect(ep);

char msgFromServer[BUFSIZ];
char msgToServer[BUFSIZ];

for (int i = 0; i < 3; i++) {
std::memset(msgToServer, 0x00, sizeof(msgToServer));
std::memset(msgFromServer, 0x00, sizeof(msgFromServer));
std::string tmp = "hello";

strcpy(msgToServer, tmp.c_str());
boost::asio::write(sock, boost::asio::buffer(msgToServer));
std::cout << "client to server: " << msgToServer << std::endl;
boost::asio::read(sock, boost::asio::buffer(msgFromServer));
std::cout << "client from server: " << msgFromServer << std::endl;

std::this_thread::sleep_for(std::chrono::seconds(1));
}

return 0;
}

高并发服务器

高并发服务器

多进程并发服务器

使用多进程并发服务器时需要考虑一下几点:

  • 父进程最大文件描述符个数(父进程中需要close关闭accept返回的新文件描述符)
  • 系统内创建进程个数(与内存大小相关)
  • 进程创建过多是否降低整体服务性能(进程调度)

server/client

#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>

//server:
// socket() 创建socket
// bind() 绑定服务器地址结构
// listen() 设置监听上限
// accept() 阻塞监听客户端连接
// read(fd) 读socket获取客户端数据
// 大小写转换
// write(fd)
// close()

#define SERVER_PORT 9999

void sys_err(const char *str) {
perror(str);
exit(1);
}

void catch_child(int signum) {
while (waitpid(0, NULL, WNOHANG) > 0);
return;
}


int main() {
int lfd = 0, cfd = 0;
int ret;
char buf[BUFSIZ], clientIP[1024];
struct sockaddr_in serverAddr, clientAddr;
socklen_t clientAddrLen;
pid_t pid;

serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(SERVER_PORT);
serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);

// socket
lfd = socket(AF_INET, SOCK_STREAM, 0);
if (lfd == -1) {
sys_err("socket error");
}

// bind
int errno = bind(lfd, (struct sockaddr *) &serverAddr, sizeof(serverAddr));
if (errno == -1) {
sys_err("bind error");
}

// listen
errno = listen(lfd, 128);
if (errno == -1) {
sys_err("listen error");
}

clientAddrLen = sizeof(clientAddr);
// write/read
while (1) {
// accept
cfd = accept(lfd, (struct sockaddr *) &clientAddr, &clientAddrLen);
if (cfd == -1) {
sys_err("accept error");
}

pid = fork();
if (pid < 0) {
sys_err("fork error");
} else if (pid == 0) {
// 子进程
close(lfd);
while (1) {
ret = read(cfd, buf, sizeof(buf));
if (ret == 0) {
close(cfd);
exit(1);
}
for (int i = 0; i < ret; i++) {
buf[i] = toupper(buf[i]);
}
write(cfd, buf, ret);
write(STDOUT_FILENO, buf, ret);
}
break;
} else {
// 父进程
close(cfd);
// 回收子进程
struct sigaction act;
act.sa_handler = catch_child;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
ret = sigaction(SIGCHLD, &act, NULL);
if (ret != 0) {
sys_err("sigaction error");
}
continue;
}
}
return 0;
}

多线程并发服务器

使用多线程并发服务器时需要考虑一下几点:

  • 调整进程内最大文件描述符上限
  • 线程如有共享数据,考虑线程同步
  • 服务于客户端线程退出时,退出处理(退出处理,分离态)
  • 系统负载,随着链接2客户端增加,导致其他线程不能及时得到CPU

server

#include <iostream>
#include <string>
#include <boost/asio.hpp>

void session(boost::asio::ip::tcp::socket socket) {
try {
// 读取数据
char msgFromClient[BUFSIZ];
// 回传数据
char msgToClient[BUFSIZ];
boost::system::error_code error;
while(true) {
size_t len = socket.read_some(boost::asio::buffer(msgFromClient), error);
if (error == boost::asio::error::eof) {
return;
}

std::cout << "client: " << socket.remote_endpoint().address() << "; msgFromClient: " << msgFromClient
<< std::endl;

std::string tmp = std::string(msgFromClient);
std::transform(tmp.begin(), tmp.end(), tmp.begin(), ::toupper);
strcpy(msgToClient, tmp.c_str());

boost::asio::write(socket, boost::asio::buffer(msgToClient, len));
std::cout << "msgToClient: " << msgToClient << std::endl;
}
}
catch (std::exception &e) {
std::cerr << "Exception in session: " << e.what() << std::endl;
}
}

int main() {
try {
boost::asio::io_context io_context;

// 创建一个TCP端点并侦听端口 8000
boost::asio::ip::tcp::tcp::acceptor acceptor(io_context, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 8000));

while (true) {
// 接受客户端连接
boost::asio::ip::tcp::tcp::socket socket(io_context);
acceptor.accept(socket);

// 创建一个新的会话
std::thread(session, std::move(socket)).detach();
}
}
catch (std::exception &e) {
std::cerr << "Exception in server: " << e.what() << std::endl;
}

return 0;
}

client

#include <iostream>
#include <string>
#include <boost/asio.hpp>

int main() {
try {
boost::asio::io_context io_context;

// 连接到服务器
boost::asio::ip::tcp::socket socket(io_context);
socket.connect(tcp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 8000));

// 发送消息并接收回显
char msgToServer[BUFSIZ];
char msgFromServer[BUFSIZ];

for (int i = 0; i < 3; i++) {
std::memset(msgToServer, 0x00, sizeof(msgToServer));
std::memset(msgFromServer, 0x00, sizeof(msgFromServer));
std::string tmp = "hello";

strcpy(msgToServer, tmp.c_str());
boost::asio::write(socket, boost::asio::buffer(msgToServer));
std::cout << "client to server: " << msgToServer << std::endl;
boost::asio::read(socket, boost::asio::buffer(msgFromServer));
std::cout << "client from server: " << msgFromServer << std::endl;

std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
catch (std::exception &e) {
std::cerr << "Exception in client: " << e.what() << std::endl;
}

return 0;
}

多路I/O转接服务器

多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件

select

缺点:

  • 监听上限受文件描述符限制。最大1024
  • 检测满足条件的fd,自己添加业务逻辑提高较小。提高了编码难度

优点:

  • 跨平台

poll

缺点:

  • 不能跨平台,只能在Linux下使用
  • 无法直接定位满足监听事件的文件描述符,编码难度较大

优点:

  • 自带数据结构,可以将监听事件结合和返回事件集合分离
  • 拓展监听上限,超出1024限制

epoll

epoll是Linux下多路复用IO接口select/poll的增强版本,他能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要监听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被监听的文件描述符集,只要遍那些被内核IO时间异步唤醒而加入Ready队列的文件描述符集合就行

epoll除了提供select/poll那种IO时间的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率

epoll函数原型:

  • int epoll_create(int size);
  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数:

  • size:创建的红黑树的监听节点数量(仅供内核参考)
  • epfd:epoll_create函数的返回值
  • op:对该监听红黑树所做的操作
    • EPOLL_CTL_ADD:添加fd到监听红黑树
    • EPOLL_CTL_MOD:修改fd在监听红黑树上的监听事件
    • EPOPP_CTL_DEL:将一个fd从监听红黑树上摘下(取消监听)
  • fd:待监听的fd
  • event:本质是struct epoll_event结构体地址
    • events:EPOLLIN / EPOLLOUT / EPOLLERROR等
    • data:联合体(fd、ptr、uint32_t、uint64_t)
  • events:传出参数,传出满足监听条件的fd结构体
  • maxevents:数组元素总个数
  • timeout:
    • -1:阻塞
    • 0:不阻塞
    • >0:超时时间(毫秒)

优点:高效,突破1024文件描述符

缺点:不能跨平台,Linux

事件模型

ET模式

ET模式即Edge Triggered工作模式,边缘触发模式

基于非阻塞文件语柄

只有当read或write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读产生一个EAGAIN才认为此事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成

LT模式

LT模式即Level Triggered工作模式,水平触发(默认采用模式)

与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用

LT与ET比较

LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式变成出错的可能性小一点。传统的select/poll都是这种模型的代表

ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已准备就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only-once)

ET和LT区别在于残留数据是否会进行调用

epoll反应堆模型

epoll ET模式+非阻塞、轮询+void *ptr

在反应堆模型下:不但要监听cfd的读事件,还要监听cfd的写事件

线程池模型

  • 预先创建阻塞于accept多线程,使用互斥锁上锁保护accept
  • 预先创建多线程,由主线程调用accept
线程池
struct threadpoll_t {
pthread_mutex_t lock; // 用于锁住本结构体
pthread_mutex_t thread_counter; // 记录忙状态线程个数的锁

pthread_cond_t queue_not_full; // 当任务队列满时, 添加任务的线程阻塞, 等待此条件变量
pthread_cond_t queue_not_empty; // 任务队列里不为空时, 通知等待任务的线程

pthread_t *threads; // 存放线程池中每个线程的tid, 数组
pthread_t adjust_tid; // 存管理线程tid
threadpool_task_t *task_queue; // 任务队列(数组首地址)

int min_thr_num; // 线程池最小线程数
int max_thr_num; // 线程池最大线程数
int live_thr_num; // 当前存活线程个数
int busy_thr_num; // 忙状态线程个数
int wait_exit_thr_num; // 要销毁的线程个数

int queue_front; // task_queue队头下标
int queue_rear; // task_queue队尾下标
int queue_size; // task_queue队中实际任务数
int queue_max_size; // task_queue队列可容纳任务数上限

int shutdown; // 标志位, 线程池使用状态, true或false
};

UDP服务器

传输层主要应用的协议模型有两种:一种是TCP协议,另一种则是UDP协议。TCP协议在网络通信中占主导地位,绝大多数的网络通信记住TCP协议完成数据传输。但UDP也是网络通信中不可或缺的重要通信手段

相较于TCP而言,UDP通信形式更像是发送短信。不需要在数据传输之前建立、维护连接。只专心获取数据就好。省去了三次握手的过程,通信速度可以大大提高,但与之伴随的通信的稳定性和正确率便得不到保证。因此,我们称UDP为”无连接的不可靠报文传递”

UDP的优点和不足:由于无需创建连接,所以UDP开销较小,数据传输速度快,实用性较强。多用于对实时性要求较高的通信场合。但也伴随这数据传输的不可靠,传输数据的正确率、传输顺序和流量都得不到控制和保证

与TCP类似的,UDP也有可能出现缓冲区被填满后,再接收数据时丢包的现象。由于它没有TCP滑动窗口的机制,通常采用如下两种方法解决:

  • 服务器应用层设计流量控制,控制发送数据速度
  • 借助setsockopt函数改变接收缓冲区大小

C/S模型-UDP

server

// server
#include <iostream>
#include <boost/asio.hpp>

int main() {
try {
// 启动UDP服务器
std::cout << "UDP Server Start..." << std::endl;
// 创建io上下文对象
boost::asio::io_context ioContext;
// 创建UDP下的通信socket
boost::asio::ip::udp::socket serverSocket(ioContext, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 8000));

while (true) {
// 用于接收和发送数据
char messageFromClient[BUFSIZ] = {0};
char messageToClient[BUFSIZ] = {0};
// ep为保存的端口信息
boost::asio::ip::udp::endpoint ep;
// ec为接收发送端发送信息时是否出错
boost::system::error_code ec;
serverSocket.receive_from(boost::asio::buffer(messageFromClient), ep, 0, ec);
if (ec && ec != boost::asio::error::message_size) {
throw boost::system::system_error(ec);
}

// 将接收到的信息转化为大写
for (int i = 0; i < std::strlen(messageFromClient); i++) {
messageToClient[i] = (char) toupper(messageFromClient[i]);
}

std::cout << messageToClient << " send to port: " << ep.port() << std::endl;
// 将转换后的信息通过保存的端口信息返回给客户端
serverSocket.send_to(boost::asio::buffer(messageToClient), ep);

// 将用于接收和发送数据的空间置空
memset(messageToClient, '\0', BUFSIZ);
memset(messageFromClient, '\0', BUFSIZ);
}

} catch (std::exception &e) {
std::cout << "exception: " << e.what() << std::endl;
}
}

client

// client
#include <iostream>
#include <boost/asio.hpp>

int main() {
try {
// 启动UDP客户端
std::cout << "UDP Client Start..." << std::endl;
// 创建io上下文对象
boost::asio::io_context ioContext;
// 创建UDP下的通信socket
boost::asio::ip::udp::socket clientSocket(ioContext);
// 打开套接字, 多用于UDP和ICMP
clientSocket.open(boost::asio::ip::udp::v4());

// 用于接收和发送数据
char messageToServer[BUFSIZ] = {0};
char messageFromServer[BUFSIZ] = {0};

while (true) {
// 从标准输入流输入message信息
std::cin >> messageToServer;
// 将信息通过发送端口发送给服务器
clientSocket.send_to(boost::asio::buffer(messageToServer), boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 8000));
boost::asio::ip::udp::endpoint ep;
clientSocket.receive_from(boost::asio::buffer(messageFromServer), ep);

std::cout << "recv from port: " << ep.port() << std::endl;
std::cout << messageFromServer << std::endl;

memset(messageToServer, '\0', BUFSIZ);
memset(messageFromServer, '\0', BUFSIZ);
}


} catch (std::exception &e) {
std::cout << "exception: " << e.what() << std::endl;
}

return 0;
}

asio库高级

同步读写

// server
// 通过多线程实现并发
#include <iostream>
#include <boost/asio.hpp>
#include <set>
#include <memory>

// 最大长度
const int MAX_LENGTH = 1024;
typedef std::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr;
std::set<std::shared_ptr<std::thread>> thread_set;

// 创建session函数
void Session(socket_ptr socketPtr) {
try {
// 读取数据
char msgFromClient[MAX_LENGTH];
// 回传数据
char msgToClient[MAX_LENGTH];
boost::system::error_code ec;
while (true) {
memset(msgToClient, '\0', MAX_LENGTH);
memset(msgFromClient, '\0', MAX_LENGTH);

socketPtr->read_some(boost::asio::buffer(msgFromClient, MAX_LENGTH), ec);
if (ec == boost::asio::error::eof) {
std::cout << "connection closed by peer" << std::endl;
return;
} else if (ec) {
throw boost::system::system_error(ec);
}

std::cout << "client: " << socketPtr->remote_endpoint().address().to_string() << "; msgFromClient: " << msgFromClient << std::endl;

std::string tmp = std::string(msgFromClient);
std::transform(tmp.begin(), tmp.end(), tmp.begin(), ::toupper);
strcpy(msgToClient, tmp.c_str());

boost::asio::write(*socketPtr, boost::asio::buffer(msgToClient, MAX_LENGTH));
std::cout << "msgToClient: " << msgToClient << std::endl;
}
} catch (std::exception &e) {
std::cerr << "Exception in session: " << e.what() << std::endl;
}
}

void Server(boost::asio::io_context &ioContext, unsigned short port) {
boost::asio::ip::tcp::acceptor acceptor(ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port));
while (true) {
socket_ptr socketPtr(new boost::asio::ip::tcp::socket(ioContext));
acceptor.accept(*socketPtr);
auto t = std::make_shared<std::thread>(Session, socketPtr);
thread_set.insert(t);
}
}

int main() {
try {
boost::asio::io_context ioContext;
Server(ioContext, 8000);
for (auto &t: thread_set) {
t->join();
}
} catch (std::exception &e) {
std::cout << "exception: " << e.what() << std::endl;
}

return 0;
}
// client
#include <iostream>
#include <boost/asio.hpp>

// 最大长度
const int MAX_LENGTH = 1024;

int main() {
try {
// 创建上下文服务ioContext
boost::asio::io_context ioContext;
// 创建连接点endpoint
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 8000);
// 创建socket
boost::asio::ip::tcp::socket clientSocket(ioContext);
// 创建错误提示码
boost::system::error_code ec = boost::asio::error::host_not_found;
// 连接服务器
clientSocket.connect(endpoint, ec);
if (ec) {
std::cout << "connect failed, code is " << ec.value() << "error msg is " << ec.message() << std::endl;
return 0;
}

char messageFromServer[MAX_LENGTH];
char messageToServer[MAX_LENGTH];
while (true) {
memset(messageFromServer, '\0', MAX_LENGTH);
memset(messageToServer, '\0', MAX_LENGTH);

std::cin >> messageToServer;
boost::asio::write(clientSocket, boost::asio::buffer(messageToServer, MAX_LENGTH));

boost::asio::read(clientSocket, boost::asio::buffer(messageFromServer, MAX_LENGTH));
std::cout << "message from server: " << messageFromServer << std::endl;
}
} catch (std::exception &e) {
std::cout << "exception: " << e.what() << std::endl;
}

return 0;
}
  • 同步读写的缺陷在于读写时阻塞的,如果客户端对端不发送数据服务器的read操作是阻塞的,这将导致服务器处于阻塞等待状态
  • 可以通过开辟新的线程为新生成的连接处理读写,但是一个进程开辟的线程是有限的,约为2048个线程,在Linux环境可以通过unlimit增加一个进程的线程数,但是线程过多也会导致切换消耗的时间片过多
  • 该服务器和客户端为应答式,实际场景为全双工通信模式,发送和接收要独立分开
  • 该服务器和客户端未考虑粘包处理

异步读写

// server
// Session.h
#pragma once
#ifndef SERVER_SESSION_H
#define SERVER_SESSION_H

#include <boost/asio.hpp>
#include <iostream>

class Session {
public:
Session(boost::asio::io_context &ioContext) : _socket(ioContext) {

}

boost::asio::ip::tcp::socket &Socket() {
return _socket;
}

void Start();

private:
void handle_read(const boost::system::error_code &error, size_t bytes_transrred);

void handle_write(const boost::system::error_code &error);

boost::asio::ip::tcp::socket _socket;
enum {
max_length = 1024
};
char _data[max_length];

};


class Server {
public:
Server(boost::asio::io_context &ioContext, short port);
private:
void start_accept();
void handle_accept(Session *new_session, const boost::system::error_code &error);
boost::asio::io_context &_ioContext;
boost::asio::ip::tcp::acceptor _acceptor;
};

#endif //SERVER_SESSION_H
//Session.cpp
#include "Session.h"

void Session::Start() {
memset(_data, 0, max_length); // 存储归零
// 异步读函数
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2));

}

void Session::handle_read(const boost::system::error_code &error, size_t bytes_transferred) {
if (!error) {
// 读正常, echo
std::cout << "server receive data is " << _data << std::endl;
boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred), std::bind(&Session::handle_write, this, std::placeholders::_1));
} else {
// 读出错, 存在多次析构隐患
std::cout << "read error: " << error.message() << std::endl;
delete this;
}
}

void Session::handle_write(const boost::system::error_code &error) {
if (!error) {
memset(_data, 0, max_length);
// 进行异步读事件
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2));
} else {
// 写出错
std::cout << "write error: " << error.message() << std::endl;
delete this;
}
}

Server::Server(boost::asio::io_context &ioContext, short port) : _ioContext(ioContext), _acceptor(ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {
std::cout << "Server start success, on port: " << port << std::endl;
start_accept();
}

void Server::start_accept() {
// new_session会创建一个session会话, 其中包含封装好的socket对象
Session *new_session = new Session(_ioContext);
_acceptor.async_accept(new_session->Socket(), std::bind(&Server::handle_accept, this, new_session, std::placeholders::_1));
}

void Server::handle_accept(Session *new_session, const boost::system::error_code &error) {
if (!error) {
new_session->Start();
} else {
delete new_session;
}
start_accept();
}
// main.cpp
#include <iostream>
#include "Session.h"

int main() {
try {
boost::asio::io_context ioContext;
Server s(ioContext, 8000);
ioContext.run();
} catch (std::exception &e) {
std::cout << "exception: " << e.what() << std::endl;
}
return 0;
}
  • 该服务器的发送和接受以应答的方式交互,而并不能做到应用层随意发送的目的,也就是未做到完全的收发分离(全双工逻辑)
  • 该服务器未处理粘包、序列化以及逻辑和收发线程解耦问题
  • 该服务器存在二次析构风险

利用伪闭包延长生命周期

C++中没有闭包机制,在Golang中可以通过闭包机制延长变量的声明周期

func DeferReturn() (res int) {
defer func() {
res++
log.Println(res)
}()
return 0
}

在以上Go代码中,我们可以发现延时函数调用了外部的res变量,所以在延时函数没有结束之前res不会被释放,即使主函数已经结束也不会释放res

在C++中我们可以通过智能指针的引用计数方式传入函数对象,如果函数对象不被释放则智能指针不被释放

// Session.h
#pragma once
#ifndef SERVER_SESSION_H
#define SERVER_SESSION_H

#include <boost/asio.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <iostream>
#include <map>

class Server;

class Session : public std::enable_shared_from_this<Session> {
// 会话类, 用于接收发送数据
public:
Session(boost::asio::io_context &ioContext, Server *server) : _socket(ioContext), _server(server) {
// 构造函数, socket绑定ioc
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
}

boost::asio::ip::tcp::socket &Socket() {
// 获取当前socket变量
return _socket;
}

void Start(); // 监听客户端的读和写

std::string &GetUuid();

private:
// 回调函数是作为参数传递给另一个函数, 并在函数执行完毕后被调用
void handle_read(const boost::system::error_code &error, size_t bytes_transferred, std::shared_ptr<Session> _self_shared); // 读回调函数
void handle_write(const boost::system::error_code &error, std::shared_ptr<Session> _self_shared); // 写回调函数

enum {
max_length = 1024 // 最大长度
};
boost::asio::ip::tcp::socket _socket; // 在服务端创建session类后, session需要管理一个tcp连接
char _data[max_length]; // 接收数据
Server *_server;
std::string _uuid;
};


class Server {
// 服务类, 用于维护连接
public:
Server(boost::asio::io_context &ioContext, short port);

void ClearSession(std::string uuid);

private:
void start_accept();

void handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code &error);

boost::asio::io_context &_ioContext; // ioContext不允许拷贝构造, 我们通过引用方式
boost::asio::ip::tcp::acceptor _acceptor;
std::map<std::string, std::shared_ptr<Session>> _sessions;
};

#endif //SERVER_SESSION_H
// Session.cpp
#include "Session.h"


// brief: 用于开启服务器
// param: NULL
// ret: NULL
void Session::Start() {
memset(_data, 0, max_length); // 存储归零
// 异步监听读事件, 并在接收到读事件后调用读回调函数, 将类的成员函数绑定为普通函数对象
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, shared_from_this()));

}

std::string &Session::GetUuid() {
return _uuid;
}

// brief: 服务器的读回调函数, 用于读取异步上下文中的数据
// param: error: 在读取数据过程中的错误码
// bytes_transferred: 服务器实际读了多少数据
// ret: NULL
void Session::handle_read(const boost::system::error_code &error, size_t bytes_transferred, std::shared_ptr<Session> _self_shared) {
if (!error) {
// 读正常, echo
std::cout << "server receive data is " << _data << std::endl;
// 异步写事件, 将刚才收到的数据发回去, 绑定写回调函数
boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transferred), std::bind(&Session::handle_write, this, std::placeholders::_1, _self_shared));
} else {
// 读出错, 存在多次析构隐患
std::cout << "read error: " << error.message() << std::endl;
_server->ClearSession(_uuid);
}
}

// brief: 服务器的写回调函数, 用于像异步上下文中写入数据
// param: error: 在写入数据过程中的错误码
// ret: NULL
void Session::handle_write(const boost::system::error_code &error, std::shared_ptr<Session> _self_shared) {
if (!error) {
// 数据清零
memset(_data, 0, max_length);
// 异步读事件, 并在接收到读事件后调用读回调函数
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, _self_shared));
} else {
// 写出错
std::cout << "write error: " << error.message() << std::endl;
_server->ClearSession(_uuid);
}
}

// brief: 用于创建并启动服务器
// param: ioContext: 上下文对象, 不允许拷贝构造在成员列表中为引用方式, 需要使用初始化列表方式赋值, 用于网络通信间的数据传递
// port: 端口, 用于指定服务器启动的端口地址
// ret: NULL
Server::Server(boost::asio::io_context &ioContext, short port) : _ioContext(ioContext), _acceptor(ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {
std::cout << "Server start success, on port: " << port << std::endl;
start_accept();
}

// brief: 用于启动描述符
// param: NULL
// ret: NULL
void Server::start_accept() {
std::shared_ptr<Session> new_session = std::make_shared<Session>(_ioContext, this);
// new_session会创建一个session类对象, 其中包含封装好的socket对象
// Session *new_session = new Session(_ioContext);
_acceptor.async_accept(new_session->Socket(), std::bind(&Server::handle_accept, this, new_session, std::placeholders::_1));
}

// brief: 回调函数, 当客户端需要连接时触发回调
// param: *new_session:
// error: 捕获错误信息
// ret: NULL
void Server::handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code &error) {
if (!error) {
new_session->Start();
_sessions.insert(std::make_pair(new_session->GetUuid(), new_session));
} else {
// delete new_session;
}
start_accept(); // 使Server接收新的连接
}

void Server::ClearSession(std::string uuid) {
_sessions.erase(uuid);
}

增加发送队列实现全双工通信

异步发送过程中,TCP底层的发送缓冲区不能把我们的数据进行全部发送,在二次发送过程中会出现数据错乱。我们引入发送队列保证发送的时序性

// 设计一个数据节点
class MsgNode {
friend class CSession;
public:
MsgNode(char *msg, int max_len) {
m_data = new char[max_len];
memcpy(m_data, msg, max_len);
}

~MsgNode() {
delete[] m_data;
}

private:
int m_cur_len; // 表示数据当前已处理的长度, 已发送的数据或已接收的数据长度
int m_max_len; // 表示数据的总长度
char *m_data; // 表示数据域, 已接收或已发送的数据都放在此空间内
};

在企业级开发中,通常发送接口和回调函数的接口不在一个线程中,为了保证队列访问的安全性,我们需要为队列增加一个锁

void Send(char *msg, int max_length);
std::queue<std::shared_ptr<MsgNode>> m_send_que;
std::mutex m_send_lock;

粘包问题

粘包问题是服务器发送数据常遇到的一个现象,当客户端发送多个数据包服务器时,服务器底层的TCP接受缓冲区收到的数据为粘在一起的

因为TCP面向字节流传输的,TCP只保证发送数据的准确性和顺序性,字节流以自己为单位,客户端每次发送N个字节给服务端,N取决于当前客户端的发送缓冲区是否有数据,比如发送缓冲区总大小为10个字节,当前有5个字节数据未发完,那么此时只有5个字节空闲空间,我们再次调用发送接口时会出现上次数据与本次数据的粘连

粘包原因:

  • 客户端的发送频率远高于服务器的接受频率,就会导致数据在服务器的TCP接收缓冲区滞留形成粘连,比如客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!
  • TCP底层的安全和效率机制不允许字节数特别少的小包发送频率过高,TCP会在底层累计数据到一定大小一起发送,比如连续发送1字节的数据要累计到多个字节才发送,可以了解下TCP底层的Nagle算法
  • 发送缓冲区有上次未发送完的数据或接受端的缓冲区里有未取出的数据导致数据粘连

处理粘包:处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议(消息id+消息长度+消息内容),我们简化为消息长度+消息内容

async_read_some这个函数的特点是只要对端发数据,服务器就接受数据,即使没有收全对端发送的数据也会触发HandleRead函数,所以我们会在HandleRead回调函数里判断接受的自己数,接受的数据可能不满足头部长度,可能大于头部长度但小于消息体的长度,可能大于消息体的长度,还可能大于多个消息体的长度,所以要切包等,这些逻辑写起来复杂,所以我们可以通过读取指定字节数,直到读完这些字节才触发回调函数,那么可以采用async_read函数,这个函数指定读取字节数,只有读完才会触发回调函数

字节序处理和消息队列的控制

字节序问题:

在计算机网络中,由于不同的计算机使用的CPU架构和字节序顺序可能不同,因此在传输数据时需要对数据的字节序进行统一,以保证数据能够正常传输和解析。这就是网路字节序的作用,具体来说,计算机内部存储的方式有两种:大端序(Big-Endian)和小端序(Little-Endian)。在大端序中,高位自己存储在低地址处,而低位字节存储在高地址处;在小端序中,高位自己存储在高地址处,而低位字节存储在低地址处。在网络通信中,通常使用大端序,这是因为早期的网络硬件大多采用了Motorda处理器,而Motorola处理器使用的是大端序。此外,大多数网络通信协议规定了网络字节序必须为大端序,因此,在进行网路编程时,需要将主机字节序转换为网络字节序,也就是将数据从本地字节序转化为大端序。可以使用htonl、htons、ntohl好ntohs等函数来实现

// 判断当前系统字节序时大端序还是小端序
bool isBigEndian() {
int num = 1;
if (*(char *)&num == 1) {
// 当前系统为小端序
return false;
} else {
// 当前系统为大端序
return true;
}
}

为了保证字节序一致性,网络传输使用网络字节序,也就是大端序模型

在boost::asio库中,可以使用boost::asio::detail::socket_ops::host_to_network_long()boost::asio::detail::socket_ops::host_to_network_short()函数主机字节序转化为网络字节序

  • host_to_network_long()函数将一个32位无符号整数从主机字节序转化为网络字节
  • host_to_network_short()函数将一个16位无符号整数从主机字节序转化为网络字节序

protobuf配置和使用

protobuf简介:Protocol Buffers(简称protobuf)是一种轻便高效的序列化数据结构协议,有Google开发。它可以用于将结构化的数据序列化到二进制格式,并广泛用于数据存储、通信协议、配置文件等领域

我们的逻辑是有类等抽象的数据构成的,而TCP是面向字节流的,我们需要将类结构序列化为字符串来传输

gRPC是一个高性能、开源和通用的RPC框架,使用Protocol Buffers作为序列化协议。它可以在任何地方运行,并连接各种语言和平台。常被使用在服务端与服务端之间通信

// 设置为proto3版本
syntax = "proto3";

message Book {
string name = 1;
int32 pages = 2;
float price = 3;
}

在终端中执行:protoc -cpp_out=. ./msg.proto会生成msg.pb.h和msg.pb.cc两个文件,这两个文件就是我们要用到的头文件和cpp文件

#include <iostream>
#include "msg.pb.h"

int main() {
Book book;
book.set_name("CPP programing");
book.set_pages(100);
std::string bookstr;
book.SerializeToString(&bookstr);
std::cout << "serialize str is " << bookstr << std::endl;
Book book2;
book2.ParseFromString(bookstr);
std::cout << "book2 name is " << book2.name() << "price is " << book2.price() << "pages is " << book2.pages() << std::endl;
getchar();
}

jsoncpp配置和使用

jsoncpp简介:jsoncpp是一个C++JSON库,它提供了将JSON数据解析为C++,将C++对象序列化为JSON数据的功能,它支持所有主流操作系统(包括Windows、Linux、MacOS等),并且可以与常见编译器(包括Visual Studio、GCC等)兼容

jsoncpp库是以源代码形式发布的,因此使用者需要自己构建和链接库文件。该文件不依赖于第三方库,只需要包含头文件即可使用

cmake_minimum_required(VERSION 3.25)
project(test)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_COMPILER g++)

add_compile_options(-std=c++11)

include_directories(
/opt/homebrew/Cellar/jsoncpp/1.9.5/include
)

link_directories(
/opt/homebrew/Cellar/jsoncpp/1.9.5/lib
)

add_executable(test main.cpp)

target_link_libraries(test PRIVATE jsoncpp)

jsoncpp库的特点:

  • 轻量级:JSON解析器和序列化器都非常快速,不会占用太多的CPU和内存资源
  • 易于使用:提供简单的API,易于理解和使用
  • 可靠性高:经过广泛测试,已被许多企业和开发者用于声生产环境中
  • 开源免费:遵循MIT许可证发布,使用和修改均免费
#include <iostream>
#include "json/json.h"
#include "json/value.h"
#include "json/reader.h"


int main() {
Json::Value root;
root["id"] = 1001;
root["data"] = "hello world";
std::string request = root.toStyledString();
std::cout << "request: " << request << std::endl;

Json::Value root2;
Json::Reader reader;
reader.parse(request, root2);
std::cout << "msg id: " << root2["id"] << std::endl;
std::cout << "msg data: " << root2["data"] << std::endl;

return 0;
}

jsoncpp在网络编程中的应用

在客户端发送数据时对发送的数据进行序列化

// client
Json::Value root;
root["id"] = 1001;
root["data"] = "hello world";
sts::string request = root.toStyleString();
size_t request_length = request.length(); // 计算要发送的字节序长度
char send_data[MAX_LENGTH] = {0};
// 转为网络字节序
int request_host_length = boost::asio::detail::socket_ops::host_to_network_short(request_length);
memcpy(send_data, &request_host_length, 2);
memcpy(send_data + 2, request.c_str(), request_length);
boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));

服务器接收到数据之后需要转换为本地字节序进行操作

// server
char reply_head[HEAD_LENGTH];
size_t reply_length = boost::asio::read(socket, boost::asio::buffer(reply_head, HEAD_LENGTH));
short msglen = 0;
memcpy(&msglen, reply_head, HEAD_LENGTH);
// 转为本地字节序
msglen = boost::asio::detail::socket_ops::network_to_host_short(msglen);
char msg[MAX_LENGTH] = {0};
size_t msg_length = boost::asio::read(socket, boost::asio::buffer(msg, msglen));
Json::Reader reader;
reader.parse(std::string(msg, msg_length), root);
std::cout << "msg id: " << root[]

服务器逻辑层设计和消息完善

本节概述了基于boost::asio实现的服务器逻辑层结构,并且完善之前设计的消息结构。因为为了简化粘包处理,我们简化了发送数据的结构,这次我们给出完整的消息设计,以及服务架构设计

在公司实际开发中,我们通常将逻辑系统设计为单线程模式,因为在逻辑系统操作时会读某些共享资源进行访问,如果设计为多线程模式就需要频繁的进行加锁和解锁,这样也会消耗大量资源,所以我们在设计逻辑系统是设计为单线程,例如Redis

父类中将默认构造函数和拷贝构造函数均置为private后,子类也就没有了默认构造和拷贝构造函数,子类就变成了单例类

逻辑层
**消息头完善:**在之前的消息头仅包含数据与和长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包首部传id字段,将id字段序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应函数,最好是将id写入包首部
tlv消息格式

为了减少耦合和歧义,重新设计消息节点

  • MsgNode:消息节点的基类,头部的消息用这个结构体储存
  • RecvNode:接受消息节点
  • SendNode:发送消息节点
#ifndef JSONSERVER_MSGNODE_H
#define JSONSERVER_MSGNODE_H

#include <string>
#include <iostream>
#include "boost/asio.hpp"
#include "../include/JsonServer/const.h"

class MsgNode {
public:
MsgNode(short max_len) : m_total_len(max_len), m_cur_len(0) {
m_data = new char[m_total_len + 1]();
m_data[m_total_len] = '\0';
}

~MsgNode() {
std::cout << "destruct MsgNode" << std::endl;
delete m_data;
}

void Clear() {
std::memset(m_data, 0, m_total_len);
m_cur_len = 0;
}

// 当前长度
short m_cur_len;
// 总长度
short m_total_len;
// 数据域起始地址
char *m_data;
};

class RecvNode : public MsgNode {
public:
RecvNode(short max_len, short msg_id);

private:
short m_msg_id;
};

class SendNode : public MsgNode {
public:
SendNode(const char *msg, short max_len, short msg_id);

private:
short m_msg_id;
};


#endif //JSONSERVER_MSGNODE_H
#include "../include/JsonServer/MsgNode.h"

RecvNode::RecvNode(short max_len, short msg_id) : MsgNode(max_len), m_msg_id(msg_id){

}

SendNode::SendNode(const char *msg, short max_len, short msg_id) : MsgNode(max_len + HEAD_TOTAL_LEN), m_msg_id(msg_id){
// 实现深拷贝将msg拷贝到m_data
// 先发送id
short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(m_msg_id);
memcpy(m_data, &msg_id_host, HEAD_ID_LEN);
// 在发送长度
short msg_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
memcpy(m_data + HEAD_ID_LEN, &msg_len_host, HEAD_DATA_LEN);
// 拷贝消息体
memcpy(m_data + HEAD_TOTAL_LEN, msg, max_len);
}

利用单例逻辑实现逻辑类

我们可以发现在消息处理的逻辑层中使用了一种单例模式,通过该模式使得对处理队列中的数据是依次有序的,该模式也在Redis的开发中被应用

单例模式:

  • 局部静态变量

    // 通过静态成员变量实现单例
    // 懒汉式
    class Single2 {
    private:
    Single2(){

    }

    Single2(const Single2 &) = delete;
    Single2 &operator=(const Single2 &) = delete;

    public:
    static Single2 &GetInst() {
    static Signle2 single;
    return single;
    }
    };

    上述代码通过局部静态成员single实现单例类,原理就是函数的局部静态变量生命周期随着进程结束而结束,但是在多线程中可能会出现问题,其调用方法&Single2::GetInst()

  • 静态成员变量指针方式

    // 饿汉式
    class Single2Hungry {
    private:
    Single2Hungry() {

    }

    Single2Hungry(const Single2Hungry &) = delete;
    Single2Hungry &operator=(const S ingle2Hungry &) = delete;

    static Single2Hungry *single;
    public:
    static Single2Hungry *GetInst() {
    if (single == nullptr) {
    single = new Single2Hungry();
    }
    return single;
    }
    };

    饿汉式对比懒汉式的好处在于避免了线程安全问题,无论是在多线程还是单线程模式下,通过静态成员变量的指针实现单例类都是唯一的。缺点是将内存释放交给了用户处理,其初始化方式Single2Hungry *Single2Hungry::single = Single2Hungry::GetInst();

  • 智能指针方式

    // 利用智能指针解决释放问题
    class SingleAuto {
    private:
    SingleAuto() {

    }

    SingleAuto(const SingleAuto &) = delete;
    SingleAuto &operator=(const SingleAuto &) = delete;

    static std::shared_ptr<SingleAuto> single;
    static mutex s_mutex;

    public:
    ~SingleAuto() {
    cout << "single auto delete success " << endl;
    }

    static std::shared_ptr<SingleAuto> GetInst() {
    if (single != nullptr) {
    return single;
    }
    s_mutex.lock();
    if (single != nullptr) {
    s_mutex.unlock();
    return single;
    }
    single = std::shared_ptr<SingleAuto>(new SingleAuto);
    s_mutex.unlock();

    return single;
    }
    };

    SingleAuto的GetInst返回std::shared_ptr<SingleAuto>类型的变量single。因为single是静态成员变量,所以会在进程结束时被回收。智能指针被回收时会调用内置指针类型的析构函数,从而完成内存的回收。智能指针方式不存在内存泄露,但是有一个隐患就是单例类的析构函数是public的,如果被人手动调用会存在崩溃问题

  • 辅助类智能指针单例模式

    智能指针在构造的时候可以指定删除器,所以可以传递一个辅助类或者辅助函数帮助智能指针回收内存时调用我们指定的析构函数

    // safe deletor
    //防止外界delete
    //声明辅助类
    //该类定义仿函数调用SingleAutoSafe析构函数
    //不可以提前声明SafeDeletor,编译时会提示incomplete type
    // class SafeDeletor;
    //所以要提前定义辅助类
    class SingleAutoSafe;
    class SafeDeletor {
    public:
    void operator()(SingleAutoSafe *sf)
    {
    cout << "this is safe deleter operator()" << endl;
    delete sf;
    }
    };

    class SingleAutoSafe {
    private:
    SingleAutoSafe() {

    }
    ~SingleAutoSafe() {
    cout << "this is single auto safe deletor" << endl;
    }
    SingleAutoSafe(const SingleAutoSafe &) = delete;
    SingleAutoSafe &operator=(const SingleAutoSafe &) = delete;
    //定义友元类,通过友元类调用该类析构函数
    friend class SafeDeletor;
    public:
    static std::shared_ptr<SingleAutoSafe> GetInst() {
    if (single != nullptr) {
    return single;
    }
    s_mutex.lock();
    if (single != nullptr) {
    s_mutex.unlock();
    return single;
    }
    //额外指定删除器
    single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDeletor());
    //也可以指定删除函数
    // single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDelFunc);
    s_mutex.unlock();
    return single;
    }
    private:
    static std::shared_ptr<SingleAutoSafe> single;
    static mutex s_mutex;
    };

    SafeDeletor要写在SingleAutoSafe上边,并且SafeDeletor要声明为SingleAutoSafe类的友元类,这样就可以访问SingleAutoSafe的析构函数了
    我们在构造single时制定了SafeDeletor(),single在回收时,会调用SingleAutoSafe的仿函数,从而完成内存的销毁

  • 通用的单例模版类

    template <typename T>
    class Single_T {
    protected:
    Single_T() = default;
    Single_T(const Single_T<T> &st) = delete;
    Single_T &operator=(const Single_T<T> &st) = delete;
    ~Single_T() {
    cout << "this is auto safe template destruct" << endl;
    }
    public:
    static std::shared_ptr<T> GetInst() {
    if (single != nullptr) {
    return single;
    }
    s_mutex.lock();
    if (single != nullptr) {
    s_mutex.unlock();
    return single;
    }
    //额外指定删除器
    single = std::shared_ptr<T>(new T, SafeDeletor_T<T>());
    //也可以指定删除函数
    // single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDelFunc);
    s_mutex.unlock();
    return single;
    }
    private:
    static std::shared_ptr<T> single;
    static mutex s_mutex;
    };
    //模板类的static成员要放在h文件里初始化
    template <typename T>
    std::shared_ptr<T> Single_T<T>::single = nullptr;
    template <typename T>
    mutex Single_T<T>::s_mutex;

服务器代码:

// main.cpp
#include <iostream>
#include <boost/asio.hpp>
#include <csignal>
#include <thread>
#include "./include/CServer.h"

bool bStop = false;
std::condition_variable condQuit;
std::mutex mutexQuit;

void SigHandler(int sig) {
// 收到cmd + z信号退出服务器
if (sig == SIGINT || sig == SIGTERM) {
//
std::unique_lock<std::mutex> lockQuit(mutexQuit);
// 改变退出所需要的全局变量, 唤醒主线程后, 主线程得以跳出循环
bStop = true;
lockQuit.unlock();
// 唤醒主线程
condQuit.notify_one();
}

}

int main() {
try {
// 初始化ioContext
boost::asio::io_context ioContext;
// 创建一个线程, 该线程用于管理网络层, 初始化服务, 启动服务
std::thread newWorkThread([&ioContext] {
Server server(ioContext, 8888);
ioContext.run();
});
// 将退出信号与退出函数绑定
signal(SIGINT, SigHandler);
// 如果停止按钮为false, 未收到退出信号, 将主线程挂起, 当收到唤醒信号时继续执行
while (!bStop) {
std::unique_lock<std::mutex> lockQuit(mutexQuit);
// 使用条件变量等待退出锁
condQuit.wait(lockQuit);
}
// bStop被改为true后, 主线程唤醒并使用asio提供的stop函数
ioContext.stop();
// 回收网络线程
newWorkThread.join();
} catch (std::exception &exception) {
std::cerr << "exception: " << exception.what() << std::endl;
}
}



// CServer.h
#ifndef SERVER_CSERVER_H
#define SERVER_CSERVER_H

#include <boost/asio.hpp>
#include <memory>
#include <map>
#include <iostream>
#include "CSession.h"

class Session;

class Server {
public:
Server(boost::asio::io_context &ioContext, short port);

// 擦除连接session的uuid
void ClearSession(std::string uuid);

private:
// 回调函数, 用于等待客户端发来请求, 由asio底层进行调用
void HandleAccept(std::shared_ptr<Session> newSession, const boost::system::error_code &errorCode);

// 启动网络服务
void StartAccept();

boost::asio::io_context &m_ioContext;
short m_port;
boost::asio::ip::tcp::acceptor m_acceptor;
std::map<std::string, std::shared_ptr<Session>> m_sessions;
};

#endif //SERVER_CSERVER_H



// Cserver.cpp
#include "../include/CServer.h"

Server::Server(boost::asio::io_context &ioContext, short port) : m_ioContext(ioContext), m_port(port), m_acceptor(ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {
// 创建一个网络层服务对象, 其含有启动服务器方法和接受
std::cout << "Server start success, listen on port: " << m_port << std::endl;
// 开始接受客户端发来请求
StartAccept();
}

void Server::HandleAccept(std::shared_ptr<Session> newSession, const boost::system::error_code &errorCode) {
if (!errorCode) {
// 没有错误, 将newSession的uuid加入到map中, 一个uuid对应一个session
newSession->Start();
m_sessions.insert(std::make_pair(newSession->GetUuid(), newSession));
} else {
std::cout << "session accept failed, error is " << errorCode.what() << std::endl;
}
StartAccept();
}

void Server::StartAccept() {
// newSession是一个会话, 需要使用ioContext和server
std::shared_ptr<Session> newSession = std::make_shared<Session>(m_ioContext, this);
m_acceptor.async_accept(newSession->GetSocket(), std::bind(&Server::HandleAccept, this, newSession, std::placeholders::_1));
}

void Server::ClearSession(std::string uuid) {
// 将不使用的session从map中擦除
m_sessions.erase(uuid);
}



// CSession.h
#ifndef SERVER_CSESSION_H
#define SERVER_CSESSION_H

#include <iostream>
#include <boost/asio.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <queue>
#include <mutex>
#include <memory>
#include <sstream>
#include <json/json.h>
#include <json/reader.h>
#include <json/value.h>
#include "../utils/utils.h"
#include "MessageNode.h"
#include "CServer.h"
#include "LogicSystem.h"

class Server;
class LogicSystem;

class Session : public std::enable_shared_from_this<Session> {
public:
Session(boost::asio::io_context &ioContext, Server *server);

~Session();

boost::asio::ip::tcp::socket &GetSocket();

std::string &GetUuid();

void Start();

void Send(char *message, short maxLength, short messageId);

void Send(std::string message, short messageId);

void Close();

std::shared_ptr<Session> SharedSelf();

private:
void HandleRead(const boost::system::error_code &errorCode, size_t bytesTransferred, std::shared_ptr<Session> sharedPtr);

void HandleWrite(const boost::system::error_code &errorCode, std::shared_ptr<Session> sharedPtr);

boost::asio::ip::tcp::socket m_socket;
std::string m_uuid;
char m_data[MAX_LENGTH];
Server *m_server;
bool m_bClose;
std::queue<std::shared_ptr<SendNode>> m_sendQueue;
std::mutex m_sendLock;
// 收到的消息结构
std::shared_ptr<RecvNode> m_recvMessageNode;
bool m_bHeadParse;
// 收到的头结构
std::shared_ptr<MessageNode> m_recvHeadNode;
};

class LogicNode {
friend class LogicSystem;

public:
LogicNode(std::shared_ptr<Session> session, std::shared_ptr<RecvNode> recvNode);

private:
std::shared_ptr<Session> m_session;
std::shared_ptr<RecvNode> m_recvNode;
};

#endif //SERVER_CSESSION_H



// CSession.cpp
#include "../include/CServer.h"

Server::Server(boost::asio::io_context &ioContext, short port) : m_ioContext(ioContext), m_port(port), m_acceptor(ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {
// 创建一个网络层服务对象, 其含有启动服务器方法和接受
std::cout << "Server start success, listen on port: " << m_port << std::endl;
// 开始接受客户端发来请求
StartAccept();
}

void Server::HandleAccept(std::shared_ptr<Session> newSession, const boost::system::error_code &errorCode) {
if (!errorCode) {
// 没有错误, 将newSession的uuid加入到map中, 一个uuid对应一个session
newSession->Start();
m_sessions.insert(std::make_pair(newSession->GetUuid(), newSession));
} else {
std::cout << "session accept failed, error is " << errorCode.what() << std::endl;
}
StartAccept();
}

void Server::StartAccept() {
// newSession是一个会话, 需要使用ioContext和server
std::shared_ptr<Session> newSession = std::make_shared<Session>(m_ioContext, this);
m_acceptor.async_accept(newSession->GetSocket(), std::bind(&Server::HandleAccept, this, newSession, std::placeholders::_1));
}

void Server::ClearSession(std::string uuid) {
// 将不使用的session从map中擦除
m_sessions.erase(uuid);
}



// MessageNode.h
#ifndef SERVER_MESSAGENODE_H
#define SERVER_MESSAGENODE_H

#include <string>
#include <iostream>
#include <boost/asio.hpp>
#include "../utils/utils.h"

class MessageNode {
public:
MessageNode(short maxLen) : m_totalLen(maxLen), m_currentLen(0) {
m_data = new char[m_totalLen + 1]();
m_data[m_totalLen] = '\0';
}

~MessageNode() {
std::cout << "destruct MessageNode" << std::endl;
delete[] m_data;
}

void Clear() {
std::memset(m_data, 0, m_totalLen);
m_currentLen = 0;
}

short m_currentLen;
short m_totalLen;
char *m_data;
};

class RecvNode : public MessageNode {
friend class LogicSystem;

public:
RecvNode(short maxLen, short messageId);

private:
short m_messageId;
};

class SendNode : public MessageNode {
friend class LogicSystem;

public:
SendNode(const char *message, short maxLen, short messageId);

private:
short m_messageId;
};

#endif //SERVER_MESSAGENODE_H



// MessageNode.cpp
#include "../include/MessageNode.h"

RecvNode::RecvNode(short maxLen, short messageId) : MessageNode(maxLen), m_messageId(messageId) {

}

SendNode::SendNode(const char *message, short maxLen, short messageId): MessageNode(maxLen + HEAD_TOTAL_LEN), m_messageId(messageId) {
// 先发送id, 转为网络字节序
short messageIdHost = boost::asio::detail::socket_ops::host_to_network_short(messageId);
memcpy(m_data, &messageIdHost, HEAD_ID_LEN);
// 再发送数据长度, 转为网络字节系
short maxLenHost = boost::asio::detail::socket_ops::host_to_network_short(maxLen);
memcpy(m_data + HEAD_ID_LEN, &maxLenHost, HEAD_DATA_LEN);

// 最后发送数据
memcpy(m_data + HEAD_TOTAL_LEN, message, maxLen);
}



// LogicSystem.h
#ifndef SERVER_LOGICSYSTEM_H
#define SERVER_LOGICSYSTEM_H

#include <string>
#include <queue>
#include <thread>
#include <map>
#include <functional>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>
#include "CSession.h"
#include "Singleton.h"
#include "../utils/utils.h"

class Session;
class LogicNode;

typedef std::function<void(std::shared_ptr<Session>, const short &messageId, const std::string &messageData)> FunCallBack;
class LogicSystem : public Singleton<LogicSystem> {
friend class Singleton<LogicSystem>;

public:
~LogicSystem();

void PostMessageToQueue(std::shared_ptr<LogicNode> message);

private:
LogicSystem();

void DealMessage();

void RegisterCallBacks();

void HelloWorldCallBack(std::shared_ptr<Session>, const short &messageId, const std::string &messageData);

std::thread m_workerThread;
std::queue<std::shared_ptr<LogicNode>> m_messageQueue;
std::mutex m_mutex;
std::condition_variable m_consume;
bool m_bStop;
std::map<short, FunCallBack> m_funcCallBacks;
};


#endif //SERVER_LOGICSYSTEM_H



// LogicSystem.cpp
#include "../include/LogicSystem.h"

LogicSystem::LogicSystem() : m_bStop(false) {
RegisterCallBacks();
m_workerThread = std::thread(&LogicSystem::DealMessage, this);
}

LogicSystem::~LogicSystem() {
m_bStop = true;
m_consume.notify_one();
m_workerThread.join();
}

void LogicSystem::PostMessageToQueue(std::shared_ptr<LogicNode> message) {
std::unique_lock<std::mutex> uniqueLock(m_mutex);
m_messageQueue.push(message);
// 当未处理的消息加入队列后队列变为1, 则通过锁可以启动
if (m_messageQueue.size() == 1) {
uniqueLock.unlock();
m_consume.notify_one();
}
}

void LogicSystem::DealMessage() {
for (;;) {
std::unique_lock<std::mutex> uniqueLock(m_mutex);
// 判断队列为空则条件变量阻塞等待, 并释放锁
while (m_messageQueue.empty() && !m_bStop) {
m_consume.wait(uniqueLock);
}

// 判断是否为关闭状态, 把所有逻辑执行完后则退出循环
if (m_bStop) {
while (!m_messageQueue.empty()) {
auto messageNode = m_messageQueue.front();
std::cout << "recv message id is " << messageNode->m_recvNode->m_messageId << std::endl;
auto callBackIter = m_funcCallBacks.find(messageNode->m_recvNode->m_messageId);
if (callBackIter == m_funcCallBacks.end()) {
m_messageQueue.pop();
continue;
}
callBackIter->second(messageNode->m_session, messageNode->m_recvNode->m_messageId, std::string(messageNode->m_recvNode->m_data, messageNode->m_recvNode->m_currentLen));
m_messageQueue.pop();
}
break;
}

// 如果没有停服, 且说明队列中有数据
auto messageNode = m_messageQueue.front();
std::cout << "recv message id is " << messageNode->m_recvNode->m_messageId << std::endl;
auto callBackIter = m_funcCallBacks.find(messageNode->m_recvNode->m_messageId);
if (callBackIter == m_funcCallBacks.end()) {
m_messageQueue.pop();
continue;
}

callBackIter->second(messageNode->m_session, messageNode->m_recvNode->m_messageId, std::string(messageNode->m_recvNode->m_data, messageNode->m_recvNode->m_currentLen));
m_messageQueue.pop();
}
}


void LogicSystem::RegisterCallBacks() {
m_funcCallBacks[MESSAGE_HELLO_WORLD] = std::bind(&LogicSystem::HelloWorldCallBack, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}

void LogicSystem::HelloWorldCallBack(std::shared_ptr<Session> session, const short &messageId, const std::string &messageData) {
Json::Reader reader;
Json::Value root;
reader.parse(messageData, root);
std::cout << "recevie message id " << root["id"].asInt() << "message data is " << root["data"].asString() << std::endl;
root["data"] = "server had received message, message data is " + root["data"].asString();
std::string returnString = root.toStyledString();
session->Send(returnString, root["id"].asInt());
}



// Singleton.h
#ifndef SERVER_SINGLETON_H
#define SERVER_SINGLETON_H

#include <memory>
#include <mutex>
#include <iostream>

template<typename T>
class Singleton {
protected:
Singleton() = default;

Singleton(const Singleton<T> &) = delete;

Singleton &operator=(const Singleton<T> &st) = delete;

static std::shared_ptr<T> m_instance;
public:
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
m_instance = std::shared_ptr<T>(new T);
});
return m_instance;
}

void PrintAddress() {
std::cout << "this is singleton destruct" << std::endl;
}
};

template<typename T>
std::shared_ptr<T> Singleton<T>::m_instance = nullptr;

#endif //SERVER_SINGLETON_H



// utils.h
#ifndef SERVER_UTILS_H
#define SERVER_UTILS_H

#define MAX_LENGTH 1024 * 2
// 头部总长度
#define HEAD_TOTAL_LEN 4
// 头部ID长度
#define HEAD_ID_LEN 2
// 头部数据长度
#define HEAD_DATA_LEN 2

#define MAX_RECVQUE 10000
#define MAX_SENDQUE 1000


enum MSG_IDS {
MESSAGE_HELLO_WORLD = 1001
};
#endif //SERVER_UTILS_H

目录结构

Server

builds

cmake-build-debug

doc

include

CServer.h

CSession.h

LogicSystem.h

Singleton.h

MessageNode.h

src

CServer.cpp

CSession.cpp

LogicSystem.cpp

MessageNode.cpp

utils

utils.h

CMakeLists.txt

main.cpp

asio多线程模型IOServicePool

简介:前面的设计,我们对asio的使用都是单线程模式,为了提升网络IO并发处理效率,这一次我们设计多线程模式下asio的使用方式,总体来说asio有两个多线程模型,第一个是起启动多线程,每个线程管理一个ioContext;第二个是只启动一个ioContext,被多个线程共享。下面我们会提到文章对比两个模式区别

单线程和多线程对比:

单线程模式
多线程模式

IOServicePool多线程模式特点:

  • 每一个io_context跑在不同的线程中,所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次出发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的
  • 对于不同的socket,回到函数的触发可能是同一个线程(两个socket被分配到同一个io_context),也不可能不是同一个线程(两个socket被分配到不同的io_context里)。所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。比如socket1代表玩家1,socket代表玩家2,玩家1和玩家2在逻辑层存在交互,比如两个玩家都在做工会任务,他们属于同一个工会,工会积分的增加就是共享区的数据,需要保证线程安全。可以通过加锁或者逻辑队列的方式解决安全问题,我们目前采取了后者
  • 多线程相比单线程极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写时间,就绪后回调函数在一个线程里串行调用,如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是串行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个逻辑调用的情况,比如两个socket被部署到不同的io_context上,但是当两个socket部署到同一个io_context上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程耦合了,不会出现前一个调用时间影响下一个回调出发的问题