4 ^- d" j& ~/ A& A# z3 Z爬虫(七十)多进程multiprocess(六十一) x( I1 u: l. B- @ R1 B3 q
Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。 - 8 h7 ~( k/ s& |. z/ H9 b
% G! D( k* t3 a& |: Timport os0 Q" B# i7 H4 d5 t" m
1 E; j7 X0 m7 V2 E: |, H" Y
$ e2 }( f4 N) H) [4 Y+ Q. j
: J& F- |( z1 M eimport multiprocessing! g/ V, D9 n. b: k0 L n: _$ H
& _2 b0 h( |* h4 P: X/ o% K- ' K4 V# f) E6 d. j4 T" T% Y4 x5 U
" t) e9 [% j; H1 o
5 j, l7 w! |. F7 Q2 L
: M. z# \ \& @: n
" @: J) d9 {: Q2 K$ ^3 m
" D+ D; U3 c2 d" h$ s+ V: Q ndef foo(i):
. w, X! m+ y4 a* X5 P7 T2 n9 m5 Z2 ]5 ?5 a( U4 N) ~
0 f( o3 C( Q u9 o* J- g @6 y+ K8 _1 j U3 ~; U
# 同样的参数传递方法* [; p4 t0 P7 _. Q9 T
C- \) Z* B* j5 {$ M/ A2 ~
* J7 @- E8 ^0 J# ~- R9 v7 m; r; Q2 C3 @/ V1 u7 ]0 {2 V
print("这里是 ", multiprocessing.current_process().name)7 p" c- N% A5 b2 p
) O9 W" P8 W4 Z! J, b0 x+ _- 5 n7 h$ G' f: p$ M1 h) h7 `" ?
$ _1 f0 W" W3 t' C% p( N print('模块名称:', __name__)
. e; W6 h* w+ x2 C `5 w' o" C3 D% z
. j) v: H: l1 a1 c8 r7 m" H! A/ T8 o. x4 @ C w$ Z2 T
print('父进程 id:', os.getppid()) # 获取父进程id
' j% K% g/ Y. T5 l, n7 o4 l& D5 g" U8 U! _% z
- $ H( E4 ?" b: N6 U$ Y4 |
. Y# b* b2 z) R5 H
print('当前子进程 id:', os.getpid()) # 获取自己的进程id4 B3 k1 d: X9 U! a4 A1 g
( O: O0 P8 L2 F
$ f+ r% D8 J9 s/ S' `3 P; ]: L' r
print('------------------------')
7 F8 O/ |; }" {8 v& G: K- X5 `1 h$ C7 P
- " s. _6 {" K; b# Y0 c
5 U( `# @' q6 V8 W
$ ?3 \9 L1 z: T/ ?' @6 |( v3 P
& v$ ?7 O* ~1 K' w0 g+ h/ ? - % W7 ?) Z. a6 i& l0 a- q
# ~. z8 u4 T% \ \7 Q3 z
if __name__ == '__main__':# G* X2 C) N/ T( o$ M( [0 I1 Q
- ?# J n7 }, Y4 W' t
0 [% D" @" ^- Q6 Y/ C @
) }/ d" f3 v# C5 R, p' g. q0 D
7 v: p! Q5 ?7 H7 T
- ' k( I/ y# x+ X: _; x
5 }. _: Q) L0 X, E" v8 g3 D
for i in range(5):2 g: E$ P4 o; C& O+ y- F3 S
7 _ A% Q0 t) T( D: j
- ~- n t! m3 x' e
P% o+ Y" m6 B; e4 x, r ~
p = multiprocessing.Process(target=foo, args=(i,))
, x( D8 O7 M# y$ {9 \& C* y. W4 L% _
- 9 C6 ?9 A* m7 o: j2 X, y5 ^
7 U7 C& {$ _; s) M p.start()% ^; \1 c) T7 P0 X) o, l( ]
( |' R e% q6 w. v7 G: p6 F9 _9 e" j3 I
运行结果: - ) y+ G' P0 e' Q- x5 o
2 `$ Z7 ~6 L% \1 A% q5 S0 S
这里是 Process-2
" c6 T* S" E6 L; L9 F0 h; k, w4 V5 Y1 `
- b+ C. X( J$ b" @' O" C2 P
6 W) ~# `: T# j1 G模块名称: __mp_main__ ?: \5 I# A- S8 W
: v' n+ I% _# D" L/ g- % o9 @, ?. U( e, ?
n% z6 W4 e9 v0 E) r
父进程 id: 880
* t( V7 H" Z+ V" }6 \" g, U# R
) M: v0 r" O% I$ W, |
2 ^1 z# l: S) o7 e
9 I2 K% J. [) C- ]" G当前子进程 id: 5260
) H3 b b2 G; \0 w, D+ ~- E6 {6 w' c& \* U3 D. G% n/ T h
0 F$ z4 G! B+ k
, K* _- X9 A4 i0 v3 o2 p$ w--------------
2 u. \0 n9 {7 {- E) l- z0 |+ y
0 \5 Q! \6 m8 A( S- : ~8 H" I# f0 y( s( B" s
" y& S( A6 Q, @1 F& I) d, }" y# r这里是 Process-38 u# z! w! z* T. U/ F
' j7 T" k3 r8 C/ b) g - 4 b- L* H9 Y& ^, K( e
2 d9 r6 K# s6 X1 \( c模块名称: __mp_main__
; O( q) G* j; I* b8 F$ [/ ]+ h. V" a/ o5 q0 l2 H# k+ G3 x
2 F9 h# E# l: p1 V, j; \3 J" J+ x) N9 K7 {& [
父进程 id: 880 X# F! m7 Q) d: b$ B4 X9 d; N
. o/ m1 ^* G }& p- " W. m+ R+ Q3 G
$ G% q+ [7 j# ~* k当前子进程 id: 4912: U' p. K$ y" H
1 N0 t" I: k$ }. M2 L - / h+ g [9 S) _
8 x+ a0 x5 s3 ?--------------
1 I/ H1 v- R# f# C( c/ U7 F6 K7 f. f: R" V: h D" m
- 3 [7 _+ {2 r( a* {% v. P% z% l
; b Z7 k6 v, {9 |这里是 Process-4
9 G. r% n5 X) K' P% }1 m
p) U! ~9 V) O1 x! y" ]6 w; _" g" ]
) S2 `- B# u' ]. g/ \
8 q9 p" b, k- F X' c: G模块名称: __mp_main__
9 s3 I% C; ]% a, d! {7 q$ w# ^8 s' m" X
+ y9 s7 S+ O) I- l
# l e" A$ A" e Y3 H& r% @$ o1 d父进程 id: 880
/ S) y3 U5 t7 x$ ]) {) X4 h* ~& V. W; G5 X
' U2 o/ r! x g, I
4 d+ N& R: z1 P9 q& ?& Y当前子进程 id: 5176
# w+ }, Z) p) x6 g6 i
+ x G& Y) j7 g( ?+ u- z
( _" j5 z7 g8 G5 d, Z s# R: p
9 J; X' \; U: \* v- `0 F8 Z--------------' H7 N. Z7 I3 g. R
$ y# F1 N4 W" T9 n8 S+ ]
- 9 {, b" \, [4 b- r* }( @
# b& z1 J" q0 ]/ g' d
这里是 Process-1% B5 W+ n+ r# x8 D/ L
+ d8 R+ G3 d4 y" j1 j" E+ n$ o
* L% s4 ~$ F0 u( W% r8 U2 ] W# D
8 e# u9 l U P! i& d l/ V" ]模块名称: __mp_main__! a. z9 x0 T$ r- t
4 i# k1 h/ W; U$ H( A; R" G# P
& C/ z4 S5 [& [! y8 N8 F I2 o* K- {$ t% h M$ w
父进程 id: 880/ m6 L/ \4 E9 E! B" V5 g) |
$ }) f- K6 E% C- 7 V% O+ G5 C$ b% S$ _
1 `- h. S: f- |& }* v3 h
当前子进程 id: 5380 Y4 \/ [4 s; v$ X: K6 g. A
- a0 y$ _ U* H" R" @8 E* S4 h% U: I - 9 W' L5 X! {7 N7 d$ N
3 ?* W- E0 r+ f; t3 h
--------------* z. k; l, D+ W# X, Y
+ L& W( M( O/ ~2 f, W
- ' p* R. n! N2 Z J- E, s9 N$ ~5 N
6 x9 |) D, l' B3 ~4 x7 Z/ f
这里是 Process-5% F- ~" D a* z4 l" e/ T: L
5 k. T8 h' m; D
- ( R. B. e* p6 i8 ~6 k) n
$ P# L; q, o0 d# J) |5 |8 ^
模块名称: __mp_main__
# v! Z! b! c8 L: m3 m+ o; n( T3 x8 q+ j9 L' K
) X" L# Y4 n, u3 p* |+ D7 u9 c9 m) e% s# y
父进程 id: 880
: s, j! M0 z) v- Q. q7 G2 f9 @" K9 d# G7 |
+ z1 D; z( ]5 j' r: @0 L- u" ]( L w, D' }$ n! K" E9 w4 T$ w/ `4 ^
当前子进程 id: 3520
; R9 X* q! _$ Y+ W% q: ^& Z2 T% p/ c
+ [( h. S& s7 d6 O! N9 A: k4 l( r W" i
--------------/ j+ Y- h/ d5 _+ \* {; p+ N
2 y0 R+ X4 [. W3 q! D+ Y6 B5 `8 t
1 b% \- o$ P. ?% k6 G. k9 A 1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享: - * v/ s; T* O$ Q; D3 l
) B6 w/ `. q2 Cfrom multiprocessing import Process
( ^- \' R& Y5 r! }0 m3 G
+ v$ o- F) }# G7 g& x - % `( k: Y8 t6 f8 H. |3 Z
2 S: Y9 t+ z G& P8 G4 v
5 j$ o5 @- C+ O0 S% B2 } j
6 o" v5 D+ m7 X: h9 C$ h
5 \; ]& r r* u+ `% {) k
% f& l* \) q, H% P! plis = []% J. e5 T2 s# g, A% h' U
' p/ R" i- Y }% F u. H+ a" h
- - V8 b% g8 i$ e/ {9 z! a
1 d, Z# V4 H# [1 [' g: x
1 s& ?$ y& W( q0 o. _+ U2 @
( h* q# C+ u. |8 g) g3 f5 ~1 Y' _ - 6 H5 P2 I' S" S. w0 l) S
- f5 Z) @/ d7 l' i3 U2 a0 M B2 C9 Rdef foo(i):+ u7 l+ s' t$ r* k
; h* T2 G; I5 q - G9 \: A7 ~! m1 q6 q# `+ o
% v! v! k% Y# q+ p$ p V. a$ l1 J: E, _4 E' Q lis.append(i)
6 {! L8 v! W a; J
2 H8 p# @$ x: N
8 `( B" I6 H2 y# W/ g. X; }! g8 w& D! _, _0 X
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))
- O2 E- l+ B7 _2 c# [; [( S' _* `& A8 R% C" F2 j7 q
- 3 D1 X7 @; j- H8 D2 @1 ~
: O# X5 x" B8 t- A( |$ G
& g5 ?: C) @+ M
5 H& m+ b5 f3 h
! T+ |, h8 p( k$ X7 a# Q% M
l3 x' K" q0 ]6 t, j& M: Yif __name__ == '__main__':! O- Z7 U2 I) g U5 ~% f
! f: C% G) y3 f9 J( E8 M4 G
- ! v4 W' l, J f% G8 G2 h: x
. R v! q6 w+ ~
for i in range(5):# e9 ]4 ]9 Q }
% u" V8 w# K: W: w5 y2 b( V - O7 l1 ?( ^& W# I
" N& p3 r; ^( z) E" u& q! [6 X2 A& G
p = Process(target=foo, args=(i,))- J* ^/ w7 {9 a) h2 w; B
+ c; t9 @ z% H/ C& }
s2 z- y! ]* l3 W) @# K/ \1 W b: }/ |
p.start(). g. [) y4 y) q+ D8 C1 s
/ w0 I' [2 Q+ u! r
- ( S( q! z% Q" B; i8 f) x8 C+ d& `
- v: t8 q/ E3 R
print("The end of list_1:", lis)# t6 c# Q& t7 q1 T- _/ d
3 O) o) I7 W' R& N% z9 A
1 U: I+ Y" _0 z3 S! J
运行结果:
, D8 H, C7 f8 D+ J6 O! m, h8 K8 ]! a1 v
The end of list_1: []
f' @& p6 E6 w
( @; U" q- b" F/ _' q( F( H
8 j, I1 }6 |. |( n P0 v/ D% L' v$ y j: t
This is Process 2 and lis is [2] and lis.address is 40356744
1 ^' V- V, c$ a4 B6 T4 a
( B; y! h% T2 s# O
4 D4 v! Z4 k. m, e0 k
4 r; O6 |4 v, G. Z8 M& Y8 w2 Z2 pThis is Process 1 and lis is [1] and lis.address is 40291208 m# }# K# |+ S- L' I2 e- d( q) p& R/ b
/ O, z# F* |5 W, I3 e8 | Z& `
1 [5 s$ n1 b% m9 l
: K p. Y* U9 |1 W8 qThis is Process 0 and lis is [0] and lis.address is 40291208
3 u$ E3 U! J0 i: {/ p/ g1 m* |* Y- c5 }; K7 |3 V( m9 v3 W( A
- ' k! P% }7 { R; D1 I
+ ?/ `$ t) p3 X6 l: d# B. cThis is Process 3 and lis is [3] and lis.address is 40225672
+ l) D6 W6 U; h8 {9 B8 F+ C
# u) }# }3 m/ m [8 x( l$ w
! o# o! M5 A( B6 h+ r
/ A2 r' e! ?1 Z1 R3 D" B& VThis is Process 4 and lis is [4] and lis.address is 40291208
' v/ y" g0 W4 Y/ J" w# ]: r: Y4 |1 j$ Z, E& J: r
5 N- y( p* P7 G: m' L
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
: n$ D& h) H) H+ w7 w4 h9 g/ G* c+ E" h8 R8 Z7 z# ?& h
'c': ctypes.c_char, 'u': ctypes.c_wchar,
+ @" C) R3 C" U! G& @1 j y. C
4 [( b {& N/ a4 c+ I# E9 t
7 S% d5 h5 c* i6 J. u, y7 z
% ^# ]5 m" ~3 o2 b- f( P7 E$ y'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
8 D8 X4 u% e* g( X5 Q) z
- P* H; f: D' S) x( w7 J9 R# _( T- ]9 T. {: _+ p
9 Y* ~% G$ H! P0 p/ ]
'h': ctypes.c_short, 'H': ctypes.c_ushort,
! \9 a2 B* E; y0 K
. f0 F6 C7 B: x$ C! ]! H) J7 x0 O
, B, [0 G6 F% d3 q% b, b; n, i1 a5 K) v) O& d/ K' {
'i': ctypes.c_int, 'I': ctypes.c_uint,
$ e# t* d& N* ]9 c2 Q* p9 l7 {/ K+ ~8 S
* c; _3 ]1 O* o0 }6 T; t! P; Z7 D; c* q7 c9 A! C6 u- {
'l': ctypes.c_long, 'L': ctypes.c_ulong,. _) b# W* z- ]* t( S$ j
( F3 m% l3 X3 }- m
% D5 J+ ~9 u, i! R9 a2 m: `, p( G; m1 R( ?3 l6 G; j2 \% t) ]0 [
'f': ctypes.c_float, 'd': ctypes.c_double
( D. l j0 H6 b, f+ c2 S% q
7 j$ h* L5 d7 h5 u
6 P2 c T- F; A: H$ }7 J ^' W3 P
看下面的例子:
' T$ D1 _9 U/ r. L* @0 J$ j' |. h; }7 o+ S; k; v6 j2 F
from multiprocessing import Process) j2 g6 x& { D) w, Z/ j
9 f# x: x' i6 C3 w3 F
- : R( t% ^) a4 u& A
5 c: M( Q3 \! T1 G9 Q) O0 x: Xfrom multiprocessing import Array( I3 M8 f+ V" L6 |1 f
8 C4 Y9 h' [, c! P
6 L {+ X" Q2 S: D: ]8 K& E+ |5 p) m# g' I2 e' O3 H% C
& n0 b& U# ~- S, V! ^% ~; ^
+ @3 |* h4 T# ?- & A0 e& [3 i; X' ~* E- ]* X
" E* N! Z- [" L8 P4 v1 C/ pdef func(i,temp):% _( S. K! F# j; o! |6 S. U9 d
; Y5 D' Z- b, f& ~; F4 R - ) b4 B5 g: I5 ~+ Z6 p2 f0 Z
! s8 @/ \' ] {0 H/ }+ a4 o temp[0] += 100) y% G! v7 F8 m
' n' i# B# l U' n6 f/ `" \+ L
- % O% | i+ J+ i) J) p- E3 `
$ Q# A. B1 t; ~3 J. a: Z8 L; `
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])3 r% f- l/ \# n$ h$ W
. A8 d! L5 T; \; c! b
$ i0 V7 k7 t% B# k
# L; N0 I1 R4 W' A5 V: b
0 j3 c$ Q' h5 j' e0 h% ^* u( _9 O
7 k' |8 `: N1 L$ P* U$ R
1 E# d0 j1 S4 ~+ {: y( E6 N: r2 h5 J: U& S+ }5 C. h, y6 D
if __name__ == '__main__':$ B$ \9 W8 |; R2 [% O- B" i" \
( l e6 m. ^" q9 z3 F- : C) D$ ~) f9 [$ X% h5 r
1 Y) @9 u2 h8 b' m. p temp = Array('i', [1, 2, 3, 4])
, _- J1 }$ j- r: x
' x& l! w! B5 j' j6 C$ z, _
. K- }2 J' ]2 C# s; D
( h8 a( r$ `$ j8 i/ n/ H for i in range(10):7 f) D6 O! h( N+ D6 J
% p* Q- Q# Y- U) j- P/ U8 Y% ?' j7 K
, \! ^# f5 Q; \8 |
4 Y2 u0 e- ~- R p = Process(target=func, args=(i, temp))
# \* v- @/ `: q+ J, G7 {$ ?
( h) ]" u) `6 }" h
7 G, F3 v1 O# u) R! o
7 f6 N: B C; c: ]2 d# Y' E, P p.start()
9 |3 P& h) [* R v( f9 [1 `1 i4 S5 A( }8 O$ v
9 k* U5 q$ l( ~4 {) F1 e
运行结果:
o7 ], x& H: T5 M6 _7 q
9 b! p6 E: X! x7 j& Q进程2 修改数组第一个元素后-----> 101
# |) U4 g0 [: d" |; B' J+ C" v# X {+ \4 S7 y. s: f
- Q) \6 ^. X$ Q: A, f
2 |# G' W. d: p( z% n% B
进程4 修改数组第一个元素后-----> 201
0 O: s# m4 l+ _5 X4 [' t, k' l7 K
; j9 D2 ]$ o$ r4 f; | - ) a5 V- O) Q# u. D) i& f! s4 g
) I2 C, O! K' ~) }
进程5 修改数组第一个元素后-----> 301) R4 Q8 Z4 _. P+ }
0 Z6 R: \! J# M - ) G/ j( F d4 V5 @" i
; w. @+ _% _7 @6 @/ d s# B
进程3 修改数组第一个元素后-----> 4010 r2 R2 G. I; u5 u% v
; E8 J- w. }" _9 j% J3 V" @/ p
# [) a+ J+ H9 @. u0 S+ L) s; a! ]- J* T9 q9 r, E+ V7 z6 R4 K( k
进程1 修改数组第一个元素后-----> 501
5 [6 _4 @6 s# S6 p( G3 t( r
/ |0 Z K9 A1 I
$ T8 {* l. }9 h, U) r% c* M2 P2 S D* W( J
进程6 修改数组第一个元素后-----> 601
( e/ @( ]0 G9 G! g# g# S4 A% u! v7 `
@9 l8 t) {' f4 N/ M2 K" n
2 v( ?) M. M6 V6 E+ H- G进程9 修改数组第一个元素后-----> 701
2 t, a# a+ f! Y# d( W! G
2 \. g6 g c" t1 p) a# P
, R/ _/ t |) v# ?+ S$ _& h8 l) C v6 ?9 E
进程8 修改数组第一个元素后-----> 801' a+ `" r7 p' j7 J0 G9 q: r
' l/ H' z1 i. n E
5 F+ A' E! d$ ?
* s6 J5 k# m( G# g( }# X( o" i& Q# R进程0 修改数组第一个元素后-----> 9018 G9 Y/ B6 J' X1 T2 q' S; `
/ r( l! w2 z; Q' C( y% ]- P0 C
) K, j( y! g/ u, X' c$ l5 i+ Y1 ^" V0 O5 ]% Y. t$ l
进程7 修改数组第一个元素后-----> 1001
3 m- Z' }8 u, x+ z7 L) T
1 k( u A) O6 s$ W! V% ^/ z
* p5 C) ?1 A* K7 _ 1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。 - * A8 `7 n* L6 f
, i9 g2 G3 t' {! Y* [: G) v- p
from multiprocessing import Process
+ ~& X! J! N' {) w, @# _. a3 J) t
+ T' ?, Y l8 f
* I* c$ c r% G7 n* H
5 Y* k" d0 S, w& k" B v& S/ Rfrom multiprocessing import Manager ^ ]: U" G/ a& Y
9 d$ u" w( S6 e8 J4 g8 b
- 8 ^* v7 F# F% o/ U" A7 Y3 l! g" L
% w' D2 L+ ?3 v/ }
* t) k; n! E" F/ n$ [, C& q: T8 l9 B2 c9 L% s$ T/ T, |0 b: l# @" W
' u. D) s4 D2 ~# _& ]
2 V# d. G0 V* H" t7 F- Bdef func(i, dic):
: [$ r* ~; \, U- u+ E
* j, l" F" N( L
4 [3 J! b4 L0 L! l6 C2 l0 t! ] a2 M4 C" \/ T
dic["num"] = 100+i
, a8 T$ c2 W9 V2 V0 g8 R
8 g+ o, D8 H8 [8 D6 @8 W# g3 ~/ [
' i- ~/ _6 K) J( m
8 K% c# ~3 I% w7 \) Y; d print(dic.items())
8 a. _. z q3 l' a! b
' Y* ~4 |! X; C# ^6 ?% J- 6 o! r/ i, k- s3 `/ y. D9 \6 c s
, w. s5 m- G+ }+ n
( \8 Y- t5 ?# a* m
4 o* Q' Q1 O# v; [5 Z
- " q/ @' a' i) h6 x( l
$ j3 A+ M1 q( O5 zif __name__ == '__main__':
) m, J3 j" z. d6 k6 P. |1 o2 d
8 }. y% f: q7 A. K6 ], P W - 8 U5 d4 U1 C w$ z$ m, E1 w
3 k; @9 {/ Q% `: A T3 s dic = Manager().dict()5 A1 ]$ a# f M$ t! H' x
/ E; s9 h: v, `3 H
' i) }3 ^ f: ?/ F2 U: l: D% p
; ]: X$ M6 J/ i for i in range(10):
" [+ q: s6 I+ x, O9 {4 e! X5 {2 i+ A8 ^4 M6 a' c9 x; |) ]' T
- + O% U7 _, f% K
( |& J m3 l. I0 U' ^' O+ d- U: r
p = Process(target=func, args=(i, dic))
1 o2 t) x/ @# X, Z: U, u8 ]; R Q( _' _6 l4 H$ o
# z) w7 X6 U% \& f7 K- X$ i
( |+ b4 r3 n5 P N/ u& n p.start()
5 Q- H- ^# i) f# J, H; |: Y+ N* W
0 Q& T2 p% E9 h5 Y& g: _- ( a7 H' K1 B9 Z% c
! ?' N+ X ?9 b, m L. l h
p.join()
0 e* @9 }7 k* V3 Z. b
& p: k% S1 L C& o) n2 y6 u* D
0 t5 F$ E6 P% x# x1 Q
运行结果:
* K, k# |( S+ _/ f* Z1 j7 H1 _* y8 `# ]$ k1 }
[('num', 100)]/ o% p4 M5 l, T9 o& n
) c8 ^9 e+ u% B# y5 c' F
# e$ O _1 [# A# U7 R9 _
* T6 x" w1 w1 T[('num', 101)]3 s A- P- o+ f, ]# q
7 |5 K: x1 B& A" D8 n7 z
- ! `6 R: P9 I+ w, h3 _
- D4 ]2 I* b1 J[('num', 102)]
# U8 o3 ?+ L1 g* J, B
7 N. V8 p' e6 G \. Y+ M - 2 K* K9 N: d$ X5 Z9 a; ^
+ v/ o* G- k H( ^+ |[('num', 103)]
7 k5 d% e9 g+ V( I. v1 G0 s7 j/ C/ J+ q
' W# X- W. a, F0 i7 a2 q# S' \6 M7 G1 O
[('num', 104)]
" n/ m9 D) ^' {/ \3 q5 _2 l2 C' v2 N4 D* I9 k Z- l- v) p
# a- }' ?' r0 v7 F7 T& e8 n0 K+ x8 m% y! @5 g# G; ^
[('num', 105)]
5 U, n/ b0 ]* b1 `/ l& {( v
t+ P/ m9 A1 o' K& b$ N
' U, M5 r# b0 W! Y4 h/ V/ [- g9 \' W* y# Y
1 |$ B0 e8 p* c$ c* r5 D9 Z/ R[('num', 106)]- V9 F |: e! h |+ u- s0 I7 Z
) r2 X. H" D; h8 x- E- F5 w7 ^) O
8 q0 Z4 k& w% A0 ]4 @
8 a! s5 z; r, i; n[('num', 107)]: O3 w" e! v8 ]! j! W0 L# \- `
! t! o( b. c1 P* T
7 `! C9 B0 t8 w! _3 [- G5 E k7 H6 N& U" L
[('num', 108)]0 g/ a7 D) T3 O, E R$ q h( Z
; ^# U& b# G, b) _3 K! D! l. v
- 6 r2 A" M1 _0 s7 v
- Y4 \) m' u. F. ?[('num', 109)]! t6 X0 {. X9 v, l9 h9 s* E
/ q8 `! i) @$ e5 K0 l$ L1 ~. C
/ f _1 F; a9 N6 {: \& D& D4 k 1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示: - * Q8 W0 x! D) d% @) F! h
* q1 T" [' F3 C2 q# l4 U
import multiprocessing
7 T1 \1 u& O; N' h6 N7 ?9 k7 A7 N" O8 {& E5 b; Z
4 M$ J( b4 o# S- B% H2 [
' Z/ i" a4 o k+ l. Y- O1 l2 Jfrom multiprocessing import Process/ z4 F& q$ I' r3 l: t
) d0 U/ V+ ?: J6 K
; g. f( u9 c$ |/ u7 t0 T0 X$ _9 j7 b5 C' N; }
from multiprocessing import queues; Y: n" l: G. P0 m9 h$ L4 w; @
4 d/ B) r9 {5 G7 |0 a
- k6 z" V! [( ?- i
. d4 n2 @3 W) r: _9 l" B% w8 N! v1 k. A5 M& P+ r b% E' K' T$ ]
* A' h4 ^% e- O N+ b5 N# x
- , G9 w% \- D! M* f: n" |7 ~
7 `' t; P, H6 U) l
def func(i, q):. T9 M& T+ H: {) }+ h5 Z- S
3 @; j1 _( r( ?# Q D, `, t5 h - * p/ W4 F, I" _6 [/ u" p/ v1 k; K
% L: P6 k, j7 B( Z
ret = q.get()* n @2 i' i, y
H7 [5 [" x7 N: O* M
' Q) l1 T# u% P. w: |; ~/ a* a3 I4 y/ }) @6 Q
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))( h8 P9 [8 c8 C. k' ^. Q
6 H+ @, K5 [4 ~ r1 \2 F- 7 O, b0 K+ R. h R
- u6 U0 B% f, X* i q.put(i)
# w! e: w* U& R2 `4 [9 C7 ~- T9 P# e* Y& W2 L' b" s' C
! e+ h) f* c7 P& u- U8 a
3 U \3 E5 @: ], T I9 K1 R V6 y$ C9 i" i9 y
: \' u1 O* X6 S' g! Y2 o
! ^/ l, H W2 B: s9 y4 p5 M
; t( D8 v: q( c6 g, y2 Wif __name__ == "__main__": D9 Q3 D# ]2 `! n3 g' ]
1 H; W# o0 O( I. ~( q6 L! I7 ^
4 L# X- F. c Y3 h
! w& F! h5 S% r1 }$ O lis = queues.Queue(20, ctx=multiprocessing)
1 f. h. o$ {: a1 p' H7 m0 k, n1 K+ p7 }* m& U$ V, E' z
- q. M2 a( O/ v! V7 o% }. D8 R. A6 O7 b! Q
lis.put(0)" A6 ^* W+ b Y( b; q+ m
3 N* }2 ~4 Z* W3 v8 W: B
6 B' L# U( x0 J+ B
. w9 p9 r$ j: j for i in range(10):' J1 H" t# U$ h8 p
( }8 t. l' L# d t, g. c
7 _& Z- C. ]8 p* H; D' q
$ H, L; K( z0 M p = Process(target=func, args=(i, lis,))! b! G+ g7 I3 w$ s! c: j, ]9 T
3 t7 ?1 \; x3 p+ a
- E8 C! T R% [6 |
9 S- I% f( u1 o$ _4 S& }% ^+ Y p.start()
9 s9 n# P2 {1 a, `0 d& J; Q/ G- {1 w! W+ ?% M8 w
( E3 {7 L3 C1 h) R1 \7 [9 _2 t
运行结果:
/ d! K$ r/ P( y
0 r3 n- x" |; o" E进程1从队列里获取了一个0,然后又向队列里放入了一个1
" J8 w# E) a7 I! ^3 W
! E/ V) y, U- X" {0 n5 y3 O
$ A* ~! l& H$ w: @% Y2 y
" c7 m S4 Z: c4 l1 \0 }4 v$ Z$ ?9 R进程4从队列里获取了一个1,然后又向队列里放入了一个4
" u0 q$ s- d) A) m0 W
5 k+ f9 E0 t" V# R7 R- & e9 ~, ]; T0 j& E4 M, v
8 r! r7 b& o9 U* ^" m: U* C
进程2从队列里获取了一个4,然后又向队列里放入了一个2
7 v6 H/ c; x. P- V9 z4 v
: `" `7 w+ z- t3 ?8 x7 b4 @ - 1 v9 S/ y1 p! G r# h. G# K
# \9 ^$ F1 e5 d: s( s9 U+ H进程6从队列里获取了一个2,然后又向队列里放入了一个68 {! y' d) E" U: A1 E
a9 Y [0 l/ U7 |; m( w
; i5 r F' F( F6 _3 x
I9 ?" s2 a( I: N4 z& R进程0从队列里获取了一个6,然后又向队列里放入了一个0
) [' s* n( m' y. c, y, S, W5 m2 R. d8 x* D ?0 v3 [, _4 ?% M: a
- 0 |' h% `! w+ t( x# E
% C! I7 V3 T3 ^进程5从队列里获取了一个0,然后又向队列里放入了一个56 z/ U7 L) s4 s: }5 k* l. U3 X
2 z5 w6 u5 ~* b6 b* m
/ P( G# C0 N6 w$ q0 k& A- }; x t% q6 o" q
进程9从队列里获取了一个5,然后又向队列里放入了一个9
+ {3 L2 n4 [ ~% @9 N! U; L" F; B: B( X
- 4 D0 R; ?0 }4 B3 }0 s
/ C( ~ x0 N+ @/ T. |8 F, l, z2 f2 n
进程7从队列里获取了一个9,然后又向队列里放入了一个7
4 n- o9 M2 o/ I1 k
7 P" ?8 F" ]9 w4 E5 B4 i
8 {% }$ M& A2 l4 b& h
. ?; h- s5 C/ y进程3从队列里获取了一个7,然后又向队列里放入了一个3
2 E5 X2 O8 N# X0 t6 o1 I7 |2 w
$ Q5 \ a" ]/ s) L/ R* [- 4 W. I3 }0 q) j! E
1 R% {! ?& r7 a- D' X进程8从队列里获取了一个3,然后又向队列里放入了一个8
. C3 c' r3 {# Z5 L& f$ Y
, r+ b9 \" V5 y$ a2 w: q2 d# H$ c7 e) a- f: e$ C
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
6 ]/ A* B- M( l: k
3 K- D" f' E: w! n) ?+ nfrom multiprocessing import Process" b) [+ h& n5 c- \+ o. `# |
1 E# y0 `' t- o8 R
9 _# N* S" U& ?. A. m" R
: Z9 ~3 I0 y+ sfrom multiprocessing import Array& l( q2 e1 @; k9 t
+ \9 a7 i2 a6 p, {' ^" A1 l
! ^4 W! J# c1 A; q2 \
- U6 d" K: l( ~from multiprocessing import RLock, Lock, Event, Condition, Semaphore# X6 @3 \ i( ^
' x% t- V$ A' x5 r Y
5 ?8 j- H2 P; ]& h! q% G" Y: z; W' [1 h4 l. ?
import time( |* l( B2 G$ g0 m( L9 \+ W- h
) i6 O9 ^$ P- f' N' ~9 a0 n
$ z, T0 R. w0 q5 d6 n1 l" o: F, d( q4 ?; r
: {- m8 S5 p/ [; N( S, ]: U
: j: G. p% A6 o8 i1 i( ?5 N0 Y7 q; O1 g- ! A6 F Q" {7 Y+ k6 Y3 ?( d
( l1 D) a2 E' f% N- V9 I
def func(i,lis,lc):% L7 X* g' G( U$ v. _
5 ]9 o9 [" S# }/ e7 [1 @ - 8 V& E- }: B U' G
0 T8 ~' }, A0 r8 K2 | lc.acquire()* E+ f9 L$ ]3 {( u- P, O
Y; f) I1 c y8 ^8 J* C; `
$ N8 |. x0 O* R) y5 |. U( m L. Q" F/ v) {! I6 R3 ?
lis[0] = lis[0] - 1
) F; @; P+ c( p" c4 q. V0 h
; T' C6 |' ^: F* w6 F ^3 C
+ a e( p/ F0 K) s( y' v h: d+ c: z1 G) L6 x% P8 u
time.sleep(1)
* v+ |- r9 ^& t0 t) G) [: L: |4 _4 |' c' m0 K! g
3 l# {$ n2 x' B4 o$ O" n: J) C
- |. E$ x$ r/ h" \9 [ print('say hi', lis[0])
6 h- G& w" b2 X( \
( p3 _+ z) ~& O) @ ^% D
! \- |7 v: C. P6 Q8 ~; v8 Y, s5 `
/ ? _0 s# @3 ~8 R E0 N lc.release()
" J( ]+ O. x9 P. F+ q% y7 p' ?
' n# O( m5 o f- 7 X$ y- R+ T5 m8 p' Y8 T
+ m. `8 l5 h8 h7 L5 T: \% y
, S" q* Z2 y2 |3 E" z& c% B+ N" }
+ H, _4 q @5 Y2 F( ^# U* N
4 d" m; ~4 {4 q3 b5 K% ]if __name__ == "__main__":
+ U; S [0 }3 ^, M6 j% {. c: z7 {/ |4 t8 z Z
& ?7 i$ ~6 R% F) U, l
+ x& O. k' G$ Y( C array = Array('i', 1)( D1 h9 D3 @' p6 w. h! \ n! ]
" T5 l- V& Q2 _6 q
- a' F* `' Z% d8 J% S0 _1 v4 P, O/ i7 x6 ^* H. `2 p. ?3 `5 L, D
array[0] = 10! \- Q6 \, C+ T) R: } i- p. z
( ?; X+ q0 K) C. @ g" w" M( }
$ m' M% e" n- n$ J9 q! R9 ^& p8 Z, V: ?$ p2 H+ X+ Y
lock = RLock(), R- u, V2 a2 r( d, x4 W# v
4 j2 Q9 r( Q. S- J( ^' u6 B R- ; O2 c) z5 t3 ~% B% d4 c* J4 [
d5 U f; C4 K4 q2 R for i in range(10):
* ?$ k, c6 i2 t3 V& S( ]
$ F; k3 Q2 O& L( e7 L0 F' J
- r- V; q( l, y1 x8 u6 @6 W& Q: V4 f/ h! f. U8 ?3 o4 N, p
p = Process(target=func, args=(i, array, lock))- M) B3 `" P! t
/ q& u+ U0 o/ _
9 k+ h+ l) R0 R5 @3 m, }1 ?; U1 R1 m; g
p.start()! Z6 R6 f+ e$ c/ C, i, \
8 C7 L" x; I8 b7 D
0 \8 E" L9 ?0 w/ \" Y8 @. `4 F
运行结果:
4 i# p" Y( n3 t% v: O8 E% f
n2 J5 K1 z- g- J& ksay hi 9
$ N/ v6 g- c0 P1 k$ N! E4 D3 ?
' R+ C K: G# k( o. x# R8 a
3 x: y8 X: ^2 d1 h! S0 R+ m5 c, U0 f/ Q$ B. l
say hi 8! }6 u# o( V% s' f" k
9 J* M- ]) \+ ^. ?4 w- Y- , x3 Q6 X2 l, F' `$ ^: A7 m
/ N, L [! \2 A' g: t- l
say hi 7( }. f1 _0 V3 q, r
$ \7 T- A4 y) e9 t
- / p! e! F+ x+ k' u' {# H
; Y5 Q' e. C* M: d; t# psay hi 6
2 ]" ?1 p7 H l9 Y
u& `5 V5 |) I
0 V5 m/ x4 m5 x u
* c3 U) L1 _- ~. Jsay hi 5 D# S) H- }( b" P; F
% u4 n7 t8 q" C. P z3 T- . |7 l6 u8 Q' _8 l4 }( M- B& H
& S: Y2 ~5 N# L- b$ o6 L; msay hi 4; K. T" U" p6 _
: b0 ]$ n0 L' U5 | - 3 o0 t7 |1 |" \+ u! H9 N v* [
1 a+ H4 }( o3 a/ |7 p& |+ i
say hi 3
! ^- G. [% E9 u6 d% @
) V, \8 p6 F. c3 P) i* ] - ' p/ k- O- n& S' {9 Z+ h# x7 l
# V+ v) z( E, {9 O9 d/ {
say hi 2
$ l: l9 `) t, g. V+ N( \! N. z1 ^# h8 T0 U' \: k1 M# o+ j" o
: @6 |% c+ a8 u6 l ]0 {) o% t- D3 V: B2 |, V
say hi 10 w* _2 ]3 r- o7 f' @
" j" P4 X9 H4 v+ `
- , R5 r( f( b: {, b
# Y9 m9 R+ B1 S
say hi 08 u+ J$ n: n% m# v, H& l
( g3 j! g! e+ I( |; `- m
% a: Q& }# S' c4 c 3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法: - ; B( ^/ q' ^, o& g
0 `( X& T! x0 Z, g# I! E7 E
from multiprocessing import Pool* r7 r u- a: Q* l
8 z3 L7 l; \8 O; O9 j
& H. Z$ e7 Z2 Q. d( Q6 I
9 A! [9 F$ j w) S& R; Q) q R7 Wimport time
9 h. ?1 U" Z3 K$ B: h
4 D6 U- |% ~6 n6 m1 y; a' d" }
9 q7 a/ e. F2 D7 H( n9 s- S4 K8 I3 \ } z! d
' Y" v1 N+ _9 q+ z& |4 ?0 ~- C8 t( ?( H/ L- ?# M0 A
9 h6 V, a' }! H0 K% D& |) e T8 k2 S# |1 D9 E5 b
def func(args):
* p4 `# m- T) |& K+ k4 j+ K! Q: ?* y1 l+ U
- 9 B- @3 m1 j# M, `" l. K- Y5 g
2 L" w* b5 U m: A
time.sleep(1)
1 }' j- @6 U; y; l4 {. X! @( {' a7 e- q
+ G& k5 M+ L9 d/ s& x# X$ q7 O! Q* s
7 Q9 |$ U5 O( t; b+ N2 ~, L* \; E print("正在执行进程 ", args)- M+ ?8 I0 |/ D/ G, G, }
! j4 ?" H, ~. R
% E7 M' V9 j6 y3 O L
h9 m* s) R4 m( `2 @6 Q* S+ d- G, ]% [$ [& X1 j, h% H! c
! P4 C( a3 s& D
/ `+ A! `; C0 t- W$ T1 o+ L
( H7 o6 y* g, n8 j+ Lif __name__ == '__main__':
) ]: S' g, M5 U( y
( S- w7 s' J# d- ) o* D) z2 k9 C, m
3 B0 W4 H6 D1 h" X) u* w/ e! G2 L* c% j5 R2 C$ @/ ]7 r) T
3 Z4 P5 q" A0 y* o) _' o
0 a% p4 c8 a! X' U, e1 L% h7 ^. \
* }# i9 g: p% N# V p = Pool(5) # 创建一个包含5个进程的进程池
4 q8 }0 m* R0 R5 n7 l' Q1 J5 b& d+ w( A) c6 |( _
- # S( y" d9 g7 Q! v5 q
" K! H4 Z# N, J' @+ m/ c: a
) @( t9 j1 e* p/ ?/ I/ l- i# E% _7 H0 }7 G# }2 d
4 H) \. E# S# `5 C6 j0 x7 @4 Q! Z2 p" ~" Y- s @
for i in range(30):
' t! A9 i0 b9 D t+ Q
9 [3 v2 O5 R% J( n" m& E- A
" t9 u( Q3 {* r$ p+ K& K+ @! a2 H3 ^! F# x# p& j& T
p.apply_async(func=func, args=(i,))% h2 s; q2 X# _
# e$ n b% ]* I3 T# m% y) S- " J& l" v0 D l% c$ g
% `/ `6 Z% j, U% H5 h
7 P4 M- M# ^/ J
- B6 p! g7 D# a' O4 p! |) l( \
" h' T7 o( l6 Q2 S% F) T2 v
" s) J0 |0 _( B. W' i; E5 W# H9 P p.close() # 等子进程执行完毕后关闭进程池
) [% m) F6 q' R6 N. y0 `* T% P3 w V
4 R# b+ B+ A3 T( V0 x8 T! t* l7 I0 r" l
# time.sleep(2)
1 X4 M- I% }+ m. ?" M+ i- d" R' \% e# Q+ k9 d; `6 Z
2 o! L5 m2 a5 |, z4 l" ]8 I; z
8 d$ I( [6 L @& c! c # p.terminate() # 立刻关闭进程池
; }2 R5 C. O! Q5 t& e7 u
& ]1 Y) p# O6 h( r @0 w% x9 k V" Z
1 {5 h+ e& s3 e
; p9 w/ n* i$ I" v- Z; l1 c$ R p.join()+ G6 |$ m* G$ S- `# v% T; q
+ {) i o( q. j# Y- L
. R7 |/ {5 j& T) o2 h
# {) `8 A- t# k& A$ R8 E3 [% W请继续关注我 0 B; y: m% c7 P5 y) N4 Q& _& F
9 c6 C y, Q" P3 T8 x3 U* s; U
|