WebTail源码分析
date
Mar 9, 2023
slug
webtail
author
status
Public
tags
Golang
summary
type
Post
thumbnail
category
updatedAt
Mar 15, 2023 09:52 AM
1. 背景
最近需要用到前端实时展示任务日志的功能,在github上看到webtail这个项目,
基本上能够满足实时展示的需求,浅析一下实现原理
2. 使用方法
用法很简单:webtail.Service 实现了 http.Handler 接口,只需将其注册到 HTTP 路由上,即可基于 websocket 协议向前端实时推送日志
package main
import (
"github.com/LeKovr/webtail"
)
func main() {
wt, err := webtail.New(log, cfg)
if err != nil {
return
}
go wt.Run()
defer wt.Close()
// ...
http.Handle("/tail", wt)
}
3. 原理
首先需要构建 webtail.Service 结构体
// Service is the WebTail HTTP handler (see ServeHTTP). It bundles the
// configuration, the message hub and the shared bookkeeping that every
// websocket connection needs.
type Service struct {
	// cfg carries tailer options and the websocket/client buffer sizes
	// consulted when a connection is upgraded.
	cfg *Config
	// hub receives client registrations and routes messages between
	// clients and tail workers.
	hub *Hub
	// wg is handed to each client's read/write pump goroutines.
	wg *sync.WaitGroup
	// log receives service-level errors such as failed upgrades.
	log logr.Logger
}
Service 包含四个属性:
| 字段 | 含义 |
| --- | --- |
| cfg | 配置信息 |
| hub | 管理活跃(active)客户端并分发消息 |
| wg | 协程控制(sync.WaitGroup) |
| log | 打印日志的 logger |
3.1 属性
3.1.1 config:
配置信息,会传递给 tailer(启动流程中的 NewTailService(log, cfg));客户端及 websocket 的缓冲区大小也在这里配置
// Config defines local application flags.
//
// The struct tags are consumed by a command-line flag parser: `long` is the
// flag name, `default` its default value and `description` its help text.
type Config struct {
	// Tailer settings.

	// Root is the directory log files are served from.
	Root string `long:"root" default:"log/" description:"Root directory for log files"`
	// Bytes is how far back from the end of a file tailing starts.
	Bytes int64 `long:"bytes" default:"5000" description:"tail from the last Nth location"`
	// Lines is how many recent lines are kept for newly attached consumers.
	Lines int `long:"lines" default:"100" description:"keep N old lines for new consumers"`
	// MaxLineSize is the length above which a line is split.
	MaxLineSize int `long:"split" default:"180" description:"split line if longer"`
	// ListCache is how long (seconds) a file listing is cached.
	ListCache int `long:"cache" default:"2" description:"Time to cache file listing (sec)"`
	// Poll selects polling instead of inotify for change detection.
	Poll bool `long:"poll" description:"use polling, instead of inotify"`
	// Trace enables tracing of worker channels.
	Trace bool `long:"trace" description:"trace worker channels"`

	// Connection settings.

	// ClientBufferSize is the capacity of each client's outgoing message channel.
	ClientBufferSize int `long:"out_buf" default:"256" description:"Client Buffer Size"`
	// WSReadBufferSize is the websocket read buffer size.
	WSReadBufferSize int `long:"ws_read_buf" default:"1024" description:"WS Read Buffer Size"`
	// WSWriteBufferSize is the websocket write buffer size.
	WSWriteBufferSize int `long:"ws_write_buf" default:"1024" description:"WS Write Buffer Size"`
}
- Root: 默认:log/,表示从哪个目录下读取日志文件
- Bytes: 默认:5000,表示 tail 时从文件末尾倒数多少字节处开始读取
- Lines: 默认:100,为新连接的客户端保留最近多少行历史日志
- MaxLineSize: 默认:180,单行最大长度,超过时拆分为多行展示
- ListCache: 默认:2,文件列表的缓存时间(秒)
- Poll: 是否使用轮询(polling)代替 inotify 监听文件变化
- Trace: 追踪 worker 信道信息
- ClientBufferSize: 默认:256,每个客户端发送信道(send channel)的缓冲区大小
- WSReadBufferSize: 默认:1024,websocket 读取 buffer 大小
- WSWriteBufferSize: 默认:1024,websocket 写入 buffer 大小
3.1.2 hub
// Hub is WebTail's central dispatcher. It tracks connected clients and
// per-channel subscriptions, and routes traffic between clients, tail
// workers and the directory indexer. Every channel below is consumed by
// Hub.Run's event loop.
type Hub struct {
	log logr.Logger // hub logger

	workers *TailService    // tail workers, including the indexer
	wg      *sync.WaitGroup // lets Close wait for WorkerStop to finish

	clients     map[*Client]bool       // currently registered clients
	subscribers map[string]subscribers // subscriber set per channel
	stats       map[string]uint64      // subscriber count per channel

	broadcast  chan *Message        // inbound commands from clients
	register   chan *Client         // client registration requests
	unregister chan *Client         // client unregistration requests
	receive    chan *TailMessage    // inbound lines from tailers
	index      chan *IndexItemEvent // inbound updates from the channel indexer
	quit       chan struct{}        // shutdown signal
}
hub 是webtail核心的实现模块,定义了如下属性:
- log: 打印日志的 logger
- workers: 执行 tail 操作的实体(TailService)
- wg: 协程控制
- clients: 已注册的客户端
- subscribers: 各个 tail 文件信道的订阅者
- stats: 各信道的订阅者数量统计
- broadcast: 从客户端传入的消息(attach/detach/list 等命令)
- register: 客户端注册请求
- unregister: 客户端注销请求
- receive: 从 tailer 传入的消息(文件行)
- index: 从索引器(indexer)传入的日志文件列表更新
- quit: 退出信道
4. 启动流程
webtail.New(log,&cfs.Tail)
->
NewTailService(log, cfg)
->
NewHub(log, tail, &wg)
->
service := Service{cfg: cfg, hub: hub, log: log, wg: &wg}
return &service, nil
->
初始化os.Signal信道
->
监听信道: signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
->
wt.Run()
->
wt.hub.Run()
// Run processes hub messages.
//
// Run is the hub's event loop. It first starts the indexer (IndexerRun),
// then serves one event at a time:
//   - register:   adds the client to h.clients, but only while on air;
//   - unregister: unsubscribes a known client;
//   - broadcast:  client commands, handled by fromClient;
//   - receive:    lines from tailers, handled by fromTailer;
//   - index:      index updates, handled by fromIndexer;
//   - quit:       stops accepting clients and closes every client's send channel.
//
// After quit, Run returns as soon as no registered clients remain
// (immediately if there are none at that moment). On return it calls
// WorkerStop("") — presumably stopping the indexer registered under ""
// by IndexerRun; TODO confirm.
func (h *Hub) Run() {
	// Initialise the subscriber set for the "" channel (presumably the
	// file-list/index channel — TODO confirm).
	h.subscribers[""] = make(subscribers)
	// Start the indexer (see section 4.1). It waits for the directory watcher
	// to be ready and runs an initial index load; further updates arrive on h.index.
	h.workers.IndexerRun(h.index, h.wg)
	defer h.workers.WorkerStop("")
	// onAir stays true until quit is received; afterwards new clients are ignored.
	onAir := true
	// Block, serving hub events until shutdown completes.
	for {
		select {
		case client := <-h.register:
			if onAir {
				h.clients[client] = true
			}
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				// NOTE(review): onAir is forwarded presumably so unsubscribeClient
				// knows whether client.send still needs closing (after quit it
				// has already been closed below) — confirm in unsubscribeClient.
				h.unsubscribeClient(client, onAir)
			}
			// During shutdown, exit once the last client has gone.
			if !onAir && len(h.clients) == 0 {
				return
			}
		case cmessage := <-h.broadcast:
			// client sends attach/detach/?list
			h.fromClient(cmessage)
		case wmessage := <-h.receive:
			// tailer sends file line
			h.fromTailer(wmessage)
		case imessage := <-h.index:
			// worker sends index update
			h.fromIndexer(imessage)
		case <-h.quit:
			onAir = false
			if len(h.clients) == 0 {
				return
			}
			// Closing send makes each client's write pump send a close frame
			// and exit; the read pump then sends the client on unregister.
			// NOTE(review): if h.quit is closed (rather than sent to), this case
			// fires on every later iteration and would close already-closed send
			// channels (panic). Confirm how Close signals quit.
			for client := range h.clients {
				close(client.send)
			}
		}
	}
}
4.1 IndexerRun:
// IndexerRun runs indexer.
//
// It registers the indexer in ts.workers under the "" key, with a Quit
// channel that can later be used to stop it. It then starts an indexWorker
// goroutine for ts.Config.Root that reports on out; presumably it watches
// Root for created and removed files — TODO confirm in indexWorker.run.
// IndexerRun waits until that worker signals readiness, then performs an
// initial loadIndex pass over Root.
//
// A loadIndex failure is logged, not returned.
func (ts *TailService) IndexerRun(out chan *IndexItemEvent, wg *sync.WaitGroup) {
	stop := make(chan struct{})
	ts.workers[""] = &TailAttr{Quit: stop}

	worker := indexWorker{
		out:  out,
		quit: stop,
		log:  ts.log,
		root: ts.Config.Root,
	}
	ready := make(chan struct{})
	go worker.run(ready, wg)

	// Do not scan the directory until the worker reports it is ready.
	<-ready

	if err := loadIndex(ts.index, ts.Config.Root, time.Now()); err != nil {
		ts.log.Error(err, "Path walk")
	}
	ts.log.V(1).Info("Indexer started")
}
5. 服务初始化流程以及http请求解析图
来看ServeHTTP的实现
// ServeHTTP implements http.Handler for WebTail. It upgrades the request to a
// websocket connection and attaches the peer to the hub.
//
// If the upgrade fails, the error is logged and the handler returns.
// Otherwise a Client is registered with the hub; its outgoing buffer holds
// cfg.ClientBufferSize messages. Two goroutines then take over the
// connection:
//   - runWritePump carries hub output to the peer;
//   - runReadPump carries peer input to the hub and sends the client on the
//     hub's unregister channel when the connection ends.
//
// All per-connection work lives in those goroutines, so the handler returns
// immediately and memory referenced by the caller can be collected.
func (wt *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	up := upgrader(wt.cfg.WSReadBufferSize, wt.cfg.WSWriteBufferSize)
	conn, err := up.Upgrade(w, r, nil)
	if err != nil {
		wt.log.Error(err, "Upgrade connection")
		return
	}

	peer := &Client{
		log:  wt.log,
		conn: conn,
		send: make(chan []byte, wt.cfg.ClientBufferSize),
	}
	// NOTE(review): Hub.Run stops reading register once it has shut down, so
	// this send may block forever for requests arriving after shutdown.
	wt.hub.register <- peer

	go peer.runWritePump(wt.wg)                                      // hub -> peer
	go peer.runReadPump(wt.wg, wt.hub.unregister, wt.hub.broadcast) // peer -> hub
}
来看下hub to client
// runWritePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
//
// Besides forwarding hub messages, it sends a websocket ping every pingPeriod.
// The peer answers pings with pongs, and the pong handler installed by
// runReadPump extends the read deadline by pongWait on each pong. Without
// pings, the read deadline expires after pongWait and the connection is
// dropped.
//
// It returns, closing the connection, when the hub closes c.send, when a
// write deadline cannot be set, or when a ping fails.
func (c *Client) runWritePump(wg *sync.WaitGroup) {
	// NOTE(review): Add runs inside the goroutine, so a concurrent wg.Wait may
	// return before this goroutine is counted; calling Add in the spawner,
	// before `go`, would close that window.
	wg.Add(1)
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
		wg.Done()
	}()
	for {
		select {
		case message, ok := <-c.send:
			// Bound every write so a stalled peer cannot block this goroutine forever.
			if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				c.log.Error(err, "SetWriteDeadline")
				return
			}
			if ok {
				c.sendMesage(message)
				continue
			}
			// The hub closed the channel. Send Bye and exit
			err := c.conn.WriteMessage(websocket.CloseMessage, []byte{})
			if err != nil && err != websocket.ErrCloseSent {
				c.log.Error(err, "Close socket")
			}
			return
		case <-ticker.C:
			// Ping only once the write deadline is in place; the peer's pong
			// reply is what keeps the read side of the connection alive.
			err := c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err == nil {
				err = c.conn.WriteMessage(websocket.PingMessage, nil)
			}
			if err != nil {
				return
			}
		}
	}
}
client to hub 如下
// runReadPump pumps messages from the websocket connection to the hub.
//
// One such goroutine runs per connection, which keeps at most one reader on
// the connection. Each inbound message has its newlines replaced by spaces
// and surrounding whitespace trimmed, then is forwarded to inbox tagged with
// this client.
//
// Messages larger than maxMessageSize are rejected. The read deadline starts
// at pongWait and is extended only by pong frames (through the pong handler);
// ordinary data messages do not extend it.
//
// When reading stops for any reason, the client is sent on quit, the
// connection is closed and wg is released, in that order. Close errors other
// than "going away" and "normal closure" are logged.
func (c *Client) runReadPump(wg *sync.WaitGroup, quit chan *Client, inbox chan *Message) {
	// NOTE(review): Add inside the goroutine races with wg.Wait; see spawner.
	wg.Add(1)
	defer func() {
		quit <- c
		c.conn.Close()
		wg.Done()
	}()

	c.conn.SetReadLimit(maxMessageSize)

	extendDeadline := func(string) error {
		return c.conn.SetReadDeadline(time.Now().Add(pongWait))
	}
	if err := extendDeadline(""); err != nil {
		c.log.Error(err, "SetReadDeadline")
		return
	}
	c.conn.SetPongHandler(extendDeadline)

	for {
		_, raw, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
				c.log.Error(err, "UnexpectedCloseError")
			}
			return
		}
		flat := bytes.ReplaceAll(raw, []byte(newline), []byte(space))
		inbox <- &Message{Client: c, Message: bytes.TrimSpace(flat)}
	}
}
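功能开发完毕之后,发现每隔60s,websocket会自动断开连接。分析上述代码可知,排除读写出错的情况,websocket 会在以下两种情况下断开:
- 服务端向客户端写消息(包括 ping)时写超时(超过 writeWait):runWritePump 退出并关闭连接
- 服务端超过 pongWait 没有收到客户端的 pong:ReadDeadline 只会在收到 pong 时由 PongHandler 顺延,普通消息并不会顺延它;读超时后 ReadMessage 返回错误,runReadPump 退出并关闭连接
每 60s 断开一次正对应第二种情况(推测 pongWait 为 60s,与 gorilla/websocket 示例的取值一致)。只有服务端按 pingPeriod 周期发送 ping,浏览器才会自动回复 pong 来续期 ReadDeadline。因此 ticker 分支中发送 ping 的条件必须是 SetWriteDeadline 成功(`if err == nil`)。若写成 `if err != nil`,ping 永远不会发出,连接就会在 pongWait 到期后被关闭。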
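为什么 pingPeriod 要小于 pongWait?
服务端每隔 pingPeriod 发送一次 ping,客户端回复 pong 后,PongHandler 才会把 ReadDeadline 顺延到当前时间之后的 pongWait。如果 pingPeriod ≥ pongWait,在下一次 ping/pong 往返完成之前 ReadDeadline 就已经到期,连接会被误判为超时而关闭。因此 pingPeriod 必须小于 pongWait,并且要为网络往返留出余量。gorilla/websocket 的 chat 示例中取 pingPeriod = (pongWait * 9) / 10。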