' Q$ S( ]+ Y0 t( a. E$ J3 v2. 功能点介绍5 [7 j$ B+ e* t7 K
MySql其实就两大块,一块是MySql Server层,一块就是Storage Engines层。+ i" z! l# `& f. G% r5 N
7 q7 W9 K/ ?: I$ d5 v- R
<1> Client * [; I( O* `) k8 x( v不同语言的sdk遵守mysql协议就可以与mysqld进行互通。 5 g/ d& I/ h! h; U( ]2 e+ g2 s p0 ~% E4 v
<2> Connection/Thread Pool- U4 \) _1 V1 I. G% z5 {6 B' ?
MySql使用C++编写,Connection是非常宝贵的,在初始化的时候维护一个池。/ C6 B$ k) c3 L) d
& c( X9 a1 X! d( B ^' O* O4 n7 ~<3> SqlInterface,Parse,Optimizer,Cache - s9 Q9 ~! k$ L. U对sql处理,解析,优化,缓存等处理和过滤模块,了解了解即可。 8 a9 M1 w5 Z/ V$ s& M 1 a8 N2 O; V6 C5 x<4> Storage Engines & r# l+ r" O6 G: h) ]负责存储的模块,官方,第三方,甚至是你自己都可以自定义实现这个数据存储,这就把生态做起来了,🐮👃。 / i3 k- k- k+ C 7 @ _" N: ]) q. B# F4 D三: 源码分析* b% N* Q, }" Q! f. J2 }- L
关于怎么去下载mysql源码,这里就不说了,大家自己去官网捣鼓捣鼓哈,本系列使用经典的 mysql 5.7.14版本。% ]/ X, z- g9 h- t5 x9 |
! O5 P4 ? i! i) H
1. 了解mysql是如何启动监听的( n7 g* i6 h9 {4 i0 H+ |( D
手握百万行源码,怎么找入口函数呢??? 😁😁😁,其实很简单,在mysqld进程上生成一个dump文件,然后看它的托管堆不就好啦。。。8 B8 V7 y6 |% S + |5 q% U. y; j! ~0 e9 g& y
3 c' h* r9 { S8 c& z- z ' p9 r& C. |, D7 L- T! f: d" t- B9 y从图中可以看到,入口函数就是 mysqld!mysqld_main+0x227 中的 mysqld_main, 接下来就可以在源码中全文检索下。8 X5 F+ E' ?& x
& ]. C: G" E5 A( i B
<1> mysqld_main 入口函数 => sql/main.cc+ z+ q+ x. R6 y7 U
( a' F6 x* A w% G1 t' \ 2 t( U8 h/ b, J- Xextern int mysqld_main(int argc, char **argv);3 z/ R# X4 Z+ b& x' K
* [# `" o+ u' W# k. K% z0 h3 c rint main(int argc, char **argv) ; s3 C+ y0 J$ O' M$ x2 D{2 I% i- f8 a% }2 y( u1 Z+ c
return mysqld_main(argc, argv); 8 P) d3 [5 d' o6 D1 X( c I W' v} 1 B3 r9 T. x6 D/ n9 _, c1 K' n( s2 i) u) ]) _0 ~
5 ~# M2 Z; j6 u
这里大家可以用visualstudio打开C++源码,使用查看定义功能,非常好用。: A& V6 ]1 K, i
% p3 d; ^' \# D/ Z
<2> 创建监听 ?4 `% S" D$ L0 M
. Q; o6 D6 W, I% W
! _3 F- m* B* f6 o
int mysqld_main(int argc, char **argv) 0 t% P: T0 c( H2 J' i: d( Z) k{ % ~! d% n% Z) c. X0 D //创建服务监听线程1 D0 D; i+ O5 s
handle_connections_sockets();* H M7 p) K- h- Q, o+ `2 c# t
}9 H' _& u# i; C. w! P; B
6 `/ g: Z6 D/ y
void handle_connections_sockets()" S0 w7 M$ Z: D1 v- P) Q
{! ^8 I% M, j3 M! \6 o8 _
//监听连接8 k; W) N9 f7 c1 A p* w6 t
new_sock= mysql_socket_accept(key_socket_client_connection, sock, - _6 y! Z3 U" ]3 k$ ]/ t (struct sockaddr *)(&cAddr), &length); ; m; t+ e! D: B6 l3 r - w! Q/ p) b& {) X if (mysql_socket_getfd(sock) == mysql_socket_getfd(unix_sock)) " H1 H. P( `1 s" M' x! @2 k thd->security_ctx->set_host((char*) my_localhost);) D3 l# [- {. }2 q, s+ B! P/ w
# ^1 r5 }: J- o
//创建连接* o( H5 S; p2 |. z2 N/ ]* }& U. x
create_new_thread(thd);2 W1 P: D% T7 H6 g" r9 V b$ ?, g
}5 j5 R* l) e. D5 n8 C# O/ t R
0 U3 f$ M5 e, u9 b7 o
//创建新线程处理处理用户连接 2 t9 b* I, i' N$ ]$ b! `. l' d \static void create_new_thread(THD *thd){ , Q ~, Q' m: U0 v$ }6 w; X5 v* q o! b/ B2 w0 _1 E
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; & J8 _% f! G {. X5 u7 f 6 Y6 r# Z. k+ Y1 [ //线程进了线程调度器9 x; e* \5 V/ Y {
MYSQL_CALLBACK(thread_scheduler, add_connection, (thd)); * _. X/ z: T$ [9 Q4 I) Y4 H( Z
} 3 S) u* h$ h: v 1 K0 A( `3 o# R; r 7 @# J; A) N0 S8 W至此mysql就开启了一个线程对 3306 端口进行监控,等待客户端请求触发 add_connection 回调。/ \' |% A% T5 V& P
/ b& O& C- u' g: c4 c; i; F. Q0 R2 J" I
2. 理解mysql是如何处理sql请求* M7 J: Q. ?: n# ?( I
这里我以Insert操作为例稍微解剖下处理流程: ; h) z7 |6 n: T7 ^' L $ ^$ u& T3 O7 r$ e' Q$ p' m当用户有请求sql过来之后,就会触发 thread_scheduler的回调函数add_connection。" O. f7 d. ~! O
" w; |! Y" Z3 |( r, b. C
1 e3 w( |6 Y6 u( W! Rstatic scheduler_functions one_thread_per_connection_scheduler_functions=5 c5 ]: R8 j0 S# e+ G8 [
{ 5 e- J. n: ~% r" A y2 g 0, // max_threads / Y0 e+ k$ V7 w) \( [ NULL, // init; ~* z: v- V2 C
init_new_connection_handler_thread, // init_new_connection_thread " J9 c$ z) i$ H* j create_thread_to_handle_connection, // add_connection + z+ p- q4 Z4 s. N NULL, // thd_wait_begin 1 y% U4 k6 a% J8 a; t8 ~ NULL, // thd_wait_end L+ f& |8 f0 e u7 ], Y* o NULL, // post_kill_notification ( }7 \/ {8 _/ a. \ one_thread_per_connection_end, // end_thread+ g+ g- y5 U c4 g7 K; @ {# m8 v
NULL, // end : k: q$ b7 ]$ Q}; * z5 Z! v) t$ d- I/ n7 w- T. I& X0 e* X" n, p# ]% x$ _/ h3 Z
' P3 w4 J0 i+ T
从 scheduler_functions 中可以看到,add_connection 对应了 create_thread_to_handle_connection,也就是请求来了会触发这个函数,从名字也可以看出,用一个线程处理一个用户连接。+ s7 A M4 z) t$ m! b$ W
) A0 b& x- ]3 l5 }. D" A, e<1> 客户端请求被 create_thread_to_handle_connection 接管及调用栈追踪+ Y: [$ S8 H" ~8 V. s" ~/ a
3 ~/ s# G# v2 _: V4 Pvoid create_thread_to_handle_connection(THD *thd)1 V, I# d5 j5 Z6 ^; l
{9 s8 g8 |* t l. ~
if ((error= mysql_thread_create(key_thread_one_connection, &thd->real_id, &connection_attrib, ( f; g% {" W$ p: e5 D3 a handle_one_connection,(void*) thd))){} 2 F% b% M J( J- G2 L7 H3 h, Y4 |} ! X; |* H3 k1 ~//触发回调函数 handle_one_connection H4 H: g. K2 U( a: l* v: G( J
pthread_handler_t handle_one_connection(void *arg) 0 I) O6 D+ \$ {* n: s{ 8 \- g* N; E8 G+ O do_handle_one_connection(thd); ' L$ f6 Y2 J$ g/ z+ e0 |} J; G+ j& |6 v; F$ S( a% x
//继续处理 3 x+ C- h7 e% r; l" i. x0 i0 nvoid do_handle_one_connection(THD *thd_arg){ ! `1 \- X# h2 v4 _4 m while (thd_is_connection_alive(thd)) . V0 }, i7 F% W$ t! t1 _3 a2 [ {9 y" e+ ^2 q) B# r6 R8 ]
mysql_audit_release(thd);) O7 x7 I* n& V( `9 J3 |1 m4 u H1 H
if (do_command(thd)) break; //这里的 do_command 继续处理9 N- i. K; }0 p
}- {, W1 t4 R8 A: `% q* ?
}. K% m1 `! o+ J k+ W; x8 _
//继续分发 + S1 m% q1 \( J" o8 P9 `& k! G7 I% s( Lbool do_command(THD *thd)2 }' T& F7 Z9 |- h* d: @ l5 ], d
{0 ^0 P3 g# j! N5 U1 f
return_value= dispatch_command(command, thd, packet+1, (uint) (packet_length-1));: n, X2 R# B4 G6 J, [0 v- M
}* ~8 H& t0 h. B" ^5 |
bool dispatch_command(enum enum_server_command command, THD *thd, char* packet, uint packet_length)6 @3 T6 Q: t, `( `
{ 2 L. M9 E5 u. F, u7 g8 l switch (command) { - g% z" U; J0 O case COM_INIT_DB: .... break; 6 p6 j8 w; |2 T0 s ... : f+ f: R- B; Q! ~, s0 s case COM_QUERY: //查询语句: insert xxxx$ n0 o N1 d* I% [$ T( j$ v1 p% z# m
mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); //sql解析, x% W7 ]) r9 C9 I! }/ d
break;' {) Q+ M x/ o7 A6 U+ n2 y
} + a- ?2 [ ^ Z/ @ I2 \2 `( G}- g& [# n. E" j2 I$ H; m
//sql解析模块 4 F- ?% z( C/ C# c+ ]" D+ \void mysql_parse(THD *thd, char *rawbuf, uint length, Parser_state *parser_state)1 o' J( R; ]7 ?# s. F4 C
{! K( O* M/ |# ]
error= mysql_execute_command(thd); % D! v( L+ g6 S9 {; M} G) o5 x& c; v- {
- r( r% }1 n' k8 m
0 A! V4 [6 E6 f, {! N% ^5 F, K+ e
<2> 到这里它的Parse,Optimizer,Cache都追完了,接下来看sql的CURD类型,继续追。。。 / t( }7 F" W) @( H- S# U; e6 F3 N# D7 E; q* |
//继续执行& t2 C. ^( B$ h$ M$ X7 b6 l5 _
int mysql_execute_command(THD *thd): p0 B( I9 K! o. _* Y6 n; r
{ ! U) [# A0 B, E X" T switch (lex->sql_command) # C+ v( n! |+ ~# F7 V
{ p/ L* z1 N9 `6 U: d
case SQLCOM_SELECT: res= execute_sqlcom_select(thd, all_tables); break; / d7 }1 m, Z1 s$ q7 T, j- X# E' I; q; P5 E
//这个 insert 就是我要追的 ) B5 M) a9 N. d! a. y case SQLCOM_INSERT: res= mysql_insert(thd, all_tables, lex->field_list, lex->many_values, ) C, N* O" q6 B# |) m; L lex->update_list, lex->value_list,- ^; l: U3 Z: n1 ]1 q" P
lex->duplicates, lex->ignore); 4 g0 a$ U0 X: ~- ^+ d/ E! t3 D7 F& X9 k } + v0 {. g& E, q; }4 e} 0 h; e, E& O0 E/ x8 M0 N//insert插入操作处理 / @4 ?& w5 C) C! U- {& e. fbool mysql_insert(THD *thd,TABLE_LIST *table_list,List<Item> &fields, List<List_item> &values_list, $ s, C# O2 ]1 n( g) a List<Item> &update_fields, List<Item> &update_values, / m* ^. ]' Q) ~+ E. V( Q) b" Q i enum_duplicates duplic, bool ignore)1 Y- |! _; s2 U) L. E% N1 {* b2 H
{3 k9 e; ?) W, }+ F, X$ k+ _
while ((values= its++)) 8 l+ q; k, d* ~2 j3 ` { ' Q9 p, g+ R2 F error= write_record(thd, table, &info, &update); " G; g) H2 b, y } / a$ Q. k+ \2 i+ o/ f" \} , _) K5 G+ m! E/ X4 _/ G//写入记录" L2 v7 X! s) I; Z8 t. p
int write_record(THD *thd, TABLE *table, COPY_INFO *info, COPY_INFO *update)# a$ z" ^, s) J: m: \
{ % O7 G+ [8 t) m/ y6 M5 Z if (duplicate_handling == DUP_REPLACE || duplicate_handling == DUP_UPDATE)2 b2 I T9 W9 N2 _. w" C2 d
{ ( l) }( m& v2 a3 W // ha_write_row 重点是这个函数7 c- [5 c: N; j; d: o
while ((error=table->file->ha_write_row(table->record[0])))2 g* o M. { B- |" K4 L7 F
{ 6 z1 ^; z# X- m( m/ ?% ?' a3 e! e' n3 \* g ....8 U6 M1 F. P* y4 |; B
} 0 p) [' b5 C% |5 ]* B( E } & P% c* v7 A" d$ g}. T$ Q5 m, ^- D0 j
5 y3 K( _* k3 c0 w% I2 q9 y: W
7 r& O( Y, J* a1 H/ R- [( _8 {3 Z 4 ?! `. v2 x! B% {可以看到,调用链还是挺深的,追到 ha_write_row 方法基本上算是追到头了,再往下的话就是 MySql Server 给 Storage Engine提供的接口实现了,不信的话继续看呗。。。 6 `8 \5 g' K2 ~+ x/ P; {6 b2 r+ H) s7 j* u# l, I. r# V) p8 q
<3> 继续挖 ha_write_row 0 p) `. b- l `7 v C& i5 j: R5 V! K& g8 E. d' h
int handler::ha_write_row(uchar *buf) " f$ a2 J: E; O9 i+ u7 U2 Q8 h{ - v6 g1 t. |) r) s A2 D4 S$ T MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,{ error= write_row(buf); }) $ I0 s' k! F/ T! }: F7 R/ t} h" t+ d6 m4 k5 R' D" F3 q 0 c1 }6 V& e2 W/ `, h- \' {+ W//这是一个虚方法 & k' R& B/ L* v9 b$ h) lvirtual int write_row(uchar *buf __attribute__((unused))) 8 Q& t# g8 x* _+ k6 J& M2 z{, w# Z7 N# z/ n6 U9 ?) u
return HA_ERR_WRONG_COMMAND; ( H: `2 M. ?) V( w& k}, W5 {# y0 [9 o1 I
; p5 M% \8 F3 I. s$ c
. z0 |& U, L" p
看到没有,write_row是个虚方法,也就是给底层方法实现的,在这里就是给各大Storage Engines的哈。😁😁😁5 [4 y+ z$ @. W$ u# f7 y6 ?