基于 Swoole 实现支持高并发的实时弹幕功能(下)


我们接着上篇教程来完成弹幕服务端以及客户端与服务端交互的开发,首先来实现服务端 WebSocket 服务器的编码。

WebSocket 服务器

我们参照之前的功能介绍教程《在 Laravel 中集成 Swoole 实现 WebSocket 服务器》实现这个用于弹幕功能的 WebSocket 服务器。

注:如果你还没有在 Laravel 项目中安装配置 LaravelS 扩展包,参考这篇教程:基于 Swoole 实现高性能 HTTP 服务器

WebSocketHandler

首先在 app/Handlers 目录下创建一个实现了 WebSocketHandlerInterface 接口的 WebSocket 处理器类 WebSocketHandler,并编写对应的业务代码如下:

<?php
namespace App\Handlers;

use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
use Illuminate\Support\Facades\Log;
use Swoole\Http\Request;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;

class WebSocketHandler implements WebSocketHandlerInterface
{
    public function __construct()
    {

    }

    // 连接建立时触发
    public function onOpen(Server $server, Request $request)
    {
        Log::info('WebSocket 连接建立:' . $request->fd);
    }

    // 收到消息时触发
    public function onMessage(Server $server, Frame $frame)
    {
        // $frame->fd 是客户端 id,$frame->data 是客户端发送的数据
        Log::info("从 {$frame->fd} 接收到的数据: {$frame->data}");
        foreach($server->connections as $fd){
            if (!$server->isEstablished($fd)) {
                // 如果连接不可用则忽略
                continue;
            }
            $server->push($fd , $frame->data); // 服务端通过 push 方法向所有连接的客户端发送数据
        }
    }

    // 连接关闭时触发
    public function onClose(Server $server, $fd, $reactorId)
    {
        Log::info('WebSocket 连接关闭:' . $fd);
    }
}

很简单,就是在建立、断开 WebSocket 连接的时候打印下日志,然后在收到客户端发送过来的弹幕消息时将其推送给所有已连接的 WebSocket 客户端,达到「广播」的效果,这样,就不需要客户端主动来拉数据了。当然,这里是最简单的推送逻辑,你可以根据需要将弹幕消息保存到数据库或其他存储设备持久化存储。

然后我们在 config/laravels.php 中配置这个 WebSocketHandler 使其生效:

'websocket' => [
    'enable' => true,
    'handler' =>  \App\Handlers\WebSocketHandler::class,
],

这样一来服务端 WebSocket 处理器的编码工作就完成了,很简单吧,接下来,我们还要在 Nginx 中配置使其支持 WebSocket 通信。

在 Nginx 中配置支持 WebSocket 通信

假设我们这个项目对应的虚拟主机域名是 laravel58.test,接下来我们要到 Nginx 对应的虚拟主机配置中使其支持处理 WebSocket 连接和请求,由于 Swoole 的 WebSocket 服务器基于 Swoole HTTP 服务器实现,所以我们要同时开启这两个支持,修改 laravel58 应用对应虚拟主机配置文件 laravel58.conf 内容如下:

map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

upstream danmu {
    # Connect IP:Port
    server workspace:5200 weight=5 max_fails=3 fail_timeout=30s;
    keepalive 16;
}

server {
    listen 80;

    server_name laravel58.test;
    root /var/www/laravel58/public;

    error_log /var/log/nginx/laravel58_error.log;
    access_log /var/log/nginx/laravel58_access.log;

    autoindex off;
    index index.html index.htm;

    # Nginx handles the static resources(recommend enabling gzip), LaravelS handles the dynamic resource.
    location / {
        try_files $uri @danmu;
    }

    # Response 404 directly when request the PHP file, to avoid exposing public/*.php
    #location ~* \.php$ {
    #    return 404;
    #}

    # Http and WebSocket are concomitant, Nginx identifies them by "location"
    # !!! The location of WebSocket is "/ws"
    # Javascript: var ws = new WebSocket("ws://laravel58.test/ws");
    # 处理 WebSocket 通信
    location ^~ /ws/ {
        # proxy_connect_timeout 60s;
        # proxy_send_timeout 60s;
        # proxy_read_timeout: Nginx will close the connection if the proxied server does not send data to Nginx in 60 seconds; At the same time, this close behavior is also affected by heartbeat setting of Swoole.
        # proxy_read_timeout 60s;
        proxy_http_version 1.1;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Real-PORT $remote_port;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header Scheme $scheme;
        proxy_set_header Server-Protocol $server_protocol;
        proxy_set_header Server-Name $server_name;
        proxy_set_header Server-Addr $server_addr;
        proxy_set_header Server-Port $server_port;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_pass http://danmu;
    }

    location @danmu {
        # proxy_connect_timeout 60s;
        # proxy_send_timeout 60s;
        # proxy_read_timeout 60s;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Real-PORT $remote_port;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header Scheme $scheme;
        proxy_set_header Server-Protocol $server_protocol;
        proxy_set_header Server-Name $server_name;
        proxy_set_header Server-Addr $server_addr;
        proxy_set_header Server-Port $server_port;
        proxy_pass http://danmu;
    }
}

注:学院君本地基于 Laradock 作为开发环境,所以在 upstream danmu 配置块中将 server 设置为 workspace:5200,如果你不是以 Laradock 作为开发环境,需要调整这里的 IP 地址为实际 Swoole WebSocket 服务器运行地址。

接下来,重启 Nginx 服务器,然后我们在 laravel58 项目下启动 HTTP/WebSocket 服务器:

