利用websocket实现单机百万连接分布式聊天系统
lipiwang 2024-11-03 15:54 21 浏览 0 评论
目前搭建聊天消息系统,最主流的有基于XMPP和webSocket实现。文本介绍的是基于webSocket实现单机百万连接的分布式聊天系统。首先介绍webSocket的概念,然后开始介绍websocket项目,以及在Nginx中配置域名做webSocket的转发,最后介绍如何搭建一个分布式系统。
一、WebSocket基本概念介绍
1、WebSocket是什么?
WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器能够主动向客户端推送信息,客户端也能够主动向服务器发送信息,是真正的双向平等对话,属于目前服务器推送技术的其中一种。
- HTTP和WebSocket在通讯过程的比较
- HTTP和webSocket都支持配置证书,ws:// 无证书 wss:// 配置证书的协议标识
- 一般项目中webSocket使用的架构图
2、WebSocket的兼容性
- 目前已经支持webSocket的浏览器版本
- 服务端技术的支持情况
golang、java、php、node.js、python、nginx 都有不错的支持。
- Android系统和IOS系统的支持
Android系统可以使用java-webSocket对webSocket来支持。
iOS 4.2及以上版本均具有WebSockets支持。
3、为什么要用WebSocket?
(1)从业务角度考虑,需要一个主动通达客户端的能力
目前大多数的请求都是使用HTTP,都是由客户端发起一个请求,有服务端处理,然后返回结果,不可以服务端主动向某一个客户端主动发送数据;
(2)大多数场景我们需要主动通知用户,如:聊天系统、用户完成任务主动告诉用户、一些运营活动需要通知到在线的用户;
(3)可以获取用户在线状态;
(4)在没有长连接的时候通过客户端主动轮询获取数据;
(5)可以通过一种方式实现,多种不同平台(H5/Android/IOS)去使用。
4、WebSocke的建立过程
(1)客户端发起升级协议的请求
客户端发起升级协议的请求,采用标准的HTTP报文格式,在报文中添加头部信息
Connection: Upgrade表明连接需要升级
Upgrade: websocket需要升级到 websocket协议
Sec-WebSocket-Version: 13 协议的版本为13
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== 这个是base64 encode 的值,是浏览器随机生成的,与服务器响应的 Sec-WebSocket-Accept对应。
# Request Headers
Connection: Upgrade
Host: im.91vh.com
Origin: http://im.91vh.com
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
Sec-WebSocket-Version: 13
Upgrade: websocket
(2)服务器响应升级协议
服务端接收到升级协议的请求,如果服务端支持升级协议会做如下响应
返回:
Status Code: 101 Switching Protocols 表示支持切换协议
# Response Headers
Connection: upgrade
Date: Fri, 09 Aug 2019 07:36:59 GMT
Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=
Server: nginx/1.12.1
Upgrade: websocket
(3)升级协议完成以后,客户端和服务器就可以相互发送数据
二、如何基于webSocket实现长连接系统
1、使用go实现webSocket服务端
(1) 启动端口监听
- websocket需要监听端口,因此需要在golang 成功的 main 函数中用协程的方式去启动程序
- main.go 实现启动
go websocket.StartWebSocket()
- init_acc.go 启动程序
// 启动程序
func StartWebSocket() {
http.HandleFunc("/acc", wsPage)
http.ListenAndServe(":8089", nil)
}
(2) 升级的协议
- 客户端是通过http请求发送到服务端,我们需要对http协议进行升级为websocket协议;
- 对http请求协议进行升级 golang 库gorilla/websocket 比较成熟,直接使用即可;
- 在实际使用的时候,建议每个连接使用两个协程处理客户端请求数据和向客户端发送数据,虽然开启协程会占用一些内存,但是读取分离,减少收发数据堵塞的可能;
- init_acc.go
func wsPage(w http.ResponseWriter, req *http.Request) {
// 升级协议
conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
fmt.Println("升级协议", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])
return true
}}).Upgrade(w, req, nil)
if err != nil {
http.NotFound(w, req)
return
}
fmt.Println("webSocket 建立连接:", conn.RemoteAddr().String())
currentTime := uint64(time.Now().Unix())
client := NewClient(conn.RemoteAddr().String(), conn, currentTime)
go client.read()
go client.write()
// 用户连接事件
clientManager.Register <- client
}
(3) 客户端连接的管理
- 当前程序有多少用户连接,还需要对用户广播的需要,这里我们就需要一个管理者(clientManager),处理这些事件:
- 记录全部的连接、登录用户的可以通过 appId+uuid 查到用户连接
- 使用map存储,就涉及到多协程并发读写的问题,所以需要加读写锁
- 定义四个channel ,分别处理客户端建立连接、用户登录、断开连接、全员广播事件
// 连接管理
type ClientManager struct {
Clients map[*Client]bool // 全部的连接
ClientsLock sync.RWMutex // 读写锁
Users map[string]*Client // 登录的用户 // appId+uuid
UserLock sync.RWMutex // 读写锁
Register chan *Client // 连接连接处理
Login chan *login // 用户登录处理
Unregister chan *Client // 断开连接处理程序
Broadcast chan []byte // 广播 向全部成员发送数据
}
// 初始化
func NewClientManager() (clientManager *ClientManager) {
clientManager = &ClientManager{
Clients: make(map[*Client]bool),
Users: make(map[string]*Client),
Register: make(chan *Client, 1000),
Login: make(chan *login, 1000),
Unregister: make(chan *Client, 1000),
Broadcast: make(chan []byte, 1000),
}
return
}
(4) 注册客户端的socket的写的异步处理程序
- 防止发生程序崩溃,所以需要捕获异常;
- 为了显示异常崩溃位置这里使用string(debug.Stack())打印调用堆栈信息;
- 如果写入数据失败了,可能连接有问题,就关闭连接。
- client.go
// 向客户端写数据
func (c *Client) write() {
defer func() {
if r := recover(); r != nil {
fmt.Println("write stop", string(debug.Stack()), r)
}
}
defer func() {
clientManager.Unregister <- c
c.Socket.Close()
fmt.Println("Client发送数据 defer", c)
}()
for {
select {
case message, ok := <-c.Send:
if !ok {
// 发送数据错误 关闭连接
fmt.Println("Client发送数据 关闭连接", c.Addr, "ok", ok)
return
}
c.Socket.WriteMessage(websocket.TextMessage, message)
}
}
}
(5)注册客户端的socket的读的异步处理程序
- 循环读取客户端发送的数据并处理;
- 如果读取数据失败了,关闭channel。
- client.go
// 读取客户端数据
func (c *Client) read() {
defer func() {
if r := recover(); r != nil {
fmt.Println("write stop", string(debug.Stack()), r)
}
}
defer func() {
fmt.Println("读取客户端数据 关闭send", c)
close(c.Send)
}
for {
_, message, err := c.Socket.ReadMessage()
if err != nil {
fmt.Println("读取客户端数据 错误", c.Addr, err)
return
}
// 处理程序
fmt.Println("读取客户端数据 处理:", string(message))
ProcessData(c, message)
}
}
(6)接收客户端数据并处理
- 约定发送和接收请求数据格式,为了js处理方便,采用了json的数据格式发送和接收数据(人类可以阅读的格式在工作开发中使用是比较方便的)
- 登录发送数据示例:
{"seq":"1565336219141-266129","cmd":"login","data":{"userId":"马远","appId":101}}
- 登录响应数据示例:
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
- websocket是双向的数据通讯,可以连续发送,如果发送的数据需要服务端回复,就需要一个seq来确定服务端的响应是回复哪一次的请求数据
- cmd 是用来确定动作,websocket没有类似于http的url,所以规定 cmd 是什么动作
- 目前的动作有:login/heartbeat 用来发送登录请求和连接保活(长时间没有数据发送的长连接容易被浏览器、移动中间商、nginx、服务端程序断开)
- 为什么需要AppId,UserId是表示用户的唯一字段,设计的时候为了做成通用性,设计AppId用来表示用户在哪个平台登录的(web、app、ios等),方便后续扩展
- request_model.go 约定的请求数据格式
/************************ 请求数据 **************************/
// 通用请求数据格式
type Request struct {
Seq string `json:"seq"` // 消息的唯一Id
Cmd string `json:"cmd"` // 请求命令字
Data interface{} `json:"data,omitempty"` // 数据 json
}
// 登录请求数据
type Login struct {
ServiceToken string `json:"serviceToken"` // 验证用户是否登录
AppId uint32 `json:"appId,omitempty"`
UserId string `json:"userId,omitempty"`
}
// 心跳请求数据
type HeartBeat struct {
UserId string `json:"userId,omitempty"`
}
- response_model.go
/************************ 响应数据 **************************/
type Head struct {
Seq string `json:"seq"` // 消息的Id
Cmd string `json:"cmd"` // 消息的cmd 动作
Response *Response `json:"response"` // 消息体
}
type Response struct {
Code uint32 `json:"code"`
CodeMsg string `json:"codeMsg"`
Data interface{} `json:"data"` // 数据 json
}
(7)使用路由的方式处理客户端的请求数据
- 使用路由的方式处理由客户端发送过来的请求数据
- 以后添加请求类型以后就可以用类是用http相类似的方式(router-controller)去处理
- acc_routers.go
// Websocket 路由
func WebsocketInit() {
websocket.Register("login", websocket.LoginController)
websocket.Register("heartbeat", websocket.HeartbeatController)
}
(8) 防止内存溢出和Goroutine不回收
a、定时任务清除超时连接 没有登录的连接和登录的连接6分钟没有心跳则断开连接
client_manager.go
// 定时清理超时连接
func ClearTimeoutConnections() {
currentTime := uint64(time.Now().Unix())
for client := range clientManager.Clients {
if client.IsHeartbeatTimeout(currentTime) {
fmt.Println("心跳时间超时 关闭连接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)
client.Socket.Close()
}
}
}
b、读写的Goroutine有一个失败,则相互关闭 write()Goroutine写入数据失败,关闭c.Socket.Close()连接,会关闭read()Goroutine read()Goroutine读取数据失败,关闭close(c.Send)连接,会关闭write()Goroutine
c、客户端主动关闭 关闭读写的Goroutine 从ClientManager删除连接
d、监控用户连接、Goroutine数 十个内存溢出有九个和Goroutine有关 添加一个http的接口,可以查看系统的状态,防止Goroutine不回收 查看系统状态
e、Nginx 配置不活跃的连接释放时间,防止忘记关闭的连接
f、使用 pprof 分析性能、耗时
2、使用javaScript实现webSocket客户端
(1) 启动并注册监听程序
- js 建立连接,并处理连接成功、收到数据、断开连接的事件处理
ws = new WebSocket("ws://127.0.0.1:8089/acc");
ws.onopen = function(evt) {
console.log("Connection open ...");
};
ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
data_array = JSON.parse(evt.data);
console.log( data_array);
};
ws.onclose = function(evt) {
console.log("Connection closed.");
};
(2) 发送数据
- 需要注意:连接建立成功以后才可以发送数据
- 建立连接以后由客户端向服务器发送数据示例
登录:
ws.send('{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}');
心跳:
ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}');
关闭连接:
ws.close();
三、goWebSocket 项目
1、项目说明
- 本项目是基于webSocket实现的分布式聊天系统
- 客户端随机分配用户名,所有人进入一个聊天室,实现群聊的功能
- 单台机器(24核128G内存)支持百万客户端连接
- 支持水平部署,部署的机器之间可以相互通讯
- 项目架构图
2、项目依赖
- 本项目只需要使用 redis 和 golang;
- 本项目使用govendor管理依赖,克隆本项目就可以直接使用。
# 主要使用到的包
github.com/gin-gonic/gin@v1.4.0
github.com/go-redis/redis
github.com/gorilla/websocket
github.com/spf13/viper
google.golang.org/grpc
github.com/golang/protobuf
3、项目启动
- 克隆项目
git clone git@github.com:link1st/gowebsocket.git
或
git clone https://github.com/link1st/gowebsocket.git
- 修改项目配置
cd gowebsocket
cd config
mv app.yaml.example app.yaml
# 修改项目监听端口,redis连接等(默认127.0.0.1:3306)
vim app.yaml
# 返回项目目录,为以后启动做准备
cd ..
- 配置文件说明
app:
logFile: log/gin.log # 日志文件位置
httpPort: 8080 # http端口
webSocketPort: 8089 # webSocket端口
rpcPort: 9001 # 分布式部署程序内部通讯端口
httpUrl: 127.0.0.1:8080
webSocketUrl: 127.0.0.1:8089
redis:
addr: "localhost:6379"
password: ""
DB: 0
poolSize: 30
minIdleConns: 30
- 启动项目
go run main.go
- 进入IM聊天地址 http://127.0.0.1:8080/home/index
- 到这里,就可以体验到基于webSocket的IM系统
四、WebSocket项目的Nginx配置
1、为什么要配置Nginx?
- 使用nginx实现内外网分离,对外只暴露Nginx的Ip(一般的互联网企业会在nginx之前加一层LVS做负载均衡),减少入侵的可能;
- 使用Nginx可以利用Nginx的负载功能,前端再使用的时候只需要连接固定的域名,通过Nginx将流量分发了到不同的机器;
- 同时我们也可以使用Nginx的不同的负载策略(轮询、weight、ip_hash)。
2、nginx如何配置?
- 使用域名 im.91vh.com 为示例,参考配置;
- 一级目录im.91vh.com/acc 是给webSocket使用,是用nginx stream转发功能(nginx 1.3.31 开始支持,使用Tengine配置也是相同的),转发到golang 8089 端口处理;
- 其它目录是给HTTP使用,转发到golang 8080 端口处理;
upstream go-im
{
server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
upstream go-acc
{
server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
keepalive 16;
}
server {
listen 80 ;
server_name im.91vh.com;
index index.html index.htm ;
location /acc {
proxy_set_header Host $host;
proxy_pass http://go-acc;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_set_header Connection "";
proxy_redirect off;
proxy_intercept_errors on;
client_max_body_size 10m;
}
location {
proxy_set_header Host $host;
proxy_pass http://go-im;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_redirect off;
proxy_intercept_errors on;
client_max_body_size 30m;
}
access_log /link/log/nginx/access/im.log;
error_log /link/log/nginx/access/im.error.log;
}
3、问题处理
- 运行nginx测试命令,查看配置文件是否正确
/link/server/tengine/sbin/nginx -t
- 如果出现错误
nginx: [emerg] unknown "connection_upgrade" variable
configuration file /link/server/tengine/conf/nginx.conf test failed
- 处理方法
- 在nginx.com添加
http{
fastcgi_temp_file_write_size 128k;
..... # 需要添加的内容
#support websocket
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
.....
gzip on;
}
- 原因:Nginx代理webSocket的时候就会遇到Nginx的设计问题 End-to-end and Hop-by-hop Headers
五、压测
1、Linux内核参数的优化
- 设置文件打开句柄数
ulimit -n 1000000
- 设置sockets连接参数
vim /etc/sysctl.conf
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0
2、压测准备
- 待压测,如果大家有压测的结果欢迎补充
3、 压测数据
- 项目在实际使用的时候,每个连接约占 24Kb内存,一个Goroutine 约占11kb
- 支持百万连接需要22G内存
六、如何基于webSocket实现一个分布式聊天系统?
1、说明
- 参考本项目源码
- gowebsocket v1.0.0 单机版聊天系统;
- gowebsocket v1.0.0 分布式聊天系统;
- 为了方便演示,IM系统和webSocket(acc)系统合并在一个系统中;
- IM系统接口: 获取全部在线的用户,查询单前服务的全部用户+集群中服务的全部用户 发送消息,这里采用的是http接口发送(微信网页版发送消息也是http接口),这里考虑主要是两点: 1.服务分离,让acc系统尽量的简单一点,不掺杂其它业务逻辑 2.发送消息是走http接口,不使用webSocket连接,才用收和发送数据分离的方式,可以加快收发数据的效率;
2、架构
- 项目启动注册和用户连接时序图
- 其它系统(IM、任务)向webSocket(acc)系统连接的用户发送消息时序图
七、总结与回顾
1、在其它业务系统上的应用
- 本系统设计的初衷就是:和客户端保持一个长连接、对外部系统两个接口(查询用户是否在线、给在线的用户推送消息),实现业务的分离;
- 只有和业务分离开,才可以供多个业务使用,而不是每个业务都建立一个长链接。
2、 已经实现的功能
- gin log日志(请求日志+debug日志)
- 读取配置文件 完成
- 定时脚本,清理过期未心跳连接 完成
- http接口,获取登录、连接数量 完成
- http接口,发送push、查询有多少人在线 完成
- grpc 程序内部通讯,发送消息 完成
- appIds 一个用户在多个平台登录
- 界面,把所有在线的人拉倒一个群里面,发送消息 完成
- 单聊、群聊 完成
- 实现分布式,水平扩张 完成
- 压测脚本
- 文档整理
- 架构图以及扩展。
IM实现细节:
- 定义文本消息结构 完成;
- html发送文本消息 完成;
- 接口接收文本消息并发送给全体 完成;
- html接收到消息 显示到界面 完成;
- 界面优化 需要持续优化;
- 有人加入以后广播全体 完成;
- 定义加入聊天室的消息结构 完成;
- 引入机器人 待定。
3、还需要继续完善和优化的地方
- 登录,使用微信登录 获取昵称、头像等;
- 有账号系统、资料系统;
- 界面优化、适配手机端;
- 消息 文本消息(支持表情)、图片、语音、视频消息;
- 微服务注册、发现、熔断等;
- 添加配置项,单台机器最大连接数量。
4、 总结
- 虽然实现了一个分布式聊天系统,但是有很多细节没有处理(登录没有鉴权、界面还待优化等),但是可以通过这个示例可以了解到:通过WebSocket解决很多业务上需求;
- 本文虽然号称单台机器能有百万长连接(内存上能满足),但是实际在场景远比这个复杂(cpu有些压力),当然了如果你有这么大的业务量可以购买更多的机器更好的去支撑你的业务,本程序只是演示如何在实际工作用使用webSocket;
- 参考本文章,你就可以实现出来符合你需要的聊天程序。
如果你喜欢本文章,请帮忙关注、转发、点赞,本人将定期更新一些技术“干货”分享给大家,希望能帮助大家解决工作中的一些问题,谢谢!
相关推荐
- 前端入门——css 网格轨道详细介绍
-
上篇前端入门——cssGrid网格基础知识整体大概介绍了cssgrid的基本概念及使用方法,本文将介绍创建网格容器时会发生什么?以及在网格容器上使用行、列属性如何定位元素。在本文中,将介绍:...
- Islands Architecture(孤岛架构)在携程新版首页的实践
-
一、项目背景2022,携程PC版首页终于迎来了首次改版,完成了用户体验与技术栈的全面升级。作为与用户连接的重要入口,旧版PC首页已经陪伴携程走过了22年,承担着重要使命的同时,也遇到了很多问题:维护/...
- HTML中script标签中的那些属性
-
HTML中的<script>标签详解在HTML中,<script>标签用于包含或引用JavaScript代码,是前端开发中不可或缺的一部分。通过合理使用<scrip...
- CSS 中各种居中你真的玩明白了么
-
页面布局中最常见的需求就是元素或者文字居中了,但是根据场景的不同,居中也有简单到复杂各种不同的实现方式,本篇就带大家一起了解下,各种场景下,该如何使用CSS实现居中前言页面布局中最常见的需求就是元...
- CSS样式更改——列表、表格和轮廓
-
上篇文章主要介绍了CSS样式更改篇中的字体设置Font&边框Border设置,这篇文章分享列表、表格和轮廓,一起来看看吧。1.列表List1).列表的类型<ulstyle='list-...
- 一文吃透 CSS Flex 布局
-
原文链接:一文吃透CSSFlex布局教学游戏这里有两个小游戏,可用来练习flex布局。塔防游戏送小青蛙回家Flexbox概述Flexbox布局也叫Flex布局,弹性盒子布局。它决定了...
- css实现多行文本的展开收起
-
背景在我们写需求时可能会遇到类似于这样的多行文本展开与收起的场景:那么,如何通过纯css实现这样的效果呢?实现的难点(1)位于多行文本右下角的展开收起按钮。(2)展开和收起两种状态的切换。(3)文本...
- css 垂直居中的几种实现方式
-
前言设计是带有主观色彩的,同样网页设计中的css一样让人摸不头脑。网上列举的实现方式一大把,或许在这里你都看到过,但既然来到这里我希望这篇能让你看有所收获,毕竟这也是前端面试的基础。实现方式备注:...
- WordPress固定链接设置
-
WordPress设置里的最后一项就是固定链接设置,固定链接设置是决定WordPress文章及静态页面URL的重要步骤,从站点的SEO角度来讲也是。固定链接设置决定网站URL,当页面数少的时候,可以一...
- 面试发愁!吃透 20 道 CSS 核心题,大厂 Offer 轻松拿
-
前端小伙伴们,是不是一想到面试里的CSS布局题就发愁?写代码时布局总是对不齐,面试官追问兼容性就卡壳,想跳槽却总被“多列等高”“响应式布局”这些问题难住——别担心!从今天起,咱们每天拆解一...
- 3种CSS清除浮动的方法
-
今天这篇文章给大家介绍3种CSS清除浮动的方法。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。首先,这里就不讲为什么我们要清楚浮动,反正不清除浮动事多多。下面我就讲3种常用清除浮动的...
- 2025 年 CSS 终于要支持强大的自定义函数了?
-
大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发!1.什么是CSS自定义属性CSS自...
- css3属性(transform)的一个css3动画小应用
-
闲言碎语不多讲,咱们说说css3的transform属性:先上效果:效果说明:当鼠标移到a标签的时候,从右上角滑出二维码。实现方法:HTML代码如下:需要说明的一点是,a链接的跳转需要用javasc...
- CSS基础知识(七)CSS背景
-
一、CSS背景属性1.背景颜色(background-color)属性值:transparent(透明的)或color(颜色)2.背景图片(background-image)属性值:none(没有)...
- CSS 水平居中方式二
-
<divid="parent"><!--定义子级元素--><divid="child">居中布局</div>...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- maven镜像 (69)
- undefined reference to (60)
- zip格式 (63)
- oracle over (62)
- date_format函数用法 (67)
- 在线代理服务器 (60)
- shell 字符串比较 (74)
- x509证书 (61)
- localhost (65)
- java.awt.headless (66)
- syn_sent (64)
- settings.xml (59)
- 弹出窗口 (56)
- applicationcontextaware (72)
- my.cnf (73)
- httpsession (62)
- pkcs7 (62)
- session cookie (63)
- java 生成uuid (58)
- could not initialize class (58)
- beanpropertyrowmapper (58)
- word空格下划线不显示 (73)
- jar文件 (60)
- jsp内置对象 (58)
- makefile编写规则 (58)