WebTail源码分析

date
Mar 9, 2023
slug
webtail
author
status
Public
tags
Golang
summary
type
Post
thumbnail
category
updatedAt
Mar 15, 2023 09:52 AM

1. 背景

最近需要用到前端实时展示任务日志的功能,在github上看到webtail这个项目, 基本上能够满足实时展示的需求,浅析一下实现原理

2. 使用方法

用法很简单,直接可以监听http端口,基于websocket协议动态展示日志
package main
import (
    "github.com/LeKovr/webtail"
)

// main shows the minimal wiring of webtail: build the service, run it in the
// background, and register it as an HTTP handler.
//
// NOTE(review): log and cfg are not defined in this excerpt, and http is used
// without importing "net/http" — the setup code is presumably elided; confirm
// against the project README before copying.
func main() {
    // Build the service from a logger and a config; stop if that fails.
    wt, err := webtail.New(log, cfg)
    if err != nil {
        return
    }
    // Run is started in its own goroutine; Close is deferred to main's return.
    go wt.Run()
    defer wt.Close()
    // ...
    // wt itself is registered as the handler for the /tail path.
    http.Handle("/tail", wt)
}

3. 原理

首先需要构建 webtail.Service 结构体
// Service holds WebTail service.
// It bundles the configuration, the message hub, the shared WaitGroup and the
// logger used while serving websocket requests.
type Service struct {
	// cfg supplies the websocket buffer sizes and the client buffer size.
	cfg *Config
	// hub receives client registrations and routes messages.
	hub *Hub
	// wg is handed to the per-connection pump goroutines.
	wg  *sync.WaitGroup
	// log is the logger used for errors and passed on to each client.
	log logr.Logger
}
Service 包含四个属性:
字段
含义
cfg
配置信息
hub
管理active(活跃)客户端并分发消息
wg
协程控制
log
打印日志logger

3.1 属性

3.1.1 config:

传递给tailer
// Config defines local application flags.
// Each field's struct tag carries the long flag name, the default value (if
// any) and the help text.
type Config struct {
	// Root is the root directory for log files.
	Root        string `long:"root"  default:"log/"  description:"Root directory for log files"`
	// Bytes: tail from the last Nth location.
	Bytes       int64  `long:"bytes" default:"5000"  description:"tail from the last Nth location"`
	// Lines is how many old lines are kept for new consumers.
	Lines       int    `long:"lines" default:"100"   description:"keep N old lines for new consumers"`
	// MaxLineSize: lines longer than this are split (flag name is "split").
	MaxLineSize int    `long:"split" default:"180"   description:"split line if longer"`
	// ListCache is the time, in seconds, to cache the file listing.
	ListCache   int    `long:"cache" default:"2"      description:"Time to cache file listing (sec)"`
	// Poll selects polling instead of inotify.
	Poll        bool   `long:"poll"  description:"use polling, instead of inotify"`
	// Trace enables tracing of worker channels.
	Trace       bool   `long:"trace" description:"trace worker channels"`

	// ClientBufferSize is the client buffer size (flag name is "out_buf").
	ClientBufferSize  int `long:"out_buf"      default:"256"  description:"Client Buffer Size"`
	// WSReadBufferSize is the websocket read buffer size.
	WSReadBufferSize  int `long:"ws_read_buf"  default:"1024" description:"WS Read Buffer Size"`
	// WSWriteBufferSize is the websocket write buffer size.
	WSWriteBufferSize int `long:"ws_write_buf" default:"1024" description:"WS Write Buffer Size"`
}
  • Root: 默认:log/,表示从哪个目录下读取日志文件
  • Bytes: 默认:5000,表示tail时从文件末尾多大开始读取
  • Lines: 默认:100,表示为新建的连接从文件的倒数多少行开始读取
  • MaxLineSize: 默认:180,单行最大长度,超过该长度的行会被拆分后展示
  • ListCache: 默认:2,文件列表的缓存时间(秒)
  • Poll:是否使用Poll模式
  • Trace:追踪worker信道信息
  • ClientBufferSize: 默认:256,每个客户端send信道的缓冲区大小
  • WSReadBufferSize: websocket 读取buffer大小
  • WSWriteBufferSize: websocket 写入buffer大小

3.1.2 hub

// Hub keeps the registered clients and the per-channel subscriber sets, and
// owns the channels through which clients, tailers and the indexer send it
// events.
type Hub struct {
	// Logger
	log logr.Logger

	// Tail Service workers
	workers *TailService

	// wg used by Close for wh.WorkerStop ending
	wg *sync.WaitGroup

	// Registered clients.
	clients map[*Client]bool

	// Channel subscribers, keyed by channel name.
	subscribers map[string]subscribers

	// Channel subscriber counts, keyed by channel name.
	stats map[string]uint64

	// Inbound messages from the clients.
	broadcast chan *Message

	// Register requests from the clients.
	register chan *Client

	// Unregister requests from clients.
	unregister chan *Client

	// Inbound messages from the tailers.
	receive chan *TailMessage

	// Inbound messages from the channel indexer.
	index chan *IndexItemEvent

	// Quit channel
	quit chan struct{}
}
hub 是webtail核心的实现模块,定义了如下属性:
  • log: 打印日志的logger
  • workers: 执行tail操作的实体
  • wg: 协程控制
  • clients: 注册的客户端
  • subscribers: tail文件信道的订阅者
  • stats: 订阅者数量统计
  • broadcast:从客户端传入的消息
  • register:注册客户端
  • unregister: 注销客户端
  • receive:从worker传入的消息
  • index:注册日志文件列表
  • quit: 退出信道

4. 启动流程

webtail.New(log,&cfs.Tail) ->
NewTailService(log, cfg) ->
NewHub(log, tail, &wg) ->
service := Service{cfg: cfg, hub: hub, log: log, wg: &wg}; return &service, nil ->
初始化os.Signal信道 ->
监听信道: signal.Notify(quit, os.Interrupt, syscall.SIGTERM) ->
wt.Run() -> wt.hub.Run()
// Run processes hub messages.
//
// It loops over the hub's channels until a quit request has been received
// and no registered clients remain.
func (h *Hub) Run() {
	// Create the subscriber set stored under the empty channel name.
	h.subscribers[""] = make(subscribers)

	// Start the indexer, which reports on h.index, and make sure the worker
	// stored under the empty name is stopped when Run returns.
	h.workers.IndexerRun(h.index, h.wg)
	defer h.workers.WorkerStop("")

	// accepting is true until a quit request arrives.
	accepting := true
	for {
		select {
		case <-h.quit:
			accepting = false
			if len(h.clients) == 0 {
				return
			}
			// Closing every send channel tells each client to finish.
			for c := range h.clients {
				close(c.send)
			}

		case c := <-h.register:
			// New clients are only recorded while still accepting.
			if accepting {
				h.clients[c] = true
			}

		case c := <-h.unregister:
			if _, known := h.clients[c]; known {
				h.unsubscribeClient(c, accepting)
			}
			// Keep looping unless we are shutting down and nobody is left.
			if accepting || len(h.clients) > 0 {
				continue
			}
			return

		case msg := <-h.broadcast:
			// client sends attach/detach/?list
			h.fromClient(msg)

		case msg := <-h.receive:
			// tailer sends file line
			h.fromTailer(msg)

		case ev := <-h.index:
			// worker sends index update
			h.fromIndexer(ev)
		}
	}
}

4.1 IndexerRun:

// IndexerRun runs indexer.
//
// It starts an indexWorker goroutine that reports on out, waits until that
// worker signals readiness, then loads the index for the configured root.
func (ts *TailService) IndexerRun(out chan *IndexItemEvent, wg *sync.WaitGroup) {
	stop := make(chan struct{})
	ready := make(chan struct{})

	// The indexer's quit channel is stored under the empty worker name.
	ts.workers[""] = &TailAttr{Quit: stop}

	// NOTE(review): the worker presumably watches the root directory and
	// publishes file create/delete events on out — confirm in indexWorker.run.
	worker := indexWorker{
		root: ts.Config.Root,
		log:  ts.log,
		out:  out,
		quit: stop,
	}
	go worker.run(ready, wg)

	// Block until the worker reports that it is ready.
	<-ready

	// A load failure is logged but does not abort startup.
	if err := loadIndex(ts.index, ts.Config.Root, time.Now()); err != nil {
		ts.log.Error(err, "Path walk")
	}
	ts.log.V(1).Info("Indexer started")
}

5. 服务初始化流程以及http请求解析图

notion image
来看ServeHTTP的实现
// ServeHTTP handles websocket requests from the peer.
//
// It upgrades the HTTP request to a websocket connection, registers a new
// Client with the hub and starts the client's write and read pumps.
func (wt *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Upgrade using the configured websocket read/write buffer sizes.
	ws, err := upgrader(wt.cfg.WSReadBufferSize, wt.cfg.WSWriteBufferSize).Upgrade(w, r, nil)
	if err != nil {
		wt.log.Error(err, "Upgrade connection")
		return
	}

	// Outgoing messages are queued on a channel of ClientBufferSize entries.
	outbox := make(chan []byte, wt.cfg.ClientBufferSize)
	peer := &Client{conn: ws, send: outbox, log: wt.log}

	// Hand the client to the hub before its pumps start.
	wt.hub.register <- peer

	// Allow collection of memory referenced by the caller by doing all work in
	// new goroutines: one pump for hub -> client, one for client -> hub.
	go peer.runWritePump(wt.wg)
	go peer.runReadPump(wt.wg, wt.hub.unregister, wt.hub.broadcast)
}
来看下hub to client
// runWritePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
//
// The pump returns when the send channel is closed (after writing a close
// frame), when setting the write deadline fails, or when a ping cannot be
// written. On return it stops the ticker, closes the connection and marks
// itself done on wg.
//
// NOTE(review): wg.Add(1) runs inside this function, which ServeHTTP starts
// with `go`; a concurrent wg.Wait may therefore miss it. Consider moving the
// Add to the caller.
func (c *Client) runWritePump(wg *sync.WaitGroup) {
	wg.Add(1)

	// A ping is sent to the peer on every tick.
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
		wg.Done()
	}()
	for {
		select {
		// Wait for a message from the hub, or for the channel to be closed.
		case message, ok := <-c.send:
			// Every write, including the final close frame, gets a fresh deadline.
			if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				c.log.Error(err, "SetWriteDeadline")
				return
			}
			if ok {
				c.sendMesage(message)
				continue
			}

			// The hub closed the channel. Send Bye and exit
			err := c.conn.WriteMessage(websocket.CloseMessage, []byte{})
			if err != nil && err != websocket.ErrCloseSent {
				c.log.Error(err, "Close socket")
			}
			return

		case <-ticker.C:
			// Send the ping only when the deadline was set successfully; any
			// failure of either step ends the pump.
			err := c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err == nil {
				err = c.conn.WriteMessage(websocket.PingMessage, nil)
			}
			if err != nil {
				return
			}
		}
	}
}
client to hub 如下

// runReadPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
//
// On exit the client is sent on quit, the connection is closed and wg is
// marked done, in that order.
func (c *Client) runReadPump(wg *sync.WaitGroup, quit chan *Client, inbox chan *Message) {
	wg.Add(1)
	defer func() {
		quit <- c
		c.conn.Close()
		wg.Done()
	}()

	// Cap the size of a single incoming message.
	c.conn.SetReadLimit(maxMessageSize)

	// extendDeadline pushes the read deadline pongWait into the future.
	extendDeadline := func() error {
		return c.conn.SetReadDeadline(time.Now().Add(pongWait))
	}

	// Arm the initial read deadline.
	if err := extendDeadline(); err != nil {
		c.log.Error(err, "SetReadDeadline")
		return
	}

	// Every pong received from the peer re-arms the read deadline.
	c.conn.SetPongHandler(func(string) error { return extendDeadline() })

	// Block on reads until the connection reports an error.
	for {
		_, raw, err := c.conn.ReadMessage()
		if err != nil {
			// Only closes other than "going away" / "normal closure" are logged.
			unexpected := websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure)
			if unexpected {
				c.log.Error(err, "UnexpectedCloseError")
			}
			return
		}
		// Replace newline with space, trim the result, and forward it to the hub.
		flattened := bytes.ReplaceAll(raw, []byte(newline), []byte(space))
		inbox <- &Message{Client: c, Message: bytes.TrimSpace(flattened)}
	}
}
功能开发完毕之后,发现每隔60s,websocket会自动断开连接,分析上述代码得知,websocket会有以下几种情况会自动断开 (排除发生错误的情况)
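  • hub to client 写入消息(包括每个pingPeriod周期发送的ping)时,单次写操作超过 writeWait 仍未完成
  • 等待客户端响应(pong)超过pongWait,即ReadDeadline到期
为什么pingPeriod要小于pongWait?
writeWait还未设置的时候,websocket可能已经关闭连接了,会造成一些未知的错误。或者浪费一些线程资源?(我的理解)更直接的原因是:服务端必须在ReadDeadline(pongWait)到期之前发出ping,客户端回复的pong才能及时重置ReadDeadline;如果pingPeriod大于等于pongWait,连接会在下一次ping发出之前就因为读超时而断开。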