bin/laravels start

就可以基于 Nginx + Swoole HTTP/WebSocket 服务器对外提供服务了。

Vue 组件与 WebSocket 服务器通信

下面,我们回到上篇教程实现的 Vue 组件 DanmuComponent,编写客户端与 WebSocket 服务器通信的 JavaScript 脚本代码:

<script>
    import { MESSAGE_TYPE } from 'vue-baberrage'

    export default {
        name: 'danmu',
        data () {
            return {
                msg: '你好,学院君!',
                position: 'top',
                barrageIsShow: true,
                currentId: 0,
                barrageLoop: true,
                websocket: null,
                barrageList: []
            }
        },
        created () {
            // 初始化 websocket 并定义回调函数
            this.websocket = new WebSocket("ws://laravel58.test/ws/");
            this.websocket.onopen = function (event) {
                console.log("已建立 WebSocket 连接");
            };
            let that = this;
            this.websocket.onmessage = function (event) {
                // 接收到 WebSocket 服务器返回消息时触发
                let data = JSON.parse(event.data);
                that.addToList(data.position, data.message);
            };
            this.websocket.onerror = function (event) {
                console.log("与 WebSocket 通信出错");
            };
            this.websocket.onclose = function (event) {
                console.log("断开 WebSocket 连接");
            };
        },
        destroyed () {
            this.websocket.close();
        },
        methods: {
            removeList () {
                this.barrageList = []
            },
            addToList (position, message) {
                if (position === 'top') {
                    this.barrageList.push({
                        id: ++this.currentId,
                        avatar: 'https://xueyuanjun.com/assets/avatars/numxwdxf8lrtrsol.jpg',
                        msg: message,
                        barrageStyle: 'top',
                        time: 8,
                        type: MESSAGE_TYPE.FROM_TOP,
                        position: 'top'
                    })
                } else {
                    this.barrageList.push({
                        id: ++this.currentId,
                        avatar: 'https://xueyuanjun.com/assets/avatars/numxwdxf8lrtrsol.jpg',
                        msg: message,
                        time: 15,
                        type: MESSAGE_TYPE.NORMAL
                    })
                }
            },
            sendMsg () {
                // 发送消息到 WebSocket 服务器
                this.websocket.send('{"position":"' + this.position + '", "message":"' + this.msg + '"}');
            },
        }
    }
</script>

我们在 Vue 组件创建期间初始化 WebSocket 连接并定义回调函数,在接收到 WebSocket 服务器返回的消息时调用 addToList 方法将其渲染到客户端弹幕组件中,最后将模板代码中点击发送事件函数从之前的 addToList 调整为 sendMsg,即将消息推送给 WebSocket 服务器:

<template>
    <div id="danmu">
        <div class="stage">
            <vue-baberrage
                    :isShow= "barrageIsShow"
                    :barrageList = "barrageList"
                    :loop = "barrageLoop"
                    :maxWordCount = "60"
            >
            </vue-baberrage>
        </div>
        <div class="danmu-control">
            <div>
                <select v-model="position">
                    <option value="top">从上</option>
                    <option value="abc">从右</option>
                </select>
                <input type="text" style="float:left"  v-model="msg"/>
                <button type="button" style="float:left" @click="sendMsg">发送</button>
            </div>
        </div>
    </div>
</template>

至此,客户端 Vue 组件就编写好了,当然,这里只是一个小的功能演示,对于复杂系统,如果多处需要建立 WebSocket 通信,可以在公共组件中初始化 WebSocket 连接,然后通过 Vuex 来管理 WebSocket 服务器返回的消息,最后在各自的 Vue 组件中监听 Vuex 数据变更进行本地数据渲染,关于这一部分的优化,我们在实时聊天室项目中会演示如何实现,这里弹幕功能比较单一,就将代码直接写到一个 Vue 组件中了。

基于 WebSocket + Vue 客户端的弹幕功能演示

重新编译前端资源让上述 Vue 组件变更生效:

npm run dev

然后打开浏览器访问 http://laravel58.test/danmu,在开发者工具的控制台标签页可以看到 WebSocket 连接建立日志,在输入框中输入文字点击「发送」,可以看到弹幕效果:

这个时候体现的效果和上篇教程并无二致,尽管这些弹幕消息已经是经过 WebSocket 服务器处理后返回的,而不是直接渲染的,这个通信过程可以在开发者工具的 Network->WS 标签页中看到:

为了体现 WebSocket 服务器的优势,新开一个浏览器标签页,为了方便识别,我们把上面这个已经打开的标签页叫做 P1,把新开的标签页叫做 P2,重新加载 P1,刷新之前的数据,然后点击「发送」按钮,可以同时在 P1 和 P2 上看到弹幕消息,同理,在 P2 上更新输入框默认数据,比如「Laravel学院」,然后点击发送按钮,也可以在 P1 看到 P2 页面发出的弹幕消息:

这样,我们就完成了简单的、后端基于 WebSocket 服务器通信的、支持长连接和多并发的实时弹幕功能。不过真正用于实际项目的话还有一些地方需要优化,比如与用户系统的关联、视频蒙版、以及弹幕重叠等,由于这些不是本教程探讨的重点,就不再展开了,感兴趣的同学可以自行去研究和实现。


点赞 取消点赞 收藏 取消收藏

<< 上一篇: 基于 Swoole 实现支持高并发的实时弹幕功能(上)

>> 下一篇: 基于 Swoole 开发实时在线聊天室(一):环境准备篇