0

0

用Go语言编写一个简单的WebSocket推送服务

尚

发布时间:2019-11-25 16:13:23

|

2683人浏览过

|

来源于博客园

转载

用Go语言编写一个简单的WebSocket推送服务

推送服务实现

基本原理

server 启动以后会注册两个 handler。

websocketHandler 用于提供浏览器端发送 Upgrade 请求并升级为 WebSocket 连接。

pushHandler 用于提供外部推送端发送推送数据的请求。

浏览器首先连接 websocketHandler (默认地址为 ws://ip:port/ws)升级请求为 WebSocket 连接,当连接建立之后需要发送注册信息进行注册。这里注册信息中包含一个 token 信息。

server 会对提供的 token 进行验证并获取到相应的 userId(通常来说,一个 userId 可能同时关联许多 token),并保存维护好 token, userId 和 conn(连接)之间的关系。

立即学习go语言免费学习笔记(深入)”;

推送端发送推送数据的请求到 pushHandler(默认地址为 ws://ip:port/push),请求中包含了 userId 字段和 message 字段。server 会根据 userId 获取到所有此时连接到该 server 的 conn,然后将 message 一一进行推送。

由于推送服务的实时性,推送的数据并没有也不需要进行缓存。

代码详解

我在此处会稍微讲述一下代码的基本构成,也顺便说说 Go 语言中一些常用的写法和模式(本人也是从其他语言转向 Go 语言,毕竟 Go 语言也相当年轻。所以有建议的话,敬请提出。)。

由于Go语言的发明人和一些主要维护者大都来自于 C/C++ 语言,所以 Go 语言的代码也更偏向于 C/C++ 系。

首先先看一下 Server 的结构:

// Server defines parameters for running websocket server.
type Server struct {
    // Address for server to listen on
    Addr string

    // Path for websocket request, default "/ws".
    WSPath string

    // Path for push message, default "/push".
    PushPath string

    // Upgrader is for upgrade connection to websocket connection using
    // "github.com/gorilla/websocket".
    //
    // If Upgrader is nil, default upgrader will be used. Default upgrader is
    // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always
    // returns true.
    Upgrader *websocket.Upgrader

    // Check token if it's valid and return userID. If token is valid, userID
    // must be returned and ok should be true. Otherwise ok should be false.
    AuthToken func(token string) (userID string, ok bool)

    // Authorize push request. Message will be sent if it returns true,
    // otherwise the request will be discarded. Default nil and push request
    // will always be accepted.
    PushAuth func(r *http.Request) bool

    wh *websocketHandler
    ph *pushHandler
}

这里说一下 Upgrader *websocket.Upgrader,这是 gorilla/websocket 包的对象,它用来升级 HTTP 请求。

如果一个结构体参数过多,通常不建议直接初始化,而是使用它提供的 New 方法。这里是:

// NewServer creates a new Server.func NewServer(addr string) *Server {    return &Server{
        Addr:     addr,
        WSPath:   serverDefaultWSPath,
        PushPath: serverDefaultPushPath,
    }
}

这也是 Go 语言对外提供初始化方法的一种常见用法。

SUN2008 企业网站管理系统2.0 beta
SUN2008 企业网站管理系统2.0 beta

1、数据调用该功能使界面与程序分离实施变得更加容易,美工无需任何编程基础即可完成数据调用操作。2、交互设计该功能可以方便的为栏目提供个性化性息功能及交互功能,为产品栏目添加产品颜色尺寸等属性或简单的留言和订单功能无需另外开发模块。3、静态生成触发式静态生成。4、友好URL设置网页路径变得更加友好5、多语言设计1)UTF8国际编码; 2)理论上可以承担一个任意多语言的网站版本。6、缓存机制减轻服务器

下载

然后 Server 使用 ListenAndServe 方法启动并监听端口,与 http 包的使用类似:

