$ }# V; r$ `2 O% S( |
爬虫(七十)多进程multiprocess(六十一)# c- u& V: z, Q, N$ O
Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。 - - ^7 F' V; L s- q. H( t
) X0 s% ~$ f& z& F+ Vimport os
3 q1 }! n) A' a ]8 f5 W4 N+ L* T% {6 ~- i% Q
1 T% `, D8 V9 J+ z2 h) J" G/ ~$ j7 A) `, X0 H$ E: Z" c6 |
import multiprocessing' u e" S9 T3 v7 i3 T7 W Z: }
6 z2 q4 K' L. j2 c4 B p
- / c; x& A8 e5 o1 x* l0 K: @
) p+ J5 z8 Q0 g+ B
4 @1 h H# B" ^9 u) r3 e
* S& Y, O' W8 X& {6 \6 D5 g
# J+ j* M# n, x0 _3 E5 z, ^. ~, g4 C; d. B4 Y! k3 o1 Y6 L: x8 V
def foo(i):/ k* n+ o/ Q' L2 j
8 r; c# O7 ?/ L
4 C# E, q7 U' E- E0 o3 [8 y
! e% {! ^# X# L9 Y: h # 同样的参数传递方法- O5 C5 A+ Z9 k! k
# U. \: e) N- R( Q( Q6 u- 8 m$ i3 K* I. b6 u6 Z! m
; d9 Z9 a% v Z: x3 y' S
print("这里是 ", multiprocessing.current_process().name): P5 I0 z$ x5 l3 R7 L$ k% r, J
% _ s0 _' L# T4 p! {7 B - 0 a: ?7 |6 H5 O/ I
' Z3 N) x8 `( _$ t6 y1 |6 Y; x print('模块名称:', __name__)" F2 Q) g; N `/ [: F, G
& t7 Q6 W! H6 g, O5 \' |
- 7 Z; P$ J+ t% S2 \+ n0 z+ G
; h2 w2 n" R0 \6 S, S2 s+ G3 c
print('父进程 id:', os.getppid()) # 获取父进程id. [6 H$ T) m5 Y; R' F$ D
# a$ P( e, K) P' K" ~ - 3 }) [5 X7 Y \+ z- d7 M
% y5 c# w- n! o" i
print('当前子进程 id:', os.getpid()) # 获取自己的进程id
9 O; [1 L5 G9 e( w' O5 z6 X9 s0 X3 |3 J+ d
5 G+ L% E" S+ U
2 ~5 Q7 Y5 ]6 @- | \; f3 Z0 q
5 Y1 d4 g. u; q# W- B print('------------------------'), a* x, L. Y* |
: G, l& E* _; B# R
) p1 }: v) {, w' G$ c" r* B8 p# z" k1 ~9 G5 X4 ?: ~; s
* _0 M' W" x1 ^. V$ I, T
$ j. {& ]/ S) H1 ~* w
- 6 o3 g8 j T$ u* T( e/ g
* x+ d0 P1 z" ?: n
if __name__ == '__main__':
0 L: u& b6 e* U( S2 |0 f8 q
8 K7 ~9 @6 K# g1 K. Q0 [- }. B - 6 u& s% c' F, M1 P' l2 X
N; h& ` x. N1 F- L. g; |7 Q; C$ t6 K
% q& j B* H- G
- & R# ~1 H; Z$ D- U* e0 ~# f
9 W* {+ }! D& i for i in range(5): X o% [ @: C9 c
0 E" |, N6 A2 d0 k. \: q* Q* o
3 o3 ^, x8 c' w" C
7 L2 G/ ]# _/ c+ O+ v p = multiprocessing.Process(target=foo, args=(i,))# \2 e: f* a" ~/ o, B. ]) k
3 ?' c3 J, a! P- g
- - v8 H2 ?! H2 _5 O4 c
# V" J% A4 o6 X" F: X3 D p.start()
6 Z0 d2 V( g+ y# y/ y+ R
0 ~2 S4 m% N# ^" Q5 S2 \5 A* G) o* U
运行结果:
; z3 J* r& m7 E3 i$ C/ [ u$ j& Q
# W- e9 h' M$ x这里是 Process-2+ u( z/ x0 v; v
+ O+ j! J& o$ @
- " D/ u, ?: ~7 N2 e2 w# V1 a- {
7 B# S |0 ~9 ~( j模块名称: __mp_main__3 |0 q- U7 I9 h) N6 b4 }* ?' H3 y
6 V/ n* s6 e6 a5 [
( _/ y' J a9 U8 ^6 T4 T/ E' p- {' u; o# {' n, c
父进程 id: 880: a4 E( I% Q& r2 ?, W6 u
e( c$ W4 D9 F: `1 {. g- 9 O3 M- D/ g1 n T3 ^
* l' ], S9 ?* y
当前子进程 id: 52608 N0 L" \& W7 c
6 Q# L: }) L' C) K- H8 ~4 X
- 7 e% W5 w$ b) u }
& }% e4 H( N; J8 J; R" |) Y--------------- }. ~; `6 \" u3 u
( y" f2 c/ m# u' V4 a
- : C" f7 Z+ K9 U
3 u: z9 |8 n1 u4 J. }: d这里是 Process-33 m V; ]0 l" V d& N+ b
' ]8 ]+ I6 G) t0 J* X! ^; k/ f, s - ' x q) o) Q/ p$ ^$ R) n' Y& y
! b$ P; }2 D& }/ D4 b% p2 t模块名称: __mp_main__
8 ~! l* k8 r' n7 Z% C% R
7 }+ c! o" [2 r( x
, S f9 @& [' Y0 o4 n4 m( b" r8 {# [8 N$ x& G$ b
父进程 id: 880- F0 z% `* B0 D
4 l3 K) }! W2 E) u
- ) D5 g4 K2 H& p I" {- a6 P
6 [: i2 Z+ _. O: Y& D* s当前子进程 id: 4912
. S1 [% M. x1 _6 `% F
5 ]- a9 P6 t8 ?" o4 @) s
0 _7 x! |- @2 I* L8 u/ e8 X1 V& H, p# ?
' W" P+ F3 S4 ^- m' ]--------------
# _2 `8 o T V N; P& a& y4 C2 l% E& i3 y$ O7 u: z% H0 K
- 1 V: r; t. |" `% K8 r/ e
5 _+ ?$ s" X) e. z. q6 ^) h" q& Y0 \
这里是 Process-4: ?# \5 s2 o8 ?9 X
- G7 V7 _( H9 V( M6 E
- + g3 {- m( o* Q! D# p, B
T0 C9 S3 b+ e D模块名称: __mp_main__ K% `7 W- r; P2 l7 o/ T
6 R" s g3 K& M* ] - & x0 B7 u1 H" |2 r
2 i7 }6 @& s, L6 h% ~) {5 h
父进程 id: 880
' G, q( f4 n Y- N M( l6 `) }# x# [- h
- , p' v7 t+ @( T# |& a( N8 ^
4 f$ ` Z' B/ I) F6 `- p当前子进程 id: 51767 H: S C$ m8 ]% ~1 {
% L: z) M8 D7 P' l; P
( y3 b3 W! z1 J7 s9 _( e) m
+ n, K( _' ]3 n: Y: N0 O/ m& H--------------* Z+ |& o" Q! w/ [9 o
* k1 _6 |9 h7 n% Z9 v
- r3 k( X. V' r" ~: w, x% D4 o, \9 \/ o- }' B
这里是 Process-1
/ {1 s& {+ N/ a7 t$ R u7 b6 S- q, e& W- u# V! Q
$ ~# Z9 M( s! |/ x* G& k) P9 Z. p8 c' m9 ?- D9 G4 l9 _
模块名称: __mp_main__
( s0 {4 v. U* x; N& j) U7 p. [- b4 G \- l" }8 ~8 D
1 r6 _& S8 ?, d( m+ v
8 x" C+ Y( R7 E/ n0 @& q" s) j父进程 id: 8809 @1 ]& ^! y! x8 @
1 U' d6 f$ m% a8 n, `
% ]+ x u6 b5 b) U7 z
, @- Z& O& |1 k+ A4 a当前子进程 id: 5380
5 i7 {- n9 H" t( F3 l/ v5 X, @
- d4 N/ h8 Z+ h7 ?1 M
8 v6 t# I- P. c, c6 T# v
0 a2 x- m5 r) U6 e( M' w--------------
" j( d) K; _3 Y; Q# r1 j
) H4 I" c4 }" M* P+ k. }4 Q; h- , t/ b& P" d0 n# ^2 ~
' h7 t. S2 |1 W7 g+ `这里是 Process-5
; o5 z1 X) x$ L9 j, b1 c* ]1 | i0 i# H2 I/ _9 C3 q
# Y* `- k4 }* }# e9 B6 f/ _- u; K2 V- g3 I2 N; d
模块名称: __mp_main__
5 z: f" }: [" D C% G' k( f0 A
! e. v7 A$ D4 Y2 u6 Q: ~
) X2 `0 Y* U4 I }8 T+ R2 ~' r
8 B! X- i8 l% C/ e- \2 F+ P, l: F父进程 id: 880" V, `" h8 D! ]" b8 l
H1 f$ s$ k2 r' k
X0 L% G1 t T, ]) T& E8 R' T' a) I' c0 r$ ]9 u
当前子进程 id: 3520
1 b& s8 M: T% D8 t9 n! H0 b3 O R) O0 E
7 C% `8 m( M7 p0 R% y. W2 ~
0 k9 d7 y' } N. N3 r" F6 K8 O--------------5 f( z& P/ s X, y) n- [
& f& }* B# f) W
% S( X% X' [+ X% j4 a+ ~% I; b 1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享:
* O% ~. Z3 f) ~: A: \' k# g/ H& z/ o/ |
from multiprocessing import Process. [! p& B0 \ o; k0 X3 r" n+ N% y
) \9 d. N$ B# q
- $ M9 i) d9 w* Z+ m
* {0 {- i }/ C$ o
' w2 y8 u: i8 i% @
- j6 ^# p, y/ M$ x8 Q+ R$ B. K- k
9 h" I! [* h) {5 M& c! }2 Y) O& ^+ S f" ~# `0 c3 _
lis = []6 Q/ ]6 ~, L. g
/ S5 h6 }9 j3 u" [
- $ Q) i6 V6 c/ m/ [" `# s* i
- _1 _7 B( f% `. m
N3 {. r) }2 P. m6 w2 s8 E* I: v
4 l, O3 \# J5 e9 q - v. o( G) W3 R# F3 W5 V
" S8 ?9 T$ t9 G i; T. Z3 |
def foo(i):
9 o, V6 w- {, C- Q: @
( f' X% \! A r% I% B! s; d/ A' | - 0 v% U( p" _5 {; ^& j/ S# {0 D( a6 y' D
. q: I' c: H. S4 S# g( w' }6 q
lis.append(i)1 C4 p+ \# N2 W
+ @4 U/ I% v& R2 p3 i4 t, O" a
$ j, L2 L4 S7 Q, l% v5 h9 g9 h- Z1 S$ M; G4 N3 `8 v1 i
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))9 U( H1 ?/ y2 h9 v Q
* ~* ~- d8 }0 b2 c9 w- 1 Y8 ] c4 \7 O/ [" g' ~7 d% j
* G, k" h, t- _7 V9 Z# g
3 b+ G' u, ]+ E( W" P8 b' K7 q/ L- _. \( I& K5 J5 `$ n
- 2 U" r- h. Y/ R/ L
, d! @" P+ X# m- W0 i
if __name__ == '__main__':$ i9 {. W4 P3 k4 _
( @. g% i) [4 f5 q3 Q' R e
% ^8 ^7 ?- B$ Y0 k. u; J0 r( S
& y. t5 {" I* j4 { c for i in range(5):" U7 Q. [% Y+ n' k. V" q6 [
" |3 ~" n$ M. q6 H+ q I* _
+ }" k0 s3 [% E4 q( D" `: [6 ?! [0 t& W
p = Process(target=foo, args=(i,))
! ~4 R! M9 E# [+ k3 K8 P- x* I
# _' f. w+ n9 M' i8 m
8 I/ m6 t( }. X" |! i5 H4 c, _/ g- V6 \0 Z+ U# ~# l& R
p.start()! t. p7 L# n( v/ L) v
4 c4 w& i6 p8 X; X1 _2 i, s' I6 H: @- ( @; O/ K; X D% {1 o+ t
1 D" x1 _5 y8 Y% m9 [! y
print("The end of list_1:", lis)
! S, Y4 b. {" S! b8 X
" l6 f$ Y7 G' I" v* N, w; F& I7 P( o2 o5 W
运行结果: - 8 n# h6 ?# @- J/ F
/ J; K0 d4 t5 J, M9 D" h) b
The end of list_1: []5 X8 l2 o+ K$ e! k- F4 Y( r4 ]5 Y
/ S& @. H) ~) W1 J- y% |/ i
8 ~/ u. J4 V) J- B% i9 E0 o7 C } ?) |9 X4 ?8 k# h$ d5 w5 Z5 ~& @
This is Process 2 and lis is [2] and lis.address is 40356744
) R ]" V) ~. y% Q* U3 g# q
8 |0 ?0 r8 f. ?- ^: e, R- 2 G- ?& @: w/ D7 o
' w' ~# D7 _; b# y# I( V- J' G
This is Process 1 and lis is [1] and lis.address is 40291208( L* M- }* m6 `& }
8 M Z& c" d+ Q# m - + m1 a A6 R2 o) t
2 Y6 ], f* P+ uThis is Process 0 and lis is [0] and lis.address is 40291208: j: M' j4 G5 M9 d/ z- g9 ^" z
# u, o/ q; u3 o/ G. F3 _0 M$ K
- * ?+ Q* P& q; ]! v S7 K' k2 ~ O
: v8 P) x7 |5 h% a/ I* d
This is Process 3 and lis is [3] and lis.address is 40225672' Z5 ]- X8 J, H: }" {) g' Z
9 q4 w& U4 ]/ c& K4 v1 J
7 T5 r% g9 d8 I; y: a3 I
7 f1 c: X( |& y& EThis is Process 4 and lis is [4] and lis.address is 40291208+ |, D3 c+ h! f+ R* S! Q! ~8 m/ u7 a
7 t7 m! ^& V' {& L
8 ]" n+ A' a, b" C. y" u/ v3 j
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系: - 2 e' l% V2 j4 h6 ^- i" T& P( L
n* h! [6 z1 u1 L+ n* _'c': ctypes.c_char, 'u': ctypes.c_wchar,
% a( j# u& D/ h. `8 f: h+ q$ ` Y' j- G
- / {# C* @" |" `$ n7 E
$ @8 [% {* H, |! r/ X9 c'b': ctypes.c_byte, 'B': ctypes.c_ubyte,8 h& N& o: W3 Y( y8 w' @. F
/ S: V1 x/ F" ]+ g5 @7 `/ R( s( y* M5 x
- 2 M2 o: }0 V. k* z, G7 v+ T
9 B0 r6 p: d C4 Q* `'h': ctypes.c_short, 'H': ctypes.c_ushort,5 ^- E4 L9 e3 C. s
0 ?2 C" O" Z2 d+ Y1 W( G- j
- - ]( ^, Y6 x+ I, L/ W2 G5 ^
7 r+ u `# [# F/ U5 _'i': ctypes.c_int, 'I': ctypes.c_uint,
* N+ j9 o! l) i9 A9 L2 a7 f a3 T) L) g5 @3 O2 N! p
5 {& s. H. t( Z. |* N$ u& Z
) a# A( n$ m3 W'l': ctypes.c_long, 'L': ctypes.c_ulong,
: V7 P+ h( O! E! A0 E
1 @. R$ b# y; o$ ?- 3 q2 L( m/ F& |
' D P- q* o( t' ?8 O
'f': ctypes.c_float, 'd': ctypes.c_double2 T$ W8 K; B" j/ S! K( q6 e( }
& J: j+ W7 F S0 S1 M! |" R: D" b9 w5 W) E2 `! `7 F! i# \
看下面的例子:
+ C4 b Q. x4 D6 }' i6 _. W# g. P+ U
0 t& n: I& b$ N0 m' Y% R% M$ Tfrom multiprocessing import Process
2 L$ K" }3 K- G' M, Q& U) D) P- `: Q) L0 S
. f6 C+ u8 Y1 C2 l" @; e, R. S- e- G" K9 J: I* t& `
from multiprocessing import Array. g4 D- J/ S( l! d# [
0 Z5 F: a- ?7 Z c9 K
1 D2 R0 V6 S- \% i( V" |* F" ~0 N
! }) C8 L: [3 E5 x/ a5 H
4 y; k# ?! W" S2 R( O$ A0 N( M. F
# | M8 M/ R& W4 `/ K
+ k2 g/ [3 L7 k$ H! C' v3 O6 k
+ _. [; E$ i" R* ?* @# n0 _) jdef func(i,temp):
9 j/ u, x& ?" f. p( ^- U& q* D6 K4 K
. g9 b5 q- f6 p7 c, l. |7 |0 t
( ^4 s" @4 o& s5 E- I" y4 P temp[0] += 1005 F: H+ u4 J, A/ [
& z) i( x; W# L, w; }
: A$ F, t v( ~ G$ C1 D) k: V, N
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
& h0 m7 L# p' p2 J8 o: q9 `
1 r" E! b j4 |0 x! y5 R
! {) i7 K; q6 {) {1 u& s7 Y4 ?
1 [4 i1 ^' f9 }1 l. w8 ^6 t n/ c" w# n8 x) E4 @
- o& ?3 q$ l* H3 L- ! h" w( |! S- ?2 d3 [
2 J) l" I: Q3 j2 c2 h$ aif __name__ == '__main__':
8 D. D% \5 i3 h- Z& x( h* |/ m/ }" R/ Z# W9 [$ @$ j
- 9 d* V |3 \: q
% b% G7 Z# d7 }7 @) I- I) s: @
temp = Array('i', [1, 2, 3, 4]): s5 C, c: m. `& F0 Z
3 S$ A6 N& ?/ p+ X - & N8 I2 x9 [$ V. ~8 C" A$ Y
9 X# N4 M& C8 u+ n; z7 ^ for i in range(10):
. ~ H9 H1 F2 E9 d$ _
/ r' e2 p5 M8 i
' Y" Y0 a# k9 z
2 N+ u. |8 o6 K9 o, S' \6 F6 B5 h p = Process(target=func, args=(i, temp))0 B6 G9 a) j; o: X0 }2 Z/ t7 {( c4 Z
% n- S3 n5 @) T- 0 u: r4 J3 W* D! y# A/ H
6 \9 k* s7 S# j p.start()4 h# c/ D9 T. f7 | ?$ G3 g
% x: A: {9 Z- E2 f' q! |# w4 l1 l/ F( R
- R# }3 ]5 W1 e
运行结果:
( C* s+ H- U+ O/ c) I
* v) W: X3 T" h" h/ B进程2 修改数组第一个元素后-----> 101
- N( K g1 H& N! G) X& K
9 w6 [: S2 c8 {- ) H" L/ B, m6 O4 n |
J1 x7 Z5 N5 j! W2 l c/ }进程4 修改数组第一个元素后-----> 201
6 _% w3 Z7 ^1 H+ J! D( Z0 l& J6 ?; x9 S2 R: Q
/ |. h: V) Y! p: L1 D1 ?# ^) n5 m
2 X5 R4 R1 T* S7 P" w1 B1 c$ `2 d进程5 修改数组第一个元素后-----> 301
) V* I7 R( L; L e# A
* E7 i' E1 o1 h2 y2 g$ A5 i6 }
7 ^8 F4 r& e5 Z- c; r* _$ V! M$ I: g
. z/ S# B7 m( ^) ?6 p7 ^6 H2 M进程3 修改数组第一个元素后-----> 4017 v G- a- q1 S
7 a2 z/ d/ r9 n9 _$ R& [7 G% \
+ m" r2 c0 X0 [1 D. U# p; \ e9 P3 G1 b# g' d
进程1 修改数组第一个元素后-----> 501
. w h1 a6 ^' M' V7 I# v2 ^" q
( ~% x2 t" J2 n* u6 h
" k- d0 C: q$ @+ P0 }- n, |0 l0 z$ a$ c9 U
进程6 修改数组第一个元素后-----> 601+ ~/ Z2 v9 p B' h# q& R3 ]+ g: A
$ G. `0 l3 W0 ^+ \& ~1 R9 |
- 4 k. \% d! W. D6 T& @& E1 r7 @
6 ]; t. T% t3 [3 A7 l3 K" T
进程9 修改数组第一个元素后-----> 701
5 }) m' R. l9 d7 [# r; X$ T0 v8 H: \5 Q+ s( m
- ! ^* G8 c9 o [6 _; m7 b+ p% ^1 Q
7 @7 z5 [& U( Z; c进程8 修改数组第一个元素后-----> 801
8 N8 D2 S; F3 d
: `' P6 [, o; ~) z2 O
% [6 J$ C3 ^) |* D5 M3 ~, _ V. G! U) k
进程0 修改数组第一个元素后-----> 901
: w& }. I" f( o0 b# m4 g: \+ Y% q( |) z% x7 L; y& A) H
w* }( T4 I3 C7 t7 G- Y2 M
' T; w' o6 j. N/ B: N% m6 ~! c. s5 x) m进程7 修改数组第一个元素后-----> 1001/ U! g: k6 J. I& K" @$ y
, O }4 V4 W7 }& m$ O& {4 Q8 N4 {, ~
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
: D5 k1 R& }" r7 i8 J# W8 W- r7 D" h1 D1 b K# k
from multiprocessing import Process* ~/ j$ V, K+ J& v
; }3 g; i; O1 k- d. j- , g& ~# i- G7 J' b J" W
0 p- y; \ W4 e: V" L; ?8 Rfrom multiprocessing import Manager; m+ \3 }/ H" H0 ~
1 A" a, P; @* F8 S/ x7 l& I9 w5 S
, ]3 ]/ s) v/ d! }0 q: N# ^. j) Q% L% R
2 k( R$ C! c) Y. @) X9 e. h
1 g' n) }8 i/ P/ Y! ^4 J- S$ B# F
4 I1 P1 ?3 r3 N5 K
4 U7 f# f: t; s5 _+ J; Fdef func(i, dic):
) E. V; f. B9 P$ z) i' Z$ v) U0 ?! E. c
7 v8 G7 B7 E1 c2 U# [5 I! `) k1 ], [0 e9 A# w9 ]0 u
dic["num"] = 100+i4 T+ D0 G* T6 o0 s1 q% M6 f
, f, Y$ K6 Y% H i$ ]+ y; {2 o& m
- ( h6 ~! P' N; f2 n1 g$ i2 [
6 r9 p$ J4 E' W6 S3 R. {% C+ U print(dic.items())
2 i9 V& T. O- v. C5 h4 ]+ u7 k
- * B0 m( t5 v% f. e" [" k8 a
2 s# v1 h/ I" |- L; W
3 o# t8 h8 ^ x
6 B, j' I, W$ K4 P, i. X - 4 l4 ^ L! ~8 V% h3 J/ M/ r6 K
3 q( b9 a3 }! Z7 K3 vif __name__ == '__main__':
: t% T& [& }1 f Q; X# S! o
! o' j% [/ b7 p$ l; x2 \. O% u - , |3 U9 R; d3 b3 v8 ]: n
! ?. P# m+ l- g. e% G1 l" o
dic = Manager().dict()
* g% l* I% ]/ f7 v2 A3 K# F% w4 u
3 z5 d1 q- }9 q) L5 `
0 B" a( k$ K9 f! y4 t
, g* l; u/ E- B, v4 U7 j, ? for i in range(10):
# x# X, K- b" {+ w/ @' S& {
5 K% X8 d M! O- 2 |1 n7 C* I3 I5 P/ [& ]
+ j* B" a! ~7 W. T
p = Process(target=func, args=(i, dic))
4 }$ E9 _7 H6 ~4 `, Z$ A) b/ N& l' U7 y. ]$ H; |
- ' a* I$ b3 }0 ^
2 u8 G$ }' C4 \. y p.start()7 p$ K8 X$ L% L k- q+ c) x
9 P2 @% n# B6 E
' n& _5 p3 O# K5 f* v/ E/ O. Y" n- K8 X( Z6 }3 q' q
p.join() H/ h0 l3 J/ Y. w/ O
2 k# ^3 y- {( m; Z* T; M& H, x/ ?6 a
运行结果:
" U# U. v4 K3 J3 L3 J7 a' f. Z |5 U/ I9 ~" ] G5 I: c* i% Q X5 L9 d9 J
[('num', 100)]
6 [0 X+ F$ Y6 S9 `# G! c% ^- F/ l7 G
- % w A$ ]; U g# U
6 L& p" n- Y: n B0 e0 |+ v1 ~[('num', 101)]
6 l2 t3 n/ y+ [9 n9 B" }9 T
3 e4 [* i/ Y$ f9 h
6 \; |, ]: r# C7 a
8 w# j2 R7 z0 F* E: V0 ^% t1 s- G[('num', 102)]. P& D) H( \+ p A( u+ x; k
$ l: V4 j8 A( _: m- # U1 Q F- Y# v& l* {
0 z+ N' Y J# u[('num', 103)]" ^) U6 h4 C, x. k& i7 W
; l, ]) `2 G4 B9 @! W' Y% e# j
! b$ {+ G. {2 Q7 p3 n$ f
* B# ^5 D2 A% J# I; w/ `; `9 s[('num', 104)]
& u; u/ S2 o+ Y3 C' q( \1 F, ~* {$ l% q% l/ ]$ w: i# e; H
- # m$ v& C$ }. ^7 P& k- |9 o# W
# t7 S) i- X' m& f! \; E; z
[('num', 105)]
+ X! \ t2 j# p7 e0 d5 K6 }
8 A: n: f$ a+ ]8 ~( d- W4 f, I! ] - ( y) [- `# [# N* a) f" Q
$ n7 @6 T5 N+ p! s5 R
[('num', 106)]
- Q M6 w. _1 Q$ }* \
: S; J+ L' K3 {" n5 Z
+ G5 k S [7 v" d( M; G
- `" B6 L- I; }8 H, _* [! s: x[('num', 107)]
4 b* t; j/ J; t8 \4 u5 ]' s9 B+ c! Y# e0 H. R7 @% o& Y' k
( u. b$ N: N% s5 G& s8 w1 g2 G# O0 h, G$ N% e u
[('num', 108)]$ O/ B6 U( `6 p. C
! m4 q/ D4 S9 P. [' z# F
- - o& \# o1 l! a9 n0 d+ i- v
^$ `# q& H$ T7 ?
[('num', 109)]
7 ~7 y! H8 @: A$ A) A2 t" M, Y5 {0 e% f) h
, g! F+ B7 {9 K: U
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
3 q7 D- H; `3 z9 A& _; M! c N& \* Y7 R7 n }2 `5 f* _/ m
import multiprocessing: `# d. D7 X+ \/ F3 O
& s5 {- k3 ?" e1 L
' s: K9 z0 p6 Y( B4 u1 [
) o5 d5 |* K0 H( X) m9 Nfrom multiprocessing import Process& y% y6 n! n2 O. D
: C' `: k1 }" _8 ~' F( F( e6 r
- ! c9 s( y9 z; Q( o9 j$ M
6 R' z( ^" q {6 h7 M, A& d. Qfrom multiprocessing import queues4 [. C0 e; r. F
3 J4 ]( f0 a( ~# C$ X ?/ h4 S
& o5 y& C- P8 I; z
6 K0 q8 f' W* ?0 E1 G- R p2 X, N, }+ q& C: t1 P" \/ O, c
/ o5 P. K0 }* x Q& T% C
7 v. w. @0 l; q! D; o; ]: {# r5 H4 l2 s, N# q: U; ]! ]
def func(i, q):! `/ } V7 C1 B6 t9 V6 j, i
! N- ^; A- O% ~# K
- v; ?0 ]; ]3 D: y; A# g5 |- M+ B9 [
; }4 a9 @# K0 H; u* ~ ret = q.get()
6 R X1 ~- w! ^) n# w7 U0 B9 `8 E* n" v* O6 t9 b Y% w- ^
6 [) {& d, _% e$ B1 W, n: @
& s1 |* x5 W6 I, m+ {* g3 n5 x print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
9 E, {0 ]! M% ~# L) d
5 D. i: x7 I# f- _$ C- ( X5 k9 ^& w' ]: t4 h
" i- \ W5 r2 m! ~6 N: Q) z
q.put(i)8 H5 g; A* u R, e( i5 m' f
( J/ N! o8 c# z7 `: X6 B - 4 ` U/ H8 A4 v
0 Z) N. p7 q& r" K$ P# W" C% y- w% Y
7 w% e& d/ S k7 M/ N
* H! q6 t+ l2 H& ~& p
% r: e6 q( g. }5 Q9 h, Iif __name__ == "__main__":; e, r( o1 j" H1 ?
8 ^, g8 l7 V4 d- + p2 Z4 q, x. V# N
8 K8 }+ u8 O% z0 k! X; ?. E: t3 T: i lis = queues.Queue(20, ctx=multiprocessing)
# Z4 z7 R6 { m% p8 f5 F$ C I
* ]% x7 ]* ^9 T2 G
4 D2 w6 b0 g# u+ \" ~# q! h, o8 u3 g% N! u8 j; V. Z1 ~+ q+ W
lis.put(0)
9 S0 Y T6 @- v& x. Z4 Z, {2 W7 P# ?: A- b K' w
- 5 b4 x1 d( |) k T
4 f" u7 K0 x2 N) g. L6 a
for i in range(10):
# W! \" h# L! B; k e _. g" N* |6 O4 Y! A& l
- / P; F C( k1 \5 u
& O+ P! a7 G+ t2 r p = Process(target=func, args=(i, lis,)): ~ E4 E- A2 x! n0 k) y+ @
6 | I' e7 p2 g( T- Z3 A: }' r
- * M! {& ~/ c/ ]+ [8 s5 u) i+ m
& ^9 W& k5 z5 f f) t
p.start()# F! K( D0 |# V+ C% N
; U8 {) N2 v8 B- H* M, w
( h+ |& p5 m7 R; Y' p' e# }
运行结果: - 9 O, M( R4 @- v4 Q. }
- e; b! R, e: H3 e2 q9 X) @8 \1 E
进程1从队列里获取了一个0,然后又向队列里放入了一个1
! x0 u) k) J4 m- o
' I, X( U. Z3 e" o; U - q* n8 M0 E0 ^0 S
! Q' {4 a/ R6 z. a# N进程4从队列里获取了一个1,然后又向队列里放入了一个4
9 s: d8 u8 c5 Y: p# m3 e. j; ]( ]1 A& Z& u7 T+ u
- $ y2 u/ x& A" F5 G* P& F
# }. r7 a$ w) K, V! F
进程2从队列里获取了一个4,然后又向队列里放入了一个2
6 F+ t* `/ m9 W/ C) j1 [) e& ]- z# m8 H }8 u# N
Y: B* m; R& j: l& {1 R* Q' a$ ^! g! B/ `: P4 x
进程6从队列里获取了一个2,然后又向队列里放入了一个6
" _( v- ?1 r7 `% S. Z `- Q# {9 H
- 1 ^" ~/ M( B% \$ o$ x* V. ]( u
, @; M" x% X( ` z; ~进程0从队列里获取了一个6,然后又向队列里放入了一个0* k! d, H6 l. M+ V& m
* g* R, A' _# F0 t5 k- K/ D4 @. ]) T
& H$ ~. N& Y9 s) B
3 V& K3 e+ \2 u$ L# Z: K- y7 d进程5从队列里获取了一个0,然后又向队列里放入了一个5
- p" I( Y6 `- B1 n! | W
. Q" @* x) x3 I# v- ( L# O' t: B9 U. z- k5 Y; @! |
- ]# Z" U! D- G: c进程9从队列里获取了一个5,然后又向队列里放入了一个92 S5 e0 U5 U5 g" Q/ |4 a
, J7 ?* f+ }, }0 g: O
, U* d7 T6 h& U5 K1 K/ r0 \" Z! p: Y! p& }& W
进程7从队列里获取了一个9,然后又向队列里放入了一个7! f" Y4 w$ p* ]6 h- f+ A/ n k
6 k; X" ?# T0 W& p- : I8 o- O. P7 N" C$ v P
$ `0 K7 T4 x+ L9 s& d$ l. z* _$ @
进程3从队列里获取了一个7,然后又向队列里放入了一个3
7 d- x+ G" u( K" P# m$ q) C5 V2 `0 k# M8 L [% X' K
: j0 Y6 B1 L! |6 j9 @- E; j+ ^% K P5 z
进程8从队列里获取了一个3,然后又向队列里放入了一个8: X" V/ H' h: a' I5 f
1 l7 I6 n! @9 \+ E/ Q$ y3 p& y& Z
$ M8 k8 z/ b- u( ^2 c5 v( b) _
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好! - # @: k7 T; G! ^! p7 Z* }' e7 \2 L; N
Z2 G/ _- q+ S! G& |from multiprocessing import Process$ A C$ e$ Z' c" V
' |) j v* p: O. Z+ o - % [* q: f- V" E! D% s
- H. M$ m$ S+ `& |8 f i: S
from multiprocessing import Array
1 z: k+ k0 G$ V9 ^. e6 F3 K& W$ N: b. I2 I, m) [6 ^
# }2 j" P6 z' `/ {4 e4 t" \2 O6 F9 c* i) _9 A2 b: }7 ^
from multiprocessing import RLock, Lock, Event, Condition, Semaphore% U6 A' ]+ y6 y: s7 k6 A) h
& H& T9 P9 K3 {2 {1 v% I5 B [
- % S% B: U8 N$ N! F1 p: @4 J9 u* {' s6 C
: A! C; Q8 a! a" M% S6 J, f, pimport time
" f3 L- m: d/ \" b" S3 N
: T6 p/ I* N" _ W, u P8 X7 ~ - 1 l! I8 @9 Y# t) T
1 j- N" Z( }' ?0 D/ X; f9 D
4 [! F7 C1 y. j9 u
' R& X, \$ T- V P$ m' h) x7 p! O5 w
- 9 A6 y6 N8 d8 i8 P. K
9 k) b! j8 ^6 Q' V. rdef func(i,lis,lc):
- p& ?; j6 T4 Q) ?; s$ C9 D1 k2 L( {$ y4 V
- 4 }3 `0 N. y. U4 A; C3 a9 S; q
8 z" M' L4 Q2 e; Z1 J6 P; q
lc.acquire()
( E9 ^ j. Y: J i) j4 p" A7 [+ ?, _) N; K) Y- N9 n
- ! L- x, r n# k6 U
% V5 I' Y" o2 g, C$ z8 V/ Q+ @ lis[0] = lis[0] - 1
0 a6 f% I' u. W; N6 S9 j/ @! R6 S/ v0 v
- 4 ]2 R* [- }6 O6 Q; | G
6 ]2 P( Y5 g6 |! S: Y3 y; s, Y5 J
time.sleep(1)7 D6 a. o) E9 }0 N" f2 W1 G3 h
g( T' s4 @ P& v
- 8 X# G& m( c; J( S7 _% y) Q
- e& n/ A" y$ U1 L9 Q
print('say hi', lis[0]) `$ D. t! ?6 k$ G5 M* c4 `& T. K' `" e
% t! F+ i0 w" G+ |. e3 T0 ?% H! r9 f
- + \0 |' ]* U5 s. A5 K
7 j6 e; {. e* J, y lc.release()
# z; m: P) T" |5 u1 Q
4 z W" A! @8 m" E3 s! k# B
- a5 y; ?# h# u) l
O5 X2 K9 b. o; |3 P% [
- \0 A% p( w; g" l( E( n$ n f0 N- E8 w! E3 p7 q; [8 m& ^. T/ z0 M* V0 Q/ u
- 7 V; d4 Y( P ]% {0 T
- `" M" A) d" P* l! z8 v
if __name__ == "__main__":
" w/ k% i# a7 L) ]# T4 r5 a# ]: l5 Q
4 M. R8 U& n- W8 ~/ a* \2 J
* x/ w! y) z3 C% \ array = Array('i', 1)% q% G) H' F) _6 ~
# I- [& M5 Y9 g+ x% `+ g% v
2 \# {+ s$ c% i7 }8 w
* M! _) C2 p0 r- U$ k array[0] = 10
2 M# v1 `; p' g- y& |$ @
1 C" e) b' X9 x
, R3 Z- V' _0 ^
% {' \$ J2 B6 _) F# N- o0 u3 J lock = RLock()
+ [6 {2 }9 Q4 O0 ^, Q" L9 T
/ s3 m8 t6 E8 z* R1 k- L
, D. ?. x- u, i3 ?4 j! g9 V( p# F [9 y' X; U
for i in range(10):9 i+ O- r3 h' i. H
7 T3 S' h% I: }, ?$ P6 c- 2 \& Y- K( l0 O$ |
: |$ u6 l! d! e9 z. @3 _7 z& |3 _ p = Process(target=func, args=(i, array, lock))
, _6 n" R. X1 `6 E- P) I
# X7 Q+ d$ \. h$ E& M7 J& _ - ! n' Z* C/ |/ p) F; y2 v) l5 w! f
c% \$ p# w8 k' A; `: H& q p.start()
2 O7 A8 t) U/ f/ I U8 V2 g6 s4 t& R7 H6 E; M5 E
/ v" D* ^# {( {+ n
运行结果:
8 R, U6 S* z, R+ B8 T7 |8 M0 T5 B; G
say hi 99 k9 Y$ a. e- K# ~4 e$ N4 j
' Q% U( J, x; Z( c6 ?6 z
- # [3 H F- q# z/ n: p3 e8 s) C- x
1 H2 y3 ^! f& {2 Z$ u- w
say hi 8* d @1 f2 o- u3 S3 d, g
* i, t7 `7 f+ x3 C8 K
) K3 J: M8 l5 M6 X8 B' t+ V
+ b# }( r- h$ f! _( isay hi 7 [6 f, {2 W. J+ r( ^9 N
3 w8 i c# ^7 y4 C
- - q, q7 ]8 c2 a/ \/ F* z* E5 H
( G. \5 z: `. w, t+ ], A% i
say hi 6
& [5 b1 l- O5 H' Z1 M6 S% I
* j. E8 t/ _( B! j; b
0 T4 \; X7 d+ r) J" Q7 ]! J8 A
* }& [; a% l- X3 asay hi 5& U' O% H! o/ f
6 N) h3 J! Z* Q9 U
4 r5 J$ x R; B$ p. n2 G6 Z3 @
6 x: g" x+ c w9 s: S. Asay hi 4
6 B. x1 R" z* V" `$ T7 y8 T$ x7 [1 b& v) k. e
! ?4 ?/ ~2 G) R5 Y' d/ |+ U5 b" x5 f/ k& O5 i4 ~0 U
say hi 3
; R2 {) H* g( M, R$ B1 O9 f. s: p& U/ V, `# `
- 5 G( B( t ^5 i2 ^5 J M
' B( s* q; \0 |( J7 @say hi 27 }6 m3 B. e* g( V8 ^' E& p, h4 y
- `; g4 ~- x% b% a
- 8 f. d, \# J! p7 f
" k* B# l. ]6 E. ~+ ~4 s# O6 p
say hi 1
2 U p: i* x. G* @! z7 x' B5 }' j8 o8 W* z: C; Q
- % K; Y0 o9 B6 }* t/ W f$ w
/ K/ S- V7 W! X, j" m% v
say hi 0
+ G/ c7 X+ d. H# u6 }! b: i, Y7 X/ H" J, @) ^+ {8 K( t f
2 t3 b$ a0 F1 T* S
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法:
+ g* }$ l Z5 P; w/ j; E; u; m5 S4 B) K
from multiprocessing import Pool
" L; ^/ y3 |+ ?5 [" m
2 N4 N5 z* j. g7 ?# ?- : [# k% ~8 i% I0 [) a: |: ^( K" S
7 U; _% `8 w5 P8 f* D; g
import time6 R; K9 t8 Q( L: {2 `7 B
/ H/ }) k8 W& }. f/ |- M% D
- 6 ~0 F' A* r7 C. w- {# U
5 J# l: k4 |" h$ _* M" J( K: W- t9 A5 o, e$ k, y
. n& X% l1 h! P2 _. q1 ^
( x) s; N# D1 K& A8 i- h
2 ]) t0 `9 W* y, M0 k( p. a2 Qdef func(args):
/ M. B; w0 D5 h1 w7 ?/ |4 G4 a r/ i5 k
1 Z- E% G( q3 y. T- ~
0 {$ v7 n. P* w u3 Y! z time.sleep(1)
. [ S0 Z5 }+ Q6 b5 Z1 J- W
" I7 e2 s/ X) E. x! D2 K$ a
; S9 L9 ]$ s' j8 Q# m0 d
4 I9 e7 Y) h) V% B print("正在执行进程 ", args)
' X, \( T/ @' g2 N
" K+ V3 ^ C" d* u5 `- + l# }9 V: R, @" y9 Z0 T. z
8 M0 q' H/ f1 d" q* i- A4 F0 d4 s* @6 ]$ R- O
' c$ G- \2 @4 N' Z- y6 X# _
- % e# \$ h; \- A; x
+ l) d I$ y5 q- r! y2 Hif __name__ == '__main__':
; M8 g5 y# Y" d: k- s2 n! A' d4 H$ N$ ?; r7 c
- 5 ]; d- J) \) S6 V* M
- @0 }9 ]! H; h3 ^' F _
7 J& _$ ?5 R8 {' }0 G- g9 T" l5 z" x' a: D% d
- x! M# s' @3 O: b; S4 c1 ^& b3 q* s
q: G: e/ r' d/ U( S
p = Pool(5) # 创建一个包含5个进程的进程池 }& d: @9 w9 `/ {! `7 g( u
$ b' y+ M& V0 N, s. J0 P
6 e" c% G5 A9 K0 W3 b+ y
6 [ D5 [ ?+ Q# I
- u! C3 U! Y" e( @& k
! v4 a+ f3 p. O# U- 2 x3 ~/ M+ ]% n h
`/ Q; T8 h6 Q% n: t8 S; p for i in range(30):
6 f9 E$ ^+ Q9 P+ L* U; U
P; P2 i6 |# I! ^8 a
8 G/ U p7 U# |. \1 S5 L/ w- N
1 c; r+ N4 y9 P' Y p.apply_async(func=func, args=(i,))8 [4 U6 Q7 A0 a7 r
9 k3 e/ Q; e. P1 |2 G2 I- 9 o, y% R9 a4 H/ p( f
! |/ B6 q6 I M1 g5 z
1 B& N) o' e/ m' o* [6 k( H% q
' z. }' L w) S$ u4 `! b - ! B. N3 u E7 p8 u, E! q. m1 a
) {2 v6 r7 Q/ h' b
p.close() # 等子进程执行完毕后关闭进程池
7 j+ t4 w; F* x$ o8 l
# L2 X; Z6 N# T' H - + X% ?; k' c/ v A j
" k+ k3 p( R( u9 } # time.sleep(2)
2 d/ M4 i: i, B, J7 X, ?# R
- l+ J) W2 U1 Z9 l* r) Y' T
+ C7 y- g( A3 {* G
6 M: Q5 m6 C! D6 `% t" u ? # p.terminate() # 立刻关闭进程池( f" h- V" s- @; n9 t7 @% _
, v/ R: r8 B1 W. |
% }. s5 M, S7 r. J+ x+ f4 r( c1 i4 {. d
p.join()) l( U" T! d! c
: T1 e; B* t3 M/ m) p6 q; d
, M2 n: v8 Y% ]# W# t" o2 p
% G% F: B' q' c* Y2 T( p请继续关注我
% U/ v7 V! T( F$ C7 f7 ^/ d/ H# c* W' T
|