目录

  1. 前言
  2. 原理解析
  3. 匿名聊天室
    1. API网关配置
    2. 函数计算配置
    3. 体验与测试
  4. 总结

前言

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信,即允许服务器主动发送信息给客户端。WebSocket 在服务端有数据推送需求时,可以主动发送数据至客户端。而原有 HTTP 协议的服务端对于需推送的数据,仅能通过轮询或 long poll 的方式来让客户端获得。

基于传统架构实现WebSocket协议,在一定程度上是比较困难的。那么在Serverless架构上实现WebSocket协议呢?众所周知,Serverless架构中,部署在FaaS平台的函数通常情况下是事件驱动的,且并不支持WebSocket协议,Serverless架构下是否可以实现WebSocket协议就是一个问题,如果可以实现,相对传统架构来说,难度是否会降低也是一个值得探索的内容。

其实Serverless架构是可以实现WebSocket协议的,而且基于Serverless架构实现的WebSocket协议会非常的简单,在FaaS平台与API网关触发器的加持下,Serverless架构是可以借助API网关等产品更简单的实现WebSocket协议。本文将会以阿里云函数计算为例,通过阿里云API网关,以及函数计算的API网关触发器实现一个基于WebSocket协议的聊天工具。

原理解析

image

由于函数计算是无状态且触发式的,即在有事件到来时才会被触发,因此,如图所示,为了实现 WebSocket,函数计算与 API 网关相结合,通过 API 网关承接及保持与客户端的连接,即 API 网关与函数计算一起实现了服务端。当客户端有消息发出时,会先传递给 API 网关,再由 API 网关触发函数执行。当服务端云函数要向客户端发送消息时,会先由云函数将消息 POST 到 API 网关的反向推送链接,再由 API 网关向客户端完成消息的推送。

在API网关处的业务简图:

image

整个流程为:

  1. 客户端在启动的时候和API网关建立了WebSocket连接,并且将自己的设备ID告知API网关;
  2. 客户端在WebSocket通道上发起注册信令;
  3. API网关将注册信令转换成HTTP协议发送给用户后端服务,并且在注册信令上加上设备ID参数(增加在名称为x-ca-deviceid的header中);
  4. 用户后端服务验证注册信令,如果验证通过,记住用户设备ID,返回200应答;
  5. 用户后端服务通过HTTP/HTTPS/WebSocket三种协议中的任意一种向API网关发送下行通知信令,请求中携带接收请求的设备ID;
  6. API网关解析下行通知信令,找到指定设备ID的连接,将下行通知信令通过WebSocket连接发送给指定客户端;
  7. 客户端在不想收到用户后端服务通知的时候,通过WebSocket连接发送注销信令给API网关,请求中不携带设备ID;
  8. API网关将注销信令转换成HTTP协议发送给用户后端服务,并且在注册信令上加上设备ID参数;
  9. 用户后端服务删除设备ID,返回200应答。

上述整个流程,完整流程如图:

image

固然,想要在API网关与FaaS平台基础上,实现一个WebSocket协议的功能,步骤是比较多的,但是其实这里面已经有很多工作是API网关帮助我们完成的。如果说将上面的整个流程,进一步压缩,压缩成我们所需要执行的操作,那么整进一步简化后,可以得到核心的四个流程:

  1. 开通分组绑定的域名的WebSocket通道;
  2. 创建注册、下行通知、注销三个API,给这三个API授权、并上线;
  3. 用户后端服务实现注册,注销信令逻辑,通过SDK发送下行通知;
  4. 下载SDK,嵌入到客户端,建立WebSocket连接,发送注册请求,监听下行通知;

在这四个流程中,第一个流程是准备工作,第二个流程是涉及到API网关实现WebSocket协议的配置流程,第三个流程和第四个流程涉及到在Serverless架构下基于API网关实现WebSocket协议信息推动的核心功能。在上面的第二个流程中,涉及到注册、下行、注销三个API,这三个API在阿里云API网关中,实际上是所需要实现WebSocket的三种管理信令对应的行为:

  1. 注册信令:注册信令是客户端发送给用户后端服务的信令,起到两个作用:
  • 将客户端的设备ID发送给用户后端服务,用户后端服务需要记住这个设备ID。用户不需要定义设备ID字段,设备ID字段由API网关的SDK自动生成;
  • 用户可以将此信令定义为携带用户名和密码的API,用户后端服务在收到注册信令的验证客户端的合法性。用户后端服务在返回注册信令应答的时候,返回非200时,API网关会视此情况为注册失败。
    客户端要想收到用户后端服务发送过来的通知,需要先发送注册信令给API网关,收到用户后端服务的200应答后正式注册成功。
  1. 下行通知信令:用户后端服务,在收到客户端发送的注册信令后,记住注册信令中的设备ID字段,然后就可以向API网关发送接收方为这个设备的下行通知信令了。只要这个设备在线,API网关就可以将此下行通知发送到端。
  2. 注销信令:客户端在不想收到用户后端服务的通知时发送注销信令发送给API网关,收到用户后端服务的200应答后注销成功,不再接受用户后端服务推送的下行消息。