// ListenAndServe listens on the TCP network address and handle websocket
// request.
func (s *Server) ListenAndServe() error {
    b := &binder{
        userID2EventConnMap: make(map[string]*[]eventConn),
        connID2UserIDMap:    make(map[string]string),
    }

    // websocket request handler
    wh := websocketHandler{
        upgrader: defaultUpgrader,
        binder:   b,
    }
    if s.Upgrader != nil {
        wh.upgrader = s.Upgrader
    }
    if s.AuthToken != nil {
        wh.calcUserIDFunc = s.AuthToken
    }
    s.wh = &wh
    http.Handle(s.WSPath, s.wh)

    // push request handler
    ph := pushHandler{
        binder: b,
    }
    if s.PushAuth != nil {
        ph.authFunc = s.PushAuth
    }
    s.ph = &ph
    http.Handle(s.PushPath, s.ph)

    return http.ListenAndServe(s.Addr, nil)
}

这里我们生成了两个 Handler,分别为 websocketHandler 和 pushHandler。websocketHandler 负责与浏览器建立连接并传输数据,而 pushHandler 则处理推送端的请求。

可以看到,这里两个 Handler 都封装了一个 binder 对象。这个 binder 用于维护 token userID Conn 的关系:

// binder is defined to store the relation of userID and eventConn
type binder struct {
    mu sync.RWMutex

    // map stores key: userID and value of related slice of eventConn
    userID2EventConnMap map[string]*[]eventConn

    // map stores key: connID and value: userID
    connID2UserIDMap map[string]string
}

websocketHandler

具体看一下 websocketHandler 的实现。

// websocketHandler defines to handle websocket upgrade request.
type websocketHandler struct {
    // upgrader is used to upgrade request.
    upgrader *websocket.Upgrader

    // binder stores relations about websocket connection and userID.
    binder *binder

    // calcUserIDFunc defines to calculate userID by token. The userID will
    // be equal to token if this function is nil.
    calcUserIDFunc func(token string) (userID string, ok bool)
}

很简单的结构。websocketHandler 实现了 http.Handler 接口:

// First try to upgrade connection to websocket. If success, connection will
// be kept until client send close message or server drop them.
func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    wsConn, err := wh.upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }
    defer wsConn.Close()

    // handle Websocket request
    conn := NewConn(wsConn)
    conn.AfterReadFunc = func(messageType int, r io.Reader) {
        var rm RegisterMessage
        decoder := json.NewDecoder(r)
        if err := decoder.Decode(&rm); err != nil {
            return
        }

        // calculate userID by token
        userID := rm.Token
        if wh.calcUserIDFunc != nil {
            uID, ok := wh.calcUserIDFunc(rm.Token)
            if !ok {
                return
            }
            userID = uID
        }

        // bind
        wh.binder.Bind(userID, rm.Event, conn)
    }
    conn.BeforeCloseFunc = func() {
        // unbind
        wh.binder.Unbind(conn)
    }

    conn.Listen()
}

首先将传入的 http.Request 转换为 websocket.Conn,再将其分装为我们自定义的一个 wserver.Conn(封装,或者说是组合,是 Go 语言的典型用法。记住,Go 语言没有继承,只有组合)。

然后设置了 Conn 的 AfterReadFunc 和 BeforeCloseFunc 方法,接着启动了 conn.Listen()。AfterReadFunc 意思是当 Conn 读取到数据后,尝试验证并根据 token 计算 userID,然乎 bind 注册绑定。BeforeCloseFunc 则为 Conn 关闭前进行解绑操作。

pushHandler

pushHandler 则容易理解。它解析请求然后推送数据:

