% C# O3 d; u! d0 f
爬虫(七十)多进程multiprocess(六十一)
B" J2 M$ t# D/ `5 lPython中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。
' U* @' S' u3 k4 j" E( |2 T, |. i
( T) v9 u/ J2 g- M4 `5 @import os
! d a, G/ H4 d# F" N: t% Z. O3 k6 [2 b# e$ H* F; M' ?2 E M
: C4 O& h+ n0 ]- |0 U
1 _$ c5 [7 \2 D7 ^import multiprocessing
8 g7 H' m6 G* k( m. g8 y& ~+ d+ q
) @6 q. B/ V+ k+ y+ A
, R! r1 m4 Y9 W5 T% k2 N- W- `& X! U8 A/ F9 D( V: Y
" J& D7 P% e c x& s; A" @
, C4 a8 N: }. Q3 x) k/ N
% c# Z, k( H9 z5 [) \% ^3 m4 ^
2 v6 i# V* ]8 M' u% {, ~- j% Z% ]def foo(i):
( U( W/ w. @3 T: P- X+ K, P$ \# w6 m; i) @' T' V6 s
- & O |, q1 v9 B0 u3 b
" A% J# Z' T) c, e' I # 同样的参数传递方法
2 {( K d/ D% C9 w' \
; H: o/ z6 E* a- {+ L% G6 q
k( @! K* U' l' y5 I, m
% T7 A& s' h9 x; m* \2 J print("这里是 ", multiprocessing.current_process().name)8 C! M% D: k$ I" X
( y+ ^: @' r5 M% ]7 t
1 r- i; b( G2 G* F( W; e3 B" W5 D% D
print('模块名称:', __name__)! V5 F. U B' E4 J/ `, z
5 a) L" _6 L7 M- }$ ]+ \& W: J8 t- 4 Q8 l( z5 [% Z7 D
4 d% O. o" A1 b7 h print('父进程 id:', os.getppid()) # 获取父进程id1 S9 K( B. {" W) g j" b' i6 j
* q5 H7 m! z5 A1 E; ~9 h6 a' ^
- % g3 o) U, d& S# L% L
9 Q; T) D7 L, S
print('当前子进程 id:', os.getpid()) # 获取自己的进程id" t# z& m* }0 I0 K# {9 A
& `$ }9 H! Z+ |* }. {& S
- 0 h) l% C. Z) P0 _% H) n x
) m4 Q F% T& f& }: p* P
print('------------------------')$ t- Q9 G* B. w! z
& O8 {5 F9 j8 h4 Z
- 3 \, v+ Z& L2 P8 A; O' N! ?
1 V G6 X* v+ f2 I/ Y
) q& e3 q/ V; w2 {/ y! ?! s% z
6 _4 l0 w0 [ A% E- y) R
' _- |: U4 r5 J/ R2 I5 I* ]+ [
, j, ?/ e; W0 V$ ~( \if __name__ == '__main__':
* @; Z3 l) V8 j; C6 q
3 z! \( x! Y% S/ ], u- / k9 `" e2 w+ T. |8 L q
7 k3 X' D5 |$ W" a- k4 O
3 ?+ {9 K4 r) X% [+ e6 ?. F
8 `9 R; w; K# h - 3 E; ^% m3 l, l# g: S s; i
( x, X K3 E% Q! n' {
for i in range(5):
3 m: A7 U, @1 v4 `2 C3 A& O, o
/ A% l/ R# o+ Q+ a6 I( h
6 ?0 H" O7 ~, G& a0 v
/ |. L+ U$ V, \ p = multiprocessing.Process(target=foo, args=(i,))' y: }8 s# X4 N! L3 Q$ v0 k' J2 K
7 I- A0 k! b- Q+ k& a
& s+ ]) ~* M/ C R/ t- L6 D! g1 o2 y3 V5 D$ F
p.start()8 t) n0 c9 u% D' T& z& D: o( R
7 Z, H9 m' l5 i5 f0 R" f7 X* @# N& \/ ?5 @9 r8 C+ N
运行结果: - 6 K3 Z8 b' H( `) [9 o
1 S0 M" h& [( s0 n( q
这里是 Process-2% N- e8 p3 ` P9 \: K
# G3 x/ w8 @) b0 Y
, x( @* O* g; c; N3 ?2 F
: {/ m. U, |8 g8 b; d1 Q7 B模块名称: __mp_main__6 d+ G0 P& v u- A
# C4 g+ Z% Z5 ^- V
2 [) e% ]/ c- |1 Z3 W- W% ^
' x; E- K/ I; Q% S; x- {父进程 id: 880
: Y F' ~8 V9 F" y( D9 v& h) |# o" b" Y3 D0 _
$ R) H9 Y9 Y9 {$ u1 V
& t6 N$ c. Y2 I9 x$ j3 Y6 z当前子进程 id: 52603 B2 @7 t7 ` |" t: Y: B. ^: u
' j. P9 b$ h# [9 { ^
# l7 C1 U6 N; k, a) B
( {" v6 Y4 J9 L6 D--------------
8 e; X; a$ c% @1 D/ F$ N. r; N7 b0 g7 f; R7 @# ]/ L# C
- 1 e7 s$ r7 o" a. {- ], `' N
4 v0 E# Y. v4 N
这里是 Process-3
( K3 v: \3 R- a3 R4 }) G* X9 V4 h. ?7 T8 p g" o9 g2 N! w# S: m
: x3 B& Q0 s7 ]' t- Q7 q9 u+ D: P0 C, E$ O0 l( r k
模块名称: __mp_main__' S( U) S: g4 W, E7 k
7 g9 i9 S! c. X% |
- * i; v4 f2 Y' T! q
1 A7 O3 c1 Y$ `( ~; x. E父进程 id: 880% v* z& R ~# V0 y
( R5 N4 N' W6 a7 n) v
0 Z1 g9 m& W( M7 l: r3 D/ d- F: N/ O, L9 N9 O, w
当前子进程 id: 49129 e4 \6 [; L3 i- b6 |
( u9 b5 t& y. Y( j& \* D
+ h6 w+ T" @" V) t* W- T) ^
* E) Q+ D v6 v9 r--------------
) E" `3 ^) b6 L* s2 [9 U
& s- D: N- f: o- I1 |- p- 1 [7 y; U2 ?- ^
; ^# ^$ j7 l5 \. @ X这里是 Process-4
, m$ f2 z$ v( D" H: u# p5 k$ r; L) L
/ @% y3 h7 W% w" y - - v4 ^' M: \6 e M8 c( Z, P: V
% V3 T/ v; w7 C1 N0 }0 `8 R* C模块名称: __mp_main__
) }. j4 k! z$ f$ d
6 L# k! J( C( q. ^* |; q, i" |6 Z - $ A, e) Z, Z3 @+ m* q( R
9 V7 g# U6 X' v4 v( m$ L# c
父进程 id: 880
3 o+ [8 a6 z- a2 Y+ v' ^* p; n& s2 {7 c7 S3 i; [% Z
- % i0 G- z) Q: C4 F
5 N+ Z0 O. ]& ]% V7 s) k6 ]6 U当前子进程 id: 5176% R# \( \5 x2 ^) ^% r
! Q5 x, R+ g! H" d) X! j6 L* D
- : U8 |# s# d5 v( f
6 K3 t6 U5 T- h$ V; a; [
--------------
: }) N, m5 D1 a8 W
& P0 u+ K' P) ^$ I" h
5 @! k* G6 `( ~! B& Z
- Q6 @ \' Q) S& {# m9 C这里是 Process-12 w) v2 d K) Y: n3 E. k$ [, Y
- _# @# H+ a# e" {! D$ w4 A- 3 O; Q, g, s/ m! C& n
* C8 s$ p) V" X模块名称: __mp_main__) O. S0 i0 x" d! y7 x* k3 T1 M
+ D1 S, H- B. R/ y* _0 |
% }% U. F: K% m8 f2 h' a" L
. a, D/ w& K, N& e3 z父进程 id: 880
4 f* X6 ]- b, j9 C# `4 x$ @
. @7 a. k; p! R* Q) b; g' z; |- d2 Q; S1 }5 I4 d
6 h" u) d6 `5 P1 P) K
当前子进程 id: 5380
7 c; N( n* {! K3 [) t$ P
6 \# c- f- o7 e9 V# e8 c4 X
, J, e4 f7 T% b- j8 h# ]' p
! K" p& w* k0 [+ [--------------3 m) F! n r8 z& b
& P1 ~4 L% Z1 c
0 C3 {) i2 b& |5 \$ {8 e
; L* f& W2 I1 ]& ?6 F这里是 Process-5: Z$ c- t7 V9 J; W
g% Y9 k3 W+ W: ~& p
6 \2 j8 ]5 Q% Y- ]+ I1 F
! {& r. k' n: p3 t模块名称: __mp_main__7 B7 B( {7 p+ O6 v
; I. j) R* V# o! S3 h p: y
9 _7 F* w- a5 \# M5 F" R& h* p( B# s' a' d) g
父进程 id: 880
5 Q! j$ n" |! ?: N" D$ q, r4 R# t6 [( b3 K% ~% J' _
$ v3 ?1 ]/ b7 t7 W! Y X
- g* H$ t ]) [& v当前子进程 id: 3520
' E W0 \ r, d6 |4 F; ?8 ~8 r
' \* c2 k& y/ Y) A) _$ O L
# o1 j! S/ j) Q6 J
: x% v9 J: m0 @5 X( M+ ~! s) j--------------
0 Z4 x; J7 s4 I" l: e) ~2 K
1 ^, v6 r5 G* C+ l% L F: o) C
: g) Z% F' t" `& q0 l8 w 1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享:
: H& F) h+ A. P, S; @5 m2 o- M
2 ?& ] d& L+ i8 Jfrom multiprocessing import Process v3 C7 C' C" D4 U4 e
+ T$ ~/ p* P" K; ^/ a
- 3 V7 b& ^( G8 ?0 ` F) @
6 u) m2 f+ `" Z2 U, W& K
% X5 D2 z; b1 g3 O5 n+ C2 w
& ~, K6 n% i2 h9 A& W1 ?; x
7 j' M! P: T" ^/ }$ ^ k' ~3 e/ t/ `
lis = []
; R+ J5 ]# @3 A& @7 Y. I( P5 M$ X' l% f8 ]6 q# A
7 j+ Y; U& g8 f8 ?+ H: \
X6 K ?; F3 G6 [7 j0 X( ?# Z9 U% H7 E
' c0 a9 }1 v2 U6 S2 \- T2 q: D- h, }0 y: u. @
+ v2 C1 C" i/ l t- N
def foo(i):& ]+ S: j$ ?: s7 ^9 r
4 s( \# ^4 q* w o& f* Y - ( n$ i3 }3 d% c, ?# |
% G( I, j \( v2 s4 I- X3 g lis.append(i)
' c: o: w: T4 R# O- ?
/ H- W8 `* ]; l) V% l
7 m: _5 Z* @; E% T0 l& \- i
1 \' y" D5 z% v! g9 y2 @ print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))
' b: f ]7 k$ B. U2 k* x3 a& C* V* \% f ]
- % V) d; d0 P0 v; U. C: F. e
0 o8 O' g# m$ K) h; H
+ |3 y* S z8 e5 B3 Z$ X4 m+ d1 x3 ~6 p5 I" t! L" H. T
- 6 g7 A) r( Q8 x/ Q) A7 A
2 h3 V7 g. j* m6 K) A
if __name__ == '__main__':
2 P' n$ z9 k1 Z" k& O% h: ]& X8 @1 R' Z) B( a
- " }% m. ~0 o& d( }
* P# _1 d4 M! ]7 p3 s for i in range(5):. K+ W" m1 v7 M. v! u
7 o [& J( Q( Z' |+ h/ {
! `! ]! @. m1 u+ a( H1 t
' a2 t5 k4 {, p! ^, i p = Process(target=foo, args=(i,))
% R N( @# E9 S% B
) W0 I1 e+ M Z0 n0 B7 m
3 c" N7 M8 R# n8 M' n
! V" @' |$ k0 v' B5 a. ~% {) h p.start()/ m( o7 C. h5 ~% Q: E
8 y0 Q, V" q$ k! J/ C5 ]( O
) _, B% `+ u$ z. [% J4 E
3 \! @+ b4 l) u* K1 | print("The end of list_1:", lis), Z$ T- Z4 d2 }2 j
. D0 ^! _) p% G' n4 P6 I
9 h' a$ W$ q$ i9 }) V
运行结果: - * l* }( k5 l/ \ Z; p, d
7 D6 M: m% P- i2 WThe end of list_1: []
1 G' B! d8 f) L+ c/ N6 j, e: s& G
- 1 v( O* b) |8 ~5 H
7 P5 ^% _$ O" J0 oThis is Process 2 and lis is [2] and lis.address is 403567444 |) A- G9 ]# i
2 } @9 x$ L" ]4 @' o2 _
- ) W* A; J5 q$ i! D+ d
3 R* l! B! _4 D
This is Process 1 and lis is [1] and lis.address is 40291208. k( V% p/ T, D6 R
- @* M" q% g: G& w+ A! j
- + T8 D) S# V: F- P$ x7 z+ l1 s" s
0 {% N) Q1 R4 @. NThis is Process 0 and lis is [0] and lis.address is 40291208! S6 N) B& W8 ?/ f: f& N6 ~ p
4 _0 |- u, s" X5 f$ b% |2 k" z
: l6 g2 y' N: N% a; p
3 L; h2 o; N) a& O% LThis is Process 3 and lis is [3] and lis.address is 402256720 G' P5 X: `5 } x1 Z: j, [
3 {1 _- {/ |7 r. y0 K% M' [- ! U y8 O) C, W# E+ g
) @( i+ J" N+ p' H' Y: N& YThis is Process 4 and lis is [4] and lis.address is 40291208( M$ a d( |) i9 o
" j2 B7 T* W: D& l6 W& J
/ L' p% U/ A$ n
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
# M7 ~: d3 H' s" I* C- r
# h. S4 ~1 M7 c; g+ h'c': ctypes.c_char, 'u': ctypes.c_wchar,5 d d: y* b, \1 x
6 G$ u$ [) {9 m# V- ~3 p; A5 ?% ?- . X+ `1 k9 m- s" {% Q/ D$ |
9 x. Z+ R) w0 C3 c'b': ctypes.c_byte, 'B': ctypes.c_ubyte,% e0 C6 z: D' Q" d
6 }" Q j$ A1 i# \( x
) ~) ~4 D) n$ }$ K% U2 v- v$ `$ F4 w( R' B6 J
'h': ctypes.c_short, 'H': ctypes.c_ushort,5 R# e4 `; X, l+ m g
0 `. g4 {8 q4 W& s g
# [0 b, @7 h/ L) R' Y0 h1 w9 I& _2 I7 t
'i': ctypes.c_int, 'I': ctypes.c_uint,5 H1 q& ]6 y- }
9 y0 b& p+ n/ g, B, h$ W
2 g- S1 ?1 b4 s( L7 W8 e- w7 O: _7 T( C. r5 N
'l': ctypes.c_long, 'L': ctypes.c_ulong,
* }& V3 L7 z2 {3 A, x
( m) E; B+ Y& f' H* X6 i1 y
$ A, d: b+ P* m3 q0 i
7 t. G2 S" v4 j: v* @* F* a'f': ctypes.c_float, 'd': ctypes.c_double
3 M( Y: g- s) r1 t: k" d% m+ M) c
: N: y3 j, W2 u# H' J
* K' F# X5 R& w% l" d. r5 K
看下面的例子:
# \6 ^8 B5 R7 m4 D3 s4 `( d( O" A5 @
W. \1 _2 h+ a7 N( pfrom multiprocessing import Process* z6 A; E4 Z) L. V- k. I
# w8 ?7 V+ Y7 |2 m* v. R5 Z# [7 N+ H( g$ T
- 4 g1 Z1 m& l6 u/ X" O& @0 A5 O+ K
5 l" R+ q1 |) ]" J) Xfrom multiprocessing import Array
+ ~/ V% u" a+ |! G
' m5 r3 `7 C) h- t% B( r
+ U" O+ W) d; `2 @0 t; @8 J; B1 }9 M% }. J. M7 G
9 R( A; A/ m7 `& t& _
! v2 u& m8 Z" \
: i# ^: |3 W, d% J( e* O+ P& E. m! x8 ~! C
def func(i,temp):6 @) @7 \7 X6 o4 I* A9 Z% ]
3 n I( |: W# C# m
- 4 F& ~8 n: P, s0 N7 k2 J& t
( E6 q$ c1 G/ F6 S L% B9 a7 k* j temp[0] += 1008 T; M' r X# e1 q/ B3 ?+ ^# \
8 D( g2 p8 I8 k0 @
+ k/ i6 ~+ N( @" h7 L% G5 A+ m& V# Q c) h) d& c% M' s- ^) |' V
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])1 [/ b8 K, Q" v( D E
8 e1 Q/ p, x- V( R+ U% G3 ~. u/ Q8 l
- ; [$ e! Z$ |& R: n/ V5 O
6 l& D- R. |8 e [4 n- ?
W' s. m4 `; }& T
; ]0 o* K. _/ G# G
- 8 g% n/ u; ]* C- ^# ?. F, G
' f) r2 E7 U, uif __name__ == '__main__':
" U3 Y% s3 E' h% O! k& K' P
! ~! z6 p D, Z- {3 i y% u0 G4 _6 t - 7 C) |% g- c, L& S
( a8 H5 y8 o! u- @' N) L
temp = Array('i', [1, 2, 3, 4])
4 D6 [& @- j, u3 Z: k. ?+ z
( T J; M% L# E* v2 T! Y! t - 4 b T( C4 L/ e! j; N5 M M
7 D* R+ C' @- d0 `1 b2 i for i in range(10):) i9 ~) t. i2 `0 H9 q5 T! o4 u
3 R& y5 {) x: }) V
( U$ s( N* N& S( b+ P! \ n6 ~/ y' N0 A7 ^7 ^' V( s
p = Process(target=func, args=(i, temp))% x N3 _3 E" m7 p' A8 E
* }* @- F0 l' [3 P( e/ U% m
- 5 s% j/ _, a. N5 [6 m N
; g4 x! M9 W" i6 w% r) D p.start()
. D$ t: k7 X- @& F5 ] v, g$ ?+ d1 g1 F+ b \. J+ O5 U
1 P6 Y( B% U3 s
运行结果: - : j3 ~: S* r, D, Q, J; m' o
8 l* N; |8 [; g2 E% ]8 P+ P! R
进程2 修改数组第一个元素后-----> 101/ x& O4 U* |! g1 |9 G' i
' v, p2 W! r! p6 ?1 G+ S
- 1 J- ~' C3 ^4 l9 H3 E' t
" n: \2 E D2 x7 |" }进程4 修改数组第一个元素后-----> 201
) `8 M6 d) m" V8 [; s; Y
5 v2 `( p- H& e' P- `* S - 9 A+ _% D% m- i/ p% B2 Q
) c( e6 T( Q$ ]' x4 b1 j, d进程5 修改数组第一个元素后-----> 301
% Q& {- C. V j$ Z9 L: o
. F1 u4 J+ y% e1 ~) }2 T - + q8 O* Y0 s* \! p/ K. N% V! _
% w4 D, Y/ @, K$ Y$ T
进程3 修改数组第一个元素后-----> 401
" F2 Y8 \. c/ T6 b) D, X% X
: F' x6 {4 {" N, N X
6 b4 U/ e# l! I6 ^. P: k7 t7 }1 \6 ?( f b7 X" |# F; \
进程1 修改数组第一个元素后-----> 501 K# i! D( `# w9 g: \! K
) T# @2 F$ U, G! x( B9 p/ z$ A# h
- & l @ z& E! s/ l, v' N8 z& t; y3 K$ i
! h. f9 ]+ o) _4 `1 \) C
进程6 修改数组第一个元素后-----> 6013 N9 p- z& O+ ~$ B* i! ~
" p7 P+ x0 q6 f7 m - : z i* W5 V* ~4 T. j& w
9 O; _' } B" ]* \4 c) e, q
进程9 修改数组第一个元素后-----> 701
* b7 u6 n& i8 _3 c
$ i1 @5 {7 m, u- F# E4 @ - 4 x3 W# m- [) a
6 ^7 j* H! x2 _. z) z! C( @! M7 E进程8 修改数组第一个元素后-----> 801! O" B" V$ F$ o
* H! @' i) q. y* z0 G3 Q - 8 @9 n$ d" P# L
9 |6 e% d; G" v; H
进程0 修改数组第一个元素后-----> 901
: R" c) L: p8 j+ Q: R# Y
' }+ K# w$ V2 C8 f8 c9 n- T - 7 G; e& U; Q" j
5 K8 A) K9 n2 N8 ?- Q
进程7 修改数组第一个元素后-----> 1001) O* O+ o; ^& D, F& H, v
, v. c( ~0 I% O# ~6 S1 q# z
$ B: b1 p5 \$ F" J4 e
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。 - , U8 Y9 g% k- i& ?9 m# I4 B+ }8 ?9 K$ }
5 r, C5 n8 g7 o( n5 u! \* M
from multiprocessing import Process
% c7 l. q, I7 p2 P3 M8 B
[( n/ R7 e& P3 O ^ - % f" {# F! p; f4 Q
) p/ U7 L! O9 A9 }
from multiprocessing import Manager
) f/ E7 K& T" W: q& y( M+ B# q! ^. h: A+ p' X' Y0 D
- ; i2 u$ T$ x! m' \/ ^
$ Z) j; Y: N+ _
% }9 A( m4 h5 }2 k5 w# D/ K
5 w- \0 r9 P1 y - 3 j s$ i. d& X1 ?% a4 H, I0 Q! V8 e
5 v: D% z9 R. _( N" x1 I! a) }
def func(i, dic):
) O& j) o3 l* Z$ z. b6 j D. L2 ?+ ]7 R
4 B b# z5 @- c' V) ]/ y
# ~# X2 N; h7 O' p" |) L4 v dic["num"] = 100+i' Q$ I5 |7 W% Q* X
( I8 L! I1 G4 G0 I4 w& l
t- u( Y' E& v8 i. m. b7 C z4 j/ a1 K; F( b v* Q
print(dic.items()) d2 h# v! G* u$ D6 B3 t
, [9 O5 X5 u& C6 g" B& u2 S- a4 O# j! @8 b0 }
) t6 x& u( i; t& _; o3 K' G+ U; j* x) O* ]2 c+ m1 S+ G
% h6 T8 b' j7 C8 I( A
- 7 I! L+ A5 ?$ }) p7 z
5 u4 M% F5 K# n8 gif __name__ == '__main__':6 N7 ?: @& u7 C/ f3 A {
6 L7 Y0 `: b3 e& w2 t
! e! J t( H3 }( Z8 Q! E6 @1 E4 Y, ^
dic = Manager().dict()' q: d9 q+ H* S e% O9 T2 a% s
9 \' X' P+ @2 b7 q+ N
- b B" [) y; d; z* H
1 A. J3 M6 p4 h c- ?' `
for i in range(10):
( a3 T( z5 N2 u. V$ A: d8 |
0 L+ f1 H) ~1 _0 F* y - - c) b# L( O' y( G# w) h- `
. ]; @2 ~7 }( M- u3 m: x8 t, ^
p = Process(target=func, args=(i, dic))) ]( ~& K" `6 M+ x4 E0 e$ V: f
; B& l" ^. x6 U$ }" h
- * Z# k, o! m2 P) n! G- I: G
9 b0 D4 `, u4 @0 @% q p.start()
( s7 \' |1 n% H* n4 @: k) `( o, ?) B7 i9 `3 G$ O
- ( i0 @$ V7 A% J
: ~1 J b) _& _& E" b: c; V% T p.join()+ W5 [% j) @0 v! l5 @7 K; n8 P2 L+ f
* t2 r: r' P; X( Z9 [) |; V
* {: Q" O3 O4 q- U- B! {6 w
运行结果: - # m$ d& W& s! e, K: n+ U! Q
' {! H3 G _9 H[('num', 100)]6 f* R: a7 f/ A1 S1 R3 l
# Y# k+ Z7 X6 j# w' G9 B
- 2 d( N5 k X( _6 U/ W
& A9 a1 d5 W z( U8 P; u[('num', 101)]
! Q0 Z' f$ Y' ~6 C8 J$ b) B" ^- |/ V# O, y3 m
) F. g( Z, K- T
, ?- E2 g9 E2 c) C[('num', 102)]
/ Q6 A- |, a# o% }% F9 O
3 c; j3 Q& A; R5 d- 1 ]. H2 V# g7 b8 o
% H x3 t) R* [6 E% V. z+ S
[('num', 103)]! L9 v3 N' g* T2 Y, N
: P! Y6 X6 x9 [1 @/ O
, i3 I ?7 Y: g4 l3 B6 w) y$ X6 K2 j3 I
[('num', 104)]
3 A: H6 u/ U' J% t: _4 [: a* B" n% E, ]4 V
+ G6 v( A' [, a% @. R( S5 ~7 L0 _; n! s; u0 M4 \) O
[('num', 105)]: p" c8 m$ T' U: t7 ^
) B6 M# i$ M9 d0 J- J, }% b6 ~- 9 `! u, D1 A6 J1 }
, `; C" P5 o. r; j. h- i[('num', 106)]
5 e) _) o- ] L# Q
* o6 U" ^3 q& `" m+ b2 a1 n - ' \: i5 {5 K6 B: c
) \, U6 ?! G/ u4 ?) `[('num', 107)]; s5 l0 j- {- G3 V" d0 \
3 d; h$ L1 V; m+ C; H; e
6 d. \. U: O. n( q- f+ J4 X" g H4 A
[('num', 108)]
# r# q0 d5 B! _% N# X' `' ~
9 f& \1 P% T& L9 R- , z6 M2 h+ i x) C. B7 e$ E
" F3 a. j3 e( ?2 {0 o* i+ l
[('num', 109)]+ B* q. V: m* N1 J
2 ?( l K! Z8 x# Y( ?% T
2 q+ Y4 x, R0 \3 o
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示: - 0 O6 K5 M% \/ C: I6 h
6 @0 w# t% b+ h# q0 j( a$ @9 |8 Bimport multiprocessing0 p" @/ H, n7 I+ |2 t+ f$ W6 Q9 Y
+ S! t- L% x. Y8 z: x - : U. j; Y- ?% y+ ]! u( `' ?
% \7 V$ e0 F5 @1 w% l5 Ifrom multiprocessing import Process
! ?3 [1 |) M+ j i
- L) d# G$ k6 } [. V
6 T; }. c5 h! \6 R, b" f0 [+ [6 n& o) |3 \- Y# u. r A
from multiprocessing import queues
% N x3 X# V" j; `: d; o# x: a! g0 _' u5 t* ^3 |+ \8 `
?% ~! X# {2 N! O0 e+ g
6 L0 B% }0 o9 `! o3 b; b- j
' J# a9 { D6 b) Q8 C3 p3 V
5 I& M6 G. u1 N( ?5 f1 d+ H
1 s7 r H; j2 q' C0 N: e( {3 l; \. w4 a+ w& L
def func(i, q):! z8 t5 t' i8 I$ r
5 r, H+ U( z; Z! Y! R9 G
- ; D5 }) C3 Q5 V# ^2 [ w( O
: ?; r# Y6 ^3 o ret = q.get()
8 \: I" c( I8 U0 C4 D- s4 h* _( l) A+ P
- \ `4 k- a! }: a
, Y# {4 H0 W) ]* e( z" m
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
* S* o. L& q q. j3 d/ c& F! A$ C1 D V4 I+ @/ T4 y1 |
- 6 W: l! z6 v6 P$ ?* {' d. S) v3 J
$ ~# }9 k( s2 {, \% Z+ S4 j
q.put(i). `0 e# F- h2 ~" |* Z
! ~" X* R3 y& L. r
, X/ U! K: X. E9 e t6 L$ ~& y+ [) u+ \( {) Q- F7 Z
6 g( ^9 V. ~; l% N* v4 l
" |- z3 U( X; }, k0 ]- ( C9 Q& x) [8 i) Q' M: W& T
/ u/ o: k3 l' M1 sif __name__ == "__main__":" {& f7 L( W% f! d: A# ^
+ C/ s2 t8 ^* [; T& @
6 s9 t" ~0 R& k
9 o4 h V' }! L+ n& G# e lis = queues.Queue(20, ctx=multiprocessing)& x, ?# c P: x# X
/ ^1 G2 i4 h% K# w+ n
: o8 c3 [ M) {9 }) A- e4 y/ H; c5 r
6 f* u4 Q: H5 R r0 u+ d lis.put(0), Q' c; E; B0 I! o
u: \5 P; W9 m# X o, M
- # ]. z- I! N: M
$ ]% u1 C" }# f' O @2 I6 F
for i in range(10):, n4 E) x1 l+ Y
8 }- f0 x/ q" o+ q) L. @8 y" u
6 A: H5 |7 X* H4 F- m/ x6 i( }) q* \, g1 p n6 `: f+ O8 c
p = Process(target=func, args=(i, lis,))+ Y' n+ _/ \& K
3 D% k/ H: t7 I- s
- ! i' B) [8 W! B) a
& u% D: f5 x" O& e: P+ Z. ~, P
p.start()- T+ |' }2 m/ P" o
! l" {+ \' p; T$ S5 ?+ W& A4 ^6 K7 a9 r [" v6 C
运行结果: - 9 e$ n4 M( S- {/ K6 @% m: }, B
0 A1 ~6 X: X7 C- o6 l* T! ~7 r9 D进程1从队列里获取了一个0,然后又向队列里放入了一个1# V: o- [) K( F) i! u( q
* y' d c9 {! `
' B0 D3 E5 f. ^! Y* ?) l' N: f1 ~: N& w( T& ?6 O# P8 x! Y
进程4从队列里获取了一个1,然后又向队列里放入了一个4* u, B% D3 A, y: V- X5 J
: b9 L1 H# G7 {% g6 y2 R2 R. T
( h) i- c, n8 W) R: n
. O/ X) ~0 s m% r$ r进程2从队列里获取了一个4,然后又向队列里放入了一个2; b: t+ z. }$ o. b% C* H" W1 D$ P
, a2 C) m P( \& ^; }
- F9 _4 H6 }. I7 x: U. i& f% K2 l; n3 s4 P+ S% V
进程6从队列里获取了一个2,然后又向队列里放入了一个6
/ B" ]! d3 t+ j& f3 N$ h9 w" e8 C' c3 F' i4 x# A" G, e
: ~) n$ d8 S; F! a
( \. n4 `5 U& N' E0 Y! J进程0从队列里获取了一个6,然后又向队列里放入了一个0- l3 p# R% V; y' j
& `+ P. g8 o) H% g8 Z! v$ D
* I1 H& B2 B! |9 ]! k+ T: ~- h8 X- ]
进程5从队列里获取了一个0,然后又向队列里放入了一个54 P, \; ~- U- _8 W1 v- O" t+ K
% y& x6 ~0 ]9 u, N1 {- g- A1 S
0 E( a% O3 H2 L7 i K3 L$ p) _5 I' u2 Y+ {. E( x7 Q$ ~* M
进程9从队列里获取了一个5,然后又向队列里放入了一个9* L& z8 a4 w( D- Y
7 [" O8 m" P6 A8 c7 B& w
6 t& T; p u( |; R j) D% k4 e ?$ c @
进程7从队列里获取了一个9,然后又向队列里放入了一个7
& P1 }) M/ {) I( b B+ L
6 x* S! D6 T% m2 f2 T) N- ; }4 r% i" B: G1 q) t# u' l
0 ?+ N1 j' H9 O& [$ g* i; `进程3从队列里获取了一个7,然后又向队列里放入了一个3; n% I0 D) L; y, v/ T7 \5 w9 `
. o0 R9 c: h+ b |1 U
* q% q* H+ c+ S( n
5 G3 t5 v% y P进程8从队列里获取了一个3,然后又向队列里放入了一个8
9 Z |, T3 u+ G& C0 R- d- L& Y+ |" j- T6 z
) }+ J* |- A* `) c0 b# `, j0 b- A- k
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
6 s1 A& P' y8 t. V7 t1 Q
6 u2 n$ Z: O0 S' ffrom multiprocessing import Process
| O$ r; O8 N+ _& L7 `# _: n/ W( B! I3 d3 T. ?6 R+ r
- ! c9 J5 G+ W, Z( w' x' H
8 Z: w: c' t3 P; dfrom multiprocessing import Array# C; {8 Y$ D; }/ d! G( j
8 T& C9 G7 I, v9 L$ Y
- 6 D. w+ d. q1 f {7 I' m
" x ]: E0 ]$ A0 ]! R. z
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
) l* ]9 a: A3 ^# u9 f- @
" O1 i! s0 r3 c N, {8 k b
9 C4 n' v: \/ A- x6 h4 W8 Z
3 W0 t, B! |. W9 Z% J8 {7 I% n# ?import time
* S0 U, `8 V% r$ h1 H" L
/ a) e; S9 w+ k7 k
" K$ v+ a2 P5 D) Y: W! t) w9 a1 I9 L L! V$ E7 [3 o
( U3 w3 c8 U3 w" O
4 Z" Y5 T6 |, W ]4 d( [, G
3 b3 P$ L! s& y" x' q
0 w. \. X/ b1 y1 P8 q3 ]def func(i,lis,lc):' M5 j, C9 i0 n) |$ B& m
. R) @/ U1 C# O! g6 X; r
6 }& }- B0 V4 E' {- V" j1 [& a+ ?) T4 f7 \' @2 C, s0 {
lc.acquire()
4 J0 R( F3 U* m% q% s- k+ q" V/ E! }4 g; y% U+ L: I
- S, B! W# E5 {. l3 g8 \/ M
+ ^5 `+ x$ h; O3 |, N lis[0] = lis[0] - 1
- t: e( o* {" K0 ]' e- p8 [
3 K2 n& [, S, {& U- e' c
: t$ y# |+ d1 p% [& r$ @" U
) K' R, w7 O2 }& O2 r time.sleep(1)
8 e+ x8 ^# T6 A4 y6 Q: t% m) t) F+ y1 y/ h8 o+ X- j7 L4 K3 b
- 5 b% }, N- q$ V
4 T% I# m# v* U# w8 K# t. e. T print('say hi', lis[0])+ o3 |* z6 P; L" c# K8 t3 U
* _8 g5 f: E4 f: ~
: O* W J+ `; c) e- ?" ` D, p) i4 u+ c/ }. ~. L% x8 `* o
lc.release()
: Q4 R! O ]# p5 p& T) O- D- h
# k6 p. k8 m0 [) T
: |2 }3 ?+ Z# m8 F( s
X) T, y1 b5 _6 Y$ h+ q% J5 d0 o; v/ }# J2 p
% G' [7 X; a3 K& [& h& A: n- 7 q/ D% a, ]+ o1 ]% C
. ~( K! w7 f8 i" P& W/ ^% |
if __name__ == "__main__":- c/ q, W1 Q! |& }5 ~* ~
9 A# d1 k- I; z: c/ r% ^" Y* `
" C. { Z5 i. @8 K. s+ O
e# `/ q5 ~* y* m5 J, t array = Array('i', 1): N7 J7 }5 O2 H8 I4 W8 U6 L0 t
# a6 |& O% z$ m9 y& X
- , e& ]* k9 o, Z3 F
1 }. O* g5 y0 _7 M2 c3 ^" i% l
array[0] = 10# e, b1 q1 W7 L
2 t, O: P _- _* a - ( `2 O6 N% y! C& o1 u7 |
3 l( H: r* @4 G
lock = RLock()
3 L2 n6 a) t+ e# n. I' j% a' J- j# a# h
- 3 [% k; o2 s+ E; L6 b
+ b) H, D: v8 j! @) i( p
for i in range(10):
2 J* ? ?& r4 j* K( o2 d6 y1 z0 s7 {& _7 r
- 8 c# M6 }# }, S
: n* }* v- ]7 V0 ?
p = Process(target=func, args=(i, array, lock))5 x2 z% e4 e4 q5 L Y6 [6 L
/ x' R5 H3 S+ }, T
- + ]6 |4 o3 S) s$ R0 ~
0 b5 S. I; s$ s! |
p.start()+ K) m* B' z; k7 ]
" J& O0 X) h5 f4 `) N
$ s. ^; k! D7 B5 M& g7 m" U
运行结果: - " t6 @4 [& c8 j" X; [6 s3 p2 l
; [/ i C8 O! l* nsay hi 9, L0 @/ K2 B5 J6 E0 `% p6 c" R
7 s* E# U y( @$ v7 T - & n5 z9 X; O% G1 l7 w; a
( E$ V, G5 F* J4 b3 Y# o# dsay hi 8, A# m5 S0 g' i* S' Y, k, v# J
" b( e& Q& A8 B/ m+ ?* e - ' @2 k' [. r1 l9 A: u' o: _, M
/ R% F) E( R) K4 ~0 U9 Nsay hi 75 y e$ I8 l, z: z+ B1 @
2 K! n' K2 M$ K. O2 Z- d' e$ ?! t! z
+ G% I5 T; i' I: z9 C* ?
, ]5 p$ L, t# j7 \+ J) @say hi 6
4 O1 p( H4 L8 \( O' L: |
$ p/ l: L, H8 j0 c2 y( f- / A( s4 ]/ v3 n8 C: X7 @4 q
. r) X/ ~% s% `/ y4 j: \7 \; M: zsay hi 55 z7 d# }$ D% R) o$ `5 L0 \
; {4 r$ I; ^; e/ W6 r& M7 Z, @
0 v+ I- F7 g; ]% R/ M1 Y$ i4 A2 W q$ X7 u! a! m, P
say hi 4
+ y d9 r* E) Y' W! E% M) y4 r
2 a7 q% o* `& }1 E8 H
- {+ g5 R+ b) i& b; z) h# o% F3 V/ ?: D
say hi 39 v3 U3 W" B/ s
! G3 L% i$ e7 D# l% `7 P: H- |
" I+ d) ?2 ~) q0 S
# P" Y8 [; E9 I+ {7 G1 }say hi 2
) b; I: u% `) q$ z/ S( W
+ `4 o) g! A; \; P3 c- ; N( K* R* p/ ?
9 @: E& z: ^/ t* l2 M' ?/ X& b0 Qsay hi 1
$ Q, O9 l4 {* l$ ^" M! m" N" C3 X$ I2 R! \
- 3 p0 N* d' V( O* y! g$ x- J3 ?+ H
" X% w' ] W- p3 L
say hi 0
. p3 A1 b3 s- o4 A \, D( ?- m
. @% |6 `- L$ I3 ]& V' c7 ^, M* z! k. j0 q' V' |) _5 x* O
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法: - 5 d3 |% X6 m% i
! t+ P& @6 m1 U; v3 y! p2 d/ u
from multiprocessing import Pool/ b7 C+ U. ^8 T9 [$ {
# F/ _1 H4 B& R$ b1 i9 v( @& K& K - ) W+ T) x6 r v. R* X
/ L( d* ~0 ~8 ]4 B" a
import time: G9 M2 v: k! i7 c+ x5 \
; _; ]0 S7 g# u# ^; F- ^6 i - 6 e5 C1 _4 N" F! L6 D& m
6 C' b) ^/ Z# l: S% R5 x
1 D# _) V. Q5 P% C; ?* o! s5 ]3 \
5 N$ `( u0 {. X3 m1 ~( G2 m
, r2 P7 `+ Y9 m5 i" U1 M _ y: W$ M5 l! O( Z6 A! J9 u/ t
def func(args):
- J3 `( A4 W# n
/ M. @/ N% B6 ]9 G1 R- 2 ]* \ n* c' ?% }
4 t1 t: H" E* s$ \* Z0 D time.sleep(1)
: p* w( x- f5 w# w p, ]3 e
7 y0 T8 V# q' |! p; C4 ?: f! G - % H; l& `2 ]6 J
# [5 h: i4 c W: S: S: z, F
print("正在执行进程 ", args)
9 S/ w$ p$ D0 ?
" D" @) q3 F% ]! A, s7 g% R1 i
{. J2 C- f3 L" o
0 O5 T0 n/ O9 d1 \" \- W5 p; `& A4 {
$ e3 S7 Y* x# R4 ~/ \ c" w+ A' `- # G+ k& Y7 Z4 {7 m* [$ D
/ }2 y6 `: n# N! C+ }( vif __name__ == '__main__':
: M& J2 ]7 {7 Q% ?
! H5 E5 }" q+ R6 I2 R( x
8 a! L9 j& A! e# x- p: W4 X! O, P5 F+ x9 g* z* I [/ ~6 y9 t7 n
, B( S& g. u+ m8 Z# A" q9 \
# ^8 E& i; z& L2 g. o- g4 _* b# u- # S3 N2 R6 W: I
G% j2 e! x5 g& p v p = Pool(5) # 创建一个包含5个进程的进程池
9 O! P6 A/ K' i ?2 ~5 p4 r, d1 e! n9 C* O5 V1 x( O+ b
/ B( [, `7 v8 [- b; E6 L. ?" |- m0 i4 f/ T0 s1 k" J
$ h/ A) E) ?8 ]( `9 \$ _' F8 M
( \7 C. q' |3 E
: C+ q( Z' [9 G( f& q& W: Y) m3 g5 C4 `* v2 S( l- I7 Y; ?: ?
for i in range(30):( \ O# y7 B, [8 |
6 R6 J% V% _7 c4 G+ L
% K9 \1 Q9 B. R% n- S3 U4 V4 J8 V. Y0 x- ~' }6 x
p.apply_async(func=func, args=(i,))
- E7 o2 @& ?( b' J0 ]4 X& l" E4 r) ^6 k
6 g% b8 T8 m4 q3 k' u8 n) a- l4 [) V
9 D! h( B# T" }0 G4 N6 A- y
( |+ ]) q: b* ? B( Q( P
( e$ P8 x- } N: \
/ T5 J; I0 ~3 m' y
# c% @" u& O: B4 U% @% z* B- S5 T( P' V" d p.close() # 等子进程执行完毕后关闭进程池
1 A3 @& }+ l; b- x5 x3 g/ Y1 W/ A
! ^5 |, b c' L: Y7 P/ D. `" e- / g% r. h& o [1 a( [& l# K& M
3 _6 a ?) D9 D5 w0 s" |
# time.sleep(2)) l$ Q( W* U* v, I- Z+ s
3 z+ ], X, c) k2 T8 _- ? - . b- U* ~4 G7 F& T5 k* F( O; I$ u
+ D, p4 ]5 s/ W( c$ Q
# p.terminate() # 立刻关闭进程池) u% S- O+ ^( Q% D& D$ F* z4 ]( \! G7 \$ ^
' W Q6 `4 P9 n' l" f' A/ U3 Y
: K0 T& h& Y) h. A1 E- Q) F' p( P/ {/ N& v# C# d
p.join()1 S! ?9 [/ R8 D8 k
+ f( b4 \5 C8 L* r- }
" D9 P& A( Z4 v0 ]% ^) g: i
* X% q0 M/ F; h4 r请继续关注我 5 a4 N M% U! V* J, ?
( H9 T7 y$ o0 x. U
|