    func (s *Server) Run() error {
        ...
        for {
            conn, err := s.listener.Accept()
            ...
            go s.onConn(clientConn)
        }
    }
After error handling, the connection object is handed to the onConn method, so let's follow the trail and keep reading.
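The accept-then-spawn pattern in Run can be tried in isolation. The following is a minimal sketch using only the Go standard library, not TiDB's code: `handleConn`, `serve`, and `roundTrip` are hypothetical names, and the one-line echo handler merely stands in for whatever onConn really does.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// handleConn is a hypothetical stand-in for onConn: it reads one line
// from the client and echoes it back, then closes the connection.
func handleConn(conn net.Conn) {
	defer conn.Close()
	line, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		return
	}
	fmt.Fprintf(conn, "echo: %s", line)
}

// serve accepts connections until the listener is closed and hands each
// one to its own goroutine, so a slow client never blocks Accept.
func serve(ln net.Listener) error {
	for {
		conn, err := ln.Accept()
		if err != nil {
			return err // e.g. the listener was closed
		}
		go handleConn(conn)
	}
}

// roundTrip starts a server on a free loopback port, sends msg as one
// line, and returns the single line the server replies with.
func roundTrip(msg string) (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()
	go serve(ln)

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := fmt.Fprintf(conn, "%s\n", msg); err != nil {
		return "", err
	}
	return bufio.NewReader(conn).ReadString('\n')
}

func main() {
	reply, err := roundTrip("hello")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(reply)
}
```

One goroutine per connection keeps the accept loop trivial; the cost is that concurrency limits have to be enforced somewhere else, which is presumably why the dispatch comment further down mentions a token obtained from the server.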
    // onConn runs in its own goroutine, handles queries from this connection.
    func (s *Server) onConn(conn *clientConn) {
        ctx := logutil.WithConnID(context.Background(), conn.connectionID)
        ...
        conn.Run(ctx)
    }
After some setup, onConn calls conn.Run. Let's see what the Run method does.
    // Run reads client query and writes query result to client in for loop, if there is a panic during query handling,
    // it will be recovered and log the panic error.
    // This function returns and the connection is closed if there is an IO error or there is a panic.
    func (cc *clientConn) Run(ctx context.Context) {
        for {
            ...
            data, err := cc.readPacket()
            ...
            if err = cc.dispatch(ctx, data); err != nil {
                // error handle
            }
        }
    }
    // dispatch handles client request based on command which is the first byte of the data.
    // It also gets a token from server which is used to limit the concurrently handling clients.
    // The most frequently used command is ComQuery.
    func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
        ...
        cmd := data[0]
        data = data[1:]
        if cmd < mysql.ComEnd {
            cc.ctx.SetCommandValue(cmd)
        }

        dataStr := string(hack.String(data))

        switch cmd {
        ...
        case mysql.ComQuery: // Most frequently used command.
            // For issue 1989
            // Input payload may end with byte '\0', we didn't find related mysql document about it, but mysql
            // implementation accept that case. So trim the last '\0' here as if the payload an EOF string.
            // See http://dev.mysql.com/doc/internals/en/com-query.html
            if len(data) > 0 && data[len(data)-1] == 0 {
                data = data[:len(data)-1]
                dataStr = string(hack.String(data))
            }
            return cc.handleQuery(ctx, dataStr)
        ...
        default:
            return mysql.NewErrf(mysql.ErrUnknown, "command %d not supported now", cmd)
        }
    }
This is only an excerpt of dispatch. Taking the command we use most often, the query command (ComQuery), as the example, we move on to the handleQuery function.
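To make the command-byte idea concrete, here is a small standalone sketch of the same dispatch shape. It is an illustration under assumptions rather than TiDB's code: the constants use the MySQL protocol values for COM_QUERY (0x03) and COM_PING (0x0e), the `handleQuery` callback is a hypothetical stand-in for cc.handleQuery, and the empty-packet check is an addition of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Command bytes as defined by the MySQL client/server protocol.
const (
	comQuery byte = 0x03
	comPing  byte = 0x0e
)

// dispatch routes a packet payload by its first byte. For comQuery it
// trims one trailing NUL byte, mirroring the issue 1989 workaround in
// the excerpt, and passes the SQL text to handleQuery.
func dispatch(data []byte, handleQuery func(sql string) error) error {
	if len(data) == 0 {
		return errors.New("empty packet") // guard added by this sketch
	}
	cmd, body := data[0], data[1:]
	switch cmd {
	case comQuery:
		if n := len(body); n > 0 && body[n-1] == 0 {
			body = body[:n-1]
		}
		return handleQuery(string(body))
	case comPing:
		return nil // nothing to do in this sketch
	default:
		return fmt.Errorf("command %d not supported now", cmd)
	}
}

func main() {
	packet := append([]byte{comQuery}, "select 1\x00"...)
	err := dispatch(packet, func(sql string) error {
		fmt.Printf("query: %q\n", sql)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

Note that the sketch converts the payload with a plain `string(body)` copy, whereas the excerpt goes through hack.String.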