// Authorize if needed. Then decode the request and push message to each
// realted websocket connection.
func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // authorize
    if s.authFunc != nil {
        if ok := s.authFunc(r); !ok {
            w.WriteHeader(http.StatusUnauthorized)
            return
        }
    }

    // read request
    var pm PushMessage
    decoder := json.NewDecoder(r.Body)
    if err := decoder.Decode(&pm); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        w.Write([]byte(ErrRequestIllegal.Error()))
        return
    }

    // validate the data
    if pm.UserID == "" || pm.Event == "" || pm.Message == "" {
        w.WriteHeader(http.StatusBadRequest)
        w.Write([]byte(ErrRequestIllegal.Error()))
        return
    }

    cnt, err := s.push(pm.UserID, pm.Event, pm.Message)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        w.Write([]byte(err.Error()))
        return
    }

    result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt))
    io.Copy(w, result)
}
Conn
Conn (此处指 wserver.Conn) 为 websocket.Conn 的包装。

// Conn wraps websocket.Conn with Conn. It defines to listen and read
// data from Conn.
type Conn struct {
    Conn *websocket.Conn

    AfterReadFunc   func(messageType int, r io.Reader)
    BeforeCloseFunc func()

    once   sync.Once
    id     string
    stopCh chan struct{}
}

最主要的方法为 Listen():

// Listen listens for receive data from websocket connection. It blocks
// until websocket connection is closed.
func (c *Conn) Listen() {
    c.Conn.SetCloseHandler(func(code int, text string) error {
        if c.BeforeCloseFunc != nil {
            c.BeforeCloseFunc()
        }

        if err := c.Close(); err != nil {
            log.Println(err)
        }

        message := websocket.FormatCloseMessage(code, "")
        c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
        return nil
    })

    // Keeps reading from Conn util get error.
ReadLoop:
    for {
        select {
        case <-c.stopCh:
            break ReadLoop
        default:
            messageType, r, err := c.Conn.NextReader()
            if err != nil {
                // TODO: handle read error maybe
                break ReadLoop
            }

            if c.AfterReadFunc != nil {
                c.AfterReadFunc(messageType, r)
            }
        }
    }
}

主要设置了当 websocket 连接关闭时的处理和不停地读取数据。

推荐:golang教程

相关专题

更多
golang如何定义变量
golang如何定义变量

golang定义变量的方法:1、声明变量并赋予初始值“var age int =值”;2、声明变量但不赋初始值“var age int”;3、使用短变量声明“age :=值”等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

173

2024.02.23

golang有哪些数据转换方法
golang有哪些数据转换方法

golang数据转换方法:1、类型转换操作符;2、类型断言;3、字符串和数字之间的转换;4、JSON序列化和反序列化;5、使用标准库进行数据转换;6、使用第三方库进行数据转换;7、自定义数据转换函数。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

224

2024.02.23

golang常用库有哪些
golang常用库有哪些

golang常用库有:1、标准库;2、字符串处理库;3、网络库;4、加密库;5、压缩库;6、xml和json解析库;7、日期和时间库;8、数据库操作库;9、文件操作库;10、图像处理库。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

335

2024.02.23

golang和python的区别是什么
golang和python的区别是什么

golang和python的区别是:1、golang是一种编译型语言,而python是一种解释型语言;2、golang天生支持并发编程,而python对并发与并行的支持相对较弱等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

206

2024.03.05

golang是免费的吗
golang是免费的吗

golang是免费的。golang是google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的开源编程语言,采用bsd开源协议。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

388

2024.05.21

golang结构体相关大全
golang结构体相关大全

本专题整合了golang结构体相关大全,想了解更多内容,请阅读专题下面的文章。

193

2025.06.09

golang相关判断方法
golang相关判断方法

本专题整合了golang相关判断方法,想了解更详细的相关内容,请阅读下面的文章。

187

2025.06.10

golang数组使用方法
golang数组使用方法

本专题整合了golang数组用法,想了解更多的相关内容,请阅读专题下面的文章。

191

2025.06.17

桌面文件位置介绍
桌面文件位置介绍

本专题整合了桌面文件相关教程,阅读专题下面的文章了解更多内容。

0

2025.12.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 3.1万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号