匿名聊天室

API网关配置

首先,我们需要在函数计算处新建三个事件函数分别对应三种信令,或者辅助三种信令进行工作:

image

创建完成三个基本的测试函数(使用默认函数代码即可,之后会重新实现这三个函数的业务逻辑)之后,我们需要在API网关处配置这三个测试函数的相关接口,首先需要创建一个API网管分组:

image

创建API网关分组之后,可以对该分组进行域名的绑定。这里需要额外注意的是,绑定域名之后,需要开启WebSocket通信状态:

image

配置域名之后,我们需要在这个API分组下面创建四个API,这四个API,分别用来实现三种信令,以及一个上行数据的接口:

image

websocket_register: 是实现注册信令,对应后端的函数为register函数:

image

websocket_notify: 为下行通知请求,协议为HTTP以及Websocket,无需配置后端函数:

image

websocket_clean:注销请求,对应的后端函数计算中的clean函数:

image

websocket_send: 接收上行数据的普通请求,对应后端函数计算中的send函数:

image

创建完成之后,需要将这些API进行发布,并且创建应用:

image

创建应用完成之后,需要对websocket_notify接口进行授权:

image

并且创建对应的AppKey:

image

完成上述配置,我们即完成了一个基于Serverless架构的WebSocket协议服务的框架搭建,接下来,只需要根据业务需求,进行对应函数的实现即可,这里所涉及到的对应函数包括注册函数、传输函数以及清理函数等。

函数计算配置

为了实现基于Serverless架构的匿名聊天室的功能,除了配置API网关之外,还需要对之前我们所创建的三个函数进行业务逻辑的实现,所涉及到的函数以及对应处理的业务逻辑主要为:

  • register函数:注册函数,当函数注册时,将用户的Id/设备Id存储到对象存储中;
  • send函数:传输函数,当一个客户端发送消息后,通过send函数接收,并将消息通过API网关的下行通知请求发送给在线的其他客户端。判断在线的其他客户端的方法是通过对象存储中的object来进行判断;
  • clean函数:清理函数,用来断开连接,并清理链接对象存储在对象存储中的object信息;

这其中register函数主要是将客户端在发起请求建链时携带的x-ca-deviceid进行持久化,可以选择存储到数据库中,也可以选择存储到对象存储等其他可持久化的平台上,以便我们可以随时查询和确定客户端的链接ID,这一部分的代码实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-
import oss2
import json
ossClient = oss2.Bucket(oss2.Auth('<AccessKeyID>', '<AccessKeySecret>'),
'http://oss-cn-hongkong.aliyuncs.com',
'<BucketName>')

def register(event, context):
userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']
# 注册的时候,将链接写入到对象存储
ossClient.put_object(userId, 'user-id')
# 返回客户端注册结果
return {
'isBase64Encoded': 'false',
'statusCode': '200',
'body': {
'userId': userId
},
}

send函数的作用主要是两个:

  • 接收客户端通过API网关发送过来的信息;
  • 将收到的信息推动到目前已有链接的其他客户端上;

除了上述两部分作用之外,该函数还会涉及到意外断开的客户端清理相关的操作,例如当向某客户端推送数据失败时,可以认为是该客户端已经断开链接,此时可以讲该客户端的ID在对象存储中清理掉。当然,这一部分还可以进行更多的能力建设,例如:

  • 针对用户发送的信息,进行鉴黄鉴恐的筛选;
  • 针对用的发送的信息内容,进行部分的持久化和分析,进而判断用户的聊天热点话题等;

针对这一部分的整体代码实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# -*- coding: utf-8 -*-
import oss2
import json
import base64
from apigateway import client
from apigateway.http import request
from apigateway.common import constant

ossClient = oss2.Bucket(oss2.Auth('<AccessKeyID>', '<AccessKeySecret>'),
'http://oss-cn-hongkong.aliyuncs.com',
'<BucketName>')
apigatewayClient = client.DefaultClient(app_key="<app_key>",
app_secret="<app_secret>")

def send(event, context):

host = "http://websocket.serverless.fun"
url = "/notify"
userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']

# 获取链接对象
for obj in oss2.ObjectIterator(ossClient):
if obj.key != userId:
req_post = request.Request(host=host,
protocol=constant.HTTP,
url=url,
method="POST",
time_out=30000,
headers={'x-ca-deviceid': obj.key})
req_post.set_body(json.dumps({
"from": userId,
"message": base64.b64decode(json.loads(event.decode("utf-8"))['body']).decode("utf-8")
}))
req_post.set_content_type(constant.CONTENT_TYPE_STREAM)
result = apigatewayClient.execute(req_post)
print(result)
if result[0] != 200:
# 删除链接记录
ossClient.delete_object(obj.key)
return {
'isBase64Encoded': 'false',
'statusCode': '200',
'body': {
'status': "ok"
},
}

在send函数中,涉及到向其他客户端推送相关信息的操作,针对这一部分,需要引入API网关提供的对应的SDK实现。

通过API网关提供的对应语言的SDK,可以非常简单的向下行通知请求接口发起请求的行为:

image

最后一部分是clean函数,这一部分主要是当客户端发起断开连接的请求时,通过API网关触发函数计算,将所对应的x-ca-deviceid信息在对象存储中进行清理,其整体逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-
import oss2
import json
ossClient = oss2.Bucket(oss2.Auth('<AccessKeyID>', '<AccessKeySecret>'),
'http://oss-cn-hongkong.aliyuncs.com',
'<BucketName>')

def clean(event, context):
userId = json.loads(event.decode("utf-8"))['headers']['x-ca-deviceid']
# 删除链接记录
ossClient.delete_object(userId)

至此,我们完成了匿名聊天似的服务端建设。

体验与测试

在完成上面功能编写之后,我们可以在本地进行基本的测试。在测试过程中,主要有创建链接、发送消息、接受推送的三个部分。

关于创建链接、断开链接以及接收消息部分,可以通过WebSokcet的相关模块实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
const uuid = require('uuid');
const util = require('util');

const register = function (editor, deviceId) {
const ws = new WebSocket('ws://websocket.serverless.fun:8080');
const now = new Date();

const reg = {
method: 'GET',
host: 'websocket.serverless.fun:8080',
querys: {},
headers: {
'x-ca-websocket_api_type': ['REGISTER'],
'x-ca-seq': ['0'],
'x-ca-nonce': [uuid.v4().toString()],
'date': [now.toUTCString()],
'x-ca-timestamp': [now.getTime().toString()],
'CA_VERSION': ['1'],
},
path: '/register',
body: '',
};

ws.onopen = function open() {
ws.send('RG#' + deviceId);
};

var registered = false;
var hbStarted = false;

ws.onmessage = function incoming(event) {
if (event.data.startsWith('NF#')) {
const msg = JSON.parse(event.data.substr(3));
editor.addHistory(util.format('%s > %s', msg.from, msg.message));
editor.setState({'prompt': deviceId + " > "});
return;
}
if (!hbStarted && event.data.startsWith('RO#')) {
console.log('Login successfully');
if (!registered) {
registered = true;
ws.send(JSON.stringify(reg));
}
hbStarted = true;
setInterval(function () {
ws.send('H1');
}, 15 * 1000);
return;
}
};

ws.onclose = function (event) {
console.log('ws closed:', event);
};
};

module.exports = register;

发送信息到send函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
execShellCommand: function (cmd) {
/*
cmd 是客户端发送的文本
post到ShellApi是send函数对应的接口,例如http://websocket.serverless.fun/send
*/
const that = this;
that.setState({'prompt': ''})
that.offset = 0
that.cmds.push(cmd)
axios.post(ShellApi, cmd, {
headers: {
'Content-Type': 'application/octet-stream',
"x-ca-deviceid": deviceId
}
}).then(function (res) {
that.setState({'prompt': Prompt});
}).catch(function (err) {
const errText = err.response ? err.response.status + ' ' + err.response.statusText : err.toString();
that.addHistory(errText);
that.setState({'prompt': Prompt})
});
}

完成客户端的核心逻辑编辑之后,可以通过HTML和CSS实现部分页面,便于测试。如图所示,当我们完成页面样式的编辑,和本地逻辑的编辑之后,我们可以打开两个窗口进行项目的测试。我们可以看到,当我们打开两个窗口之后,每个窗口都会随机的生成一个客户端ID:

image

如果所示,当我们在左侧窗口输入一个字符串并按回车发送:

image

可以看到在右侧,出现了左侧窗口的ID,并且出现了左侧刚刚发送的信息。此时我们进一步测试,我们可以在右侧同样输入字符串,并按回车按钮发送:

image

可以看到左侧,也同样出现了相关的效果。至此,我们已经基于Serverless架构实现了匿名聊天室的功能,完成服务端的建设,和客户端的测试,可以确保项目创建链接、发送消息、接收消息。

总结

通过函数计算和API网关进行WebSocket的实践,绝对不仅仅是一个聊天工具这么简单,他可以用在很多方面,例如通过WebSocket进行实时日志系统的制作等。单独的函数计算,仅仅是一个计算平台,只有和周边的BaaS结合,才能展示出Serverless架构的价值和真正的能力,意义。这也是为什么很多人Serverless = FaaS + BaaS的一个原因。
通过本文的抛砖引玉,希望读者可以进一步对Serverless有更深的认识,可以将Serverless和更多的触发器、事件源等进一步结合,探索更多有趣的应用,并将其更简单的应用到自己的项目中。

欢迎您关注我的博客,也欢迎转载该博客,转载请注明本文地址: http://bluo.cn/serverless-aliyun-websocket/ 。有关于Serverless等相关问题欢迎联系我:80902630

微信号抖音号