# E/ k, {9 W1 A$ X8 v$ D# {# }! `爬虫(七十)多进程multiprocess(六十一); ?. W; t& I; t. k/ K
Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。 - : `9 V# O r# I* u/ ~
0 C/ h6 u t' u- C4 y( L1 ximport os
$ q# N7 D6 [3 R$ X' r' m% V# Q/ p- b' W3 }0 k
- 3 `' u. }9 J, C$ F4 `6 x- Y
# M0 R2 e F9 {8 Z* d2 ]- aimport multiprocessing
$ I' t W w4 o; J* Q% [( n2 t* F A( Q2 `
- [; q# C: L7 w: y4 R
: T/ U4 o6 Q6 t6 D1 T1 [8 N/ M
( }0 z7 N5 w- \% C1 b/ U. { ]. [1 v/ O" J5 }
- / a {- d7 R' S- ^: l) T) {
' ]. c7 k& A+ i5 _8 a$ Ydef foo(i):5 v" d4 M/ n/ g
& y) X, L& A4 `# C: M
, R( z) [4 J0 L4 L
/ _, U4 L* Q: v # 同样的参数传递方法
/ s) D( j' f5 O. b0 M1 E$ W
# ~. s, B) y) J
! h5 U# I. d2 F2 L( L- W% ]" h3 a
print("这里是 ", multiprocessing.current_process().name)$ |* B) ~+ \8 q* y4 J" }2 r
8 N; q$ r5 m( n/ }$ e+ a+ A# E% v
- 3 t9 A& f% D( ]4 B$ |! K* _
0 L" u% U2 c$ M, K' p+ P. ]( ?
print('模块名称:', __name__)
: A" D7 _+ P' o; N
6 }: P3 [2 M2 O* o, d/ I& j - " q/ z4 w, }) L! \4 B" m3 k
8 V5 k& i, r r8 i$ Z: K# p print('父进程 id:', os.getppid()) # 获取父进程id1 f. ^! l( ]# J- v
" ~3 a% L/ c% ~2 Z
- O% Z3 `9 |8 y8 [* \: e; }% E& ?' D# g, A# `7 t$ F4 [
print('当前子进程 id:', os.getpid()) # 获取自己的进程id
& R& R4 q0 H' M4 \9 ?3 h3 R& l& n. Z0 q/ F
2 \1 A8 A, X! K8 n. e: w3 m/ ]* b0 v# m$ m6 {3 Q6 j$ |
print('------------------------')
J* D1 K, x5 r0 t8 u$ q) {
/ R/ V% {# w- V1 N
/ d- ]" Q; C( Y) l4 Y- t! q6 c7 R0 X- N/ l( ~$ [
% W# b6 ]; W$ N, ]; W( I3 G
6 ]8 l f" v. v2 O1 x5 y; e; y
9 ~' a" O9 v6 @( V4 k
3 U. f5 f7 L, e+ c: H9 zif __name__ == '__main__':
6 w9 x$ y1 @' A7 h9 i3 t
0 Y; j3 D, q; P7 ?- * [' u' X% d" O. G
4 J7 S( x3 R- U& r8 a+ G6 s2 {
* ~( w; n) B; c0 b1 E+ N8 |. u4 K
- - }. I3 H2 d) m& w, }
2 L! k+ }2 ]) H+ c, S! e- ?3 n" @ for i in range(5):
0 U( [7 j8 R; z8 ^4 w7 v9 g) p. O- F) f
- * U$ m, Z/ I5 V9 I1 r. w
2 a- Q1 B0 e0 @' b9 \8 x
p = multiprocessing.Process(target=foo, args=(i,))
9 v% m o! w4 g8 M1 v" a! |3 R5 W0 f. m H. T
3 Y0 l7 u. \7 _) _0 q6 l1 r3 D
+ `/ b6 {7 R1 \) h p.start()- F3 ^5 l; y* Y& A. x) R4 ?, w0 y
$ v% W0 F9 i* H* O& x
: T" z$ _9 \5 t; b0 h K
运行结果: - 4 w1 o0 ]& s7 M( Q1 f9 O
) x; s0 E$ j0 B3 v& k
这里是 Process-2
# L; J4 \' Z7 I0 K. P! I, g1 q: ^' Q* y5 O( s) F; x% A
3 B1 C0 ~# J# G9 z5 ]2 D0 B" }! t9 T+ ] }% j- g/ C
模块名称: __mp_main__
, _! M' a, \7 ^- S8 p% Z6 g" i9 q4 U0 C
2 m0 b! g7 d4 k5 S2 `( m! ^; Q- w0 Y( M
父进程 id: 880
' P' _) F1 |0 i) U0 Z% a# c- p- |. a% E
- 8 O: S1 m" W/ R3 q, s9 P5 b
- y7 G `1 Y* P2 w5 I% i
当前子进程 id: 5260
% U- W4 v p( k$ R K0 a r% ^
, T, m8 L9 P2 g) n0 J; {" N) h - - E0 U- Y) z3 H9 j1 L2 A+ w
* L, N) p6 N, P" Y3 M, Z! P8 u--------------
' l/ x0 _3 K$ N) q( f8 ?/ O- s; b v8 m. S2 i, p9 \
- ' J. P# y/ S' I# X& k
) J9 }) }8 A8 N' G4 L8 d# K/ j3 r
这里是 Process-3
* i0 R5 ] T% k3 P& A" i* \
3 V8 h2 o* _2 d ^6 s0 Y6 Q
8 ^/ h- I( `: L8 @. Q5 d& p4 Q5 o w. C6 j: x
模块名称: __mp_main__" s S, F6 x' ~9 K4 g
1 V5 _+ _! ?: N, L, W" N- y/ b
9 ?& C |; h# R+ r: g' P; W
! f9 N1 j9 d- A2 G父进程 id: 880
8 R9 t& W! A) _- N3 k! x
6 }/ l. l7 w0 F
, l: o# o% _0 `- x8 h2 o7 P; w0 B* O( Q1 {% U( g
当前子进程 id: 4912! C2 u$ d4 Q) h+ k7 q }' H
% h, n) d4 N( R3 Z4 S; k/ R7 W
) J) ]3 ~: X. V' S6 ^3 g
; @% K+ E* c' K! \" o+ ~5 ?--------------
$ O8 x6 q5 c h9 {+ d1 d/ c
" b5 J* f0 t, b; V- N' c. b- + y- k5 }! M+ a9 G; g' D1 P
0 m) C1 w, B7 A这里是 Process-4
2 F! Z& s7 \9 t
, x% f3 C `8 ^ - % ?+ A( A- `) u* ?* b( {8 a
% t! j# { ?+ w
模块名称: __mp_main__
& h' I1 z5 L+ L+ m) `2 e" ~$ G/ Z- M# c# I
- & ^, O! t% G7 R9 @
& R& ]& _ y7 G' v6 _7 m父进程 id: 8804 ]* H8 \: g4 X4 k/ X- ?! U
$ q: @4 [! d) G/ E* I
- " q) A& J9 Q3 G( E9 N2 D3 W
6 h" G; o3 s2 y7 Y
当前子进程 id: 5176
! B L/ w+ ]9 V, a" E9 [$ `0 p
& H3 |) V" n1 w& p5 F
6 C' C( U5 p+ ~8 @1 f# u
6 a E3 k/ G! S! m--------------
; Y' x2 t, D0 R N6 ^- @, ~7 t+ g! m% m
- $ Q0 f* K8 ^$ H
8 w& H$ g# \0 E( t( [# V9 V
这里是 Process-1! \6 V/ a1 t$ P" T5 K
: _: T8 X z+ Y- j
Y6 E7 a# ]& t5 ]" |4 f" j! G/ x6 e" K* @% t: P! {0 A; z
模块名称: __mp_main__
' L! N) c' C( o% B6 G2 D6 v8 T/ s% ?
6 H! R# M N) O( s% V
6 C K. F: D, E$ E
: x3 \+ } [. q& D2 L9 f父进程 id: 880
2 Z9 B- v9 S- r% q+ o8 o! q- ^+ N* o) e
0 \, J7 C3 e2 R4 G4 [( ~8 ~( D# S" J# K5 L" b9 b
当前子进程 id: 5380
7 y/ p6 U( {7 W( g. N& S7 ~0 ^( }' x
- , h8 |8 P, Z; Q8 G$ ]7 t( k4 a
; J7 r& |! y, N--------------; X! Y8 a4 ^3 J4 S2 j* \1 ^
; e: i5 P1 H6 l, K8 o
- ! `4 q7 @3 M2 z3 D( v
2 [6 l9 p; B! g; o4 J# W
这里是 Process-54 ^3 I7 k) p( A/ G) i1 t
4 L3 L5 H( w' l+ |: { - 4 \4 V5 x0 X" Z! @* |, y
8 |+ Q) F/ Z! _; ^3 ~$ |2 n1 s
模块名称: __mp_main__( a j" h% Q% y& p/ `+ j& ^7 O6 C
- t, A0 m. {" ]4 S9 E
e- j, A4 J4 C3 w+ j$ v& ]9 f d6 }$ v8 ], s9 u* m
父进程 id: 880" k9 U. Y \% T3 O7 \' w+ t
# R# f' z, Q3 e- m/ r1 g
4 X! L. i9 m9 S' \' ^' \3 t1 l6 A2 [ s( B( |) q- D! t
当前子进程 id: 3520
; H Y& w D p2 F6 i
; W/ D5 q8 ~2 u, q' ]2 P2 \6 |
' V5 h% J, C6 n1 K! v
) x% w4 z% R B) _! G% c, h ]--------------3 j- b% M6 n9 Q) c+ `/ j |) \
! ?& `6 Z8 x$ U) N5 b! n! V/ V
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享:
" {/ \- X8 A u! t0 f3 `! A- m0 m9 i7 S" _4 F
from multiprocessing import Process7 o4 _8 f$ e3 c6 b! s- s& Y
. ~& l/ e. ]+ Q! k/ q4 F; l+ q# I
- 6 p) N6 n" ]$ V7 C: b; e8 \
8 N: @" R- L# d* y
) V( h$ H% y4 l/ w; S5 W- p' _$ |2 e
# a$ P' d/ {% e
! y% t9 H( T! @! T. X7 Y: ]lis = []
& Z* L& G1 J8 f+ z, G y7 }
* t3 q. J! b5 C# |- 5 Z2 d6 A+ Q% l9 E2 g# I1 f
. J$ R c% G/ u6 q. C" [. l, G/ f. k4 A4 V
' w" b! a$ S" Q4 [, @7 }8 y- p' A ` - 5 \# T/ W6 _! \8 `9 L
/ Q0 ?( i6 q) y+ h
def foo(i):
& M8 A( B Z5 O% J H& X
! ~/ `1 Z9 u& H B - 5 w6 Z7 ~# F0 P" \) g
% ?" y( L5 o& s9 j }$ J
lis.append(i)9 i, y7 I7 f' z @
' v S b1 J, V( W6 j" @
- 0 x; ~- m( Q8 I. g% v8 u4 n
* B r5 |3 f' Y; E6 h- f( r5 s print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))- @7 {: o. |% q/ `7 f) E* c! h9 a8 K
M/ Y1 \) U. y! j V
+ A' \) o" J! j2 g, J9 h' m6 g! [2 A6 `
: R$ s" Y/ U- t) C* p) v3 N
4 ]: g* M2 h& v3 a" T7 n- # ?- }9 \" g+ x, u
& l+ f7 Z! k) U1 ?" Xif __name__ == '__main__':
$ K3 z* ?0 [! c& j+ L, v* H$ W0 v1 Q" w" ]/ V# d
; A4 c, p! z1 E/ ~
. p* A$ K4 q5 k1 Z; ] for i in range(5):
' a/ f$ R/ W: f
9 ]5 b; S" `7 E, s Q
7 o' v7 X0 _/ W. ]
7 }$ N& I' W7 R* H" p p = Process(target=foo, args=(i,))3 r6 z2 Q# ~; ]) ?: A, k
: u6 {3 A/ Q0 g. _
- 9 c, j; W4 `, h6 v% s
" h2 ~/ b5 b \0 |8 k" r5 m( E
p.start(), Y! G7 Q1 t" j
8 ^5 G! h' A' @* N. K$ s) W
) g* ? J8 t3 l* e( E* c, B$ @, r# F5 h- r( H- P# R5 K4 O4 T5 ?
print("The end of list_1:", lis)
* r% N: y7 a" C% G; W; ]5 c0 c* J% v! u" b1 W7 ?
. D/ e- g' l6 ?! E* V
运行结果: - 3 X* H( P: b4 v3 j0 _
$ Z+ n/ W+ P+ t! e3 r! RThe end of list_1: []3 z6 O; {2 }! w T/ a% n: ~1 |
/ y4 e4 t9 o7 h
3 ^7 d% t% J8 J3 w# Q) H' h- X
* T; T6 _/ |+ K; \This is Process 2 and lis is [2] and lis.address is 40356744
5 r* U! u1 S. j4 _
* w: e6 \+ b" T0 l+ r9 k! k- 4 J1 h# ~+ |9 B) r
& c2 B& L2 O( \* w5 u6 c/ FThis is Process 1 and lis is [1] and lis.address is 40291208
2 @' ~1 e, K( E6 [" Q* }& y0 S! J
' e, t7 y) _* T w9 W1 M3 I - 8 `+ `* o6 h: Z$ g8 \, ]
; C7 k, @, g! }
This is Process 0 and lis is [0] and lis.address is 40291208
3 M1 s- ]3 }5 S! y
- k- I1 p4 x! c: A( ^: e0 l6 o+ Q6 e
) }* Z/ f. }8 r y1 }3 \4 [( ]! |) m7 i" X
This is Process 3 and lis is [3] and lis.address is 40225672
( q2 w( B! p, I2 B0 x9 M2 @ s
I$ X1 Y9 X9 q2 F/ Z9 H
, T' |, R' K8 v4 h7 `1 n: ~2 d
This is Process 4 and lis is [4] and lis.address is 40291208& |' W. P+ g$ ~/ m- X
& n- ` r* n% w
4 T; {2 Y G; m9 q% o- u
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系: - ! X4 U0 O7 j( x; Y5 c. @ p
' R- C' r' f& V, ~9 t. H& ]'c': ctypes.c_char, 'u': ctypes.c_wchar,
' Y' V( n# G/ a i. p- a: C4 j4 p! ~- p3 m7 g+ y) k
- L" E, ^* c) z% l; `4 m% N8 b# ?) U: i% c
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
% s, Z( \, n# u0 ^) Y
8 @, ?- n$ H0 q% S
* A+ S5 a) c! K
% ?2 }& ]1 g i& k0 q; K' h'h': ctypes.c_short, 'H': ctypes.c_ushort,! y+ o) s( f: Q( h7 [2 T
6 \4 g+ C& E8 G: L3 K; f! t. u
- H7 `( S7 }. ^9 p& _8 l, E) n4 P/ k/ B
'i': ctypes.c_int, 'I': ctypes.c_uint,
; N" U- |; B, v6 n0 T! r& L( u/ W) U+ G1 D4 d9 Q0 \1 |
* i7 v- Z6 Y' E. r7 _- }6 n/ g9 r5 y/ M
'l': ctypes.c_long, 'L': ctypes.c_ulong,9 s% h2 `' O2 y. o1 A0 H
+ H7 d* E' Q0 L0 r! {
- ; X" l' ]( h. Q4 C* w! f: j
2 u+ ^3 W* w7 j( R'f': ctypes.c_float, 'd': ctypes.c_double) p+ o. @9 i& D
3 L: s8 w3 u) m8 ~
: K) `- C* M7 K( X. p
看下面的例子: - 1 B2 {; F2 T7 v$ J7 m. O: |
# f; Z& ~1 M+ t% Nfrom multiprocessing import Process
' N, x; e: |5 p$ o' @( d' a: O- A$ u0 \4 w4 Z" [% q
w4 R; N4 I7 q1 B
9 b) n$ m0 [/ ^7 d/ G9 h# Q" D, xfrom multiprocessing import Array
4 ~+ t0 J* Q/ L; E5 O% @" d$ p+ O# g- U
3 z0 K; V5 o! M0 j7 u) l: ]/ w2 L& i" r6 u) h9 ?4 ^
. l2 ? v$ t a3 e: z& t
5 P4 |8 r: u* B+ Q \
- : q- K5 s, w" a8 ^$ g) `: q
+ F% H. t* p4 e2 J& ]- o8 P
def func(i,temp):$ s$ ]( B: D6 L: k
) x# D2 X+ b& L/ r3 \% d; X - ( y; _; D# j' b$ v
9 |2 V: g. M# W- T2 R; ` temp[0] += 100
* Z& B, }( t+ r" d. m7 K' a
- p _. H0 N# k - ' }- [1 d6 r( }0 P
) n: s4 K0 |4 t; E1 K
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
" X% R3 J( q; `! q' i/ v/ e! Z5 m9 R" a/ g7 S B5 r
0 {# K6 m) x6 I3 j7 m) X4 Q: y$ _+ K' |1 ^4 ?. P
+ z& n6 V- f7 a- _5 [3 `+ w% I
# H5 m9 I/ W. n& }- 4 [# H" ^9 ?3 L% y4 k a% b
$ `2 o; [3 |5 V; s0 L
if __name__ == '__main__':3 T% _; x! s# v8 ]
5 u5 e% ~8 t2 N - 4 C5 M, g, k, ?. d
r( ] X- A# l: I) |2 Q' `
temp = Array('i', [1, 2, 3, 4])
7 |" a$ f" I$ b+ ` X6 N/ G
/ c" E6 i2 l3 r I F0 a' ~9 {
6 j5 x) t* g L! {: G
( ?2 ]3 x% K n; p0 G; R5 _ for i in range(10):! X, v) t# O7 z r e3 Y' |
+ Y6 N1 v# p+ X i
. X9 P }5 V2 Y1 Y1 b' [, w9 A2 n( N' j5 e; E6 i; u
p = Process(target=func, args=(i, temp))
; B$ e5 l0 A1 j: b4 P8 ~ K1 @4 }& s x3 x2 r; J- ^5 H! q
7 Q- |$ n4 o& T8 V
$ K" O+ d7 v6 M p.start(). u/ Z7 C+ o! t0 I/ \+ y# E$ u
/ O: |, M9 p- H- f2 |
6 z' H8 k& [. f
运行结果:
0 S6 I' k! p: E) E2 O" G; q
' E* M6 a! v4 r8 L进程2 修改数组第一个元素后-----> 1014 Z0 Q& @5 V* Y& G' b0 r
7 |. }, \8 Q1 z6 T. b% q
/ f q' _# p9 J5 ~7 w$ {, b; Y
进程4 修改数组第一个元素后-----> 201
$ w( j* x) R) {- A) Z" k4 S, L7 h; n0 D2 b6 r
- 8 v5 l3 r& `- I" ?# H
: y9 B4 j; C$ A) l进程5 修改数组第一个元素后-----> 301
~5 x' M5 b8 K" X0 [
, ~; K4 A0 \8 m3 @ - ' B: ~- y, O3 Y5 g
/ l5 g# L7 W. D1 o
进程3 修改数组第一个元素后-----> 401: o' M7 ]- y. h# d9 z) _
( R6 R/ v5 h2 d9 G# m1 }# j
* Z( k; V s: `# k/ }- W1 n' ~$ l! O
5 O7 O: L1 Y r w& C. ^( O: u+ f进程1 修改数组第一个元素后-----> 501; z$ q6 t F) r- P2 J; e( v' v
& b: d5 r* a' u. A: @
1 U+ S( f, V. y% d* _& C# a( |" o4 M- s9 j4 I
进程6 修改数组第一个元素后-----> 601( W9 v$ z6 ?. G3 p$ S# D7 C
9 ?0 W3 `# p! x1 _3 {
% c, o4 k0 J$ {! O- g7 E
( x1 ~; j0 D& ~8 w进程9 修改数组第一个元素后-----> 701$ ^) g/ f5 a: Y9 N9 o
: }0 U0 J1 f0 H0 L. F
# i9 ~. k5 |; ]& t
, C" x( _4 U K进程8 修改数组第一个元素后-----> 801
$ W. |1 ?& U) e9 E" t8 l4 _. @) a6 C+ M) [( M6 l" b
- : l' o0 Y2 x' c# B$ g
) \& x' q# D9 U' B% G# e2 F进程0 修改数组第一个元素后-----> 901! q" ^& g' P! Y, S' N8 J! A$ _
6 v0 ]) Q& ]. `
1 V: i M d3 M5 L, D C7 L& D: t: G( k
进程7 修改数组第一个元素后-----> 1001
. r6 G( U$ S: [, A' o& l7 I3 X5 r+ \5 I6 n3 L) D
. J! l+ S. K$ i7 h! t 1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。 - 5 q9 a0 D" `# d3 o5 m7 ^, k
- s0 R* ]( C4 T' h6 w# p
from multiprocessing import Process4 A& }" [9 \9 d: m+ z+ }
3 p) p) _0 W! b8 O( N' `6 U - - ?9 \6 P# Q9 l8 f- Z+ T4 u
. X' n, C0 n* `" H U0 o
from multiprocessing import Manager
* Q1 [& T+ ^6 F$ n2 C- H* e6 d f1 G0 V; ]
- ! E7 P, S, @; o
1 Z7 J+ S. S4 z. y$ s+ s8 F) Q
/ P. d% m! v" ]/ W$ }$ c) t
4 ~# k- h2 g, o) N6 c
- 8 O2 K# L" U5 ?9 \
5 U) l/ Z) `6 D5 |8 W; O
def func(i, dic):
, |8 ?- W, y' @0 A+ l3 f) Q# T! q( ], j* x# t- e9 b: t
& {# u; {) Z# T. `
) K' ]' ^' m4 D6 `9 J dic["num"] = 100+i. X# P% Z. s) b2 o! K
9 y' y9 Y" a, P! o2 b- 7 a/ W3 @. W% v( u
8 e8 v4 X+ U5 S+ s
print(dic.items())8 g+ a7 }: V8 l$ } g- w
% }( c! E- d9 Y. Z - $ ?- v) \8 Y5 A3 u
1 y* S! e- `+ `9 R" V, I
& W: k- m8 R7 K
( q1 F( X- `" K - 6 b7 v& D& a: n- h! {8 N
" k: J* c; `; C0 c3 H9 J' ^if __name__ == '__main__':
5 G7 Y, x+ k; x3 U9 g% P3 A# f$ P* t, L# o9 B7 g/ h
- ; U+ L3 A3 S3 [% y
' S; K9 {1 I# U7 G& ^ dic = Manager().dict()
}) P, f; b) u# h
6 r: Z. S x2 E' n& S& Y) U5 K - 0 L! [0 @- ?4 U$ B4 D) e7 Q! W
) b& X" q1 D2 b5 m# M( Z* N for i in range(10):: e4 g# b k7 B4 |' N7 @- \, z
" |1 T% {! D. l! {, N2 ]6 [* R* G
- {4 A* P. I h3 ~1 s+ l# K# y3 a$ _/ Z" q9 w
p = Process(target=func, args=(i, dic))
+ Q0 Q6 h% ^; F2 `( z4 n' @6 M" h! |7 h+ \9 S! `& w) |7 b
- # W: X! J- H7 U6 u. O
/ ^ I' i8 r6 w+ w
p.start()
3 t# Q( \. {( Q0 K6 [. m
# E* F& e6 z- `+ Q - 8 e4 M4 {! r& h: e5 D* Z) k3 O; w
4 a: m3 e" V6 R% Y
p.join()
7 ]$ I& T$ i: h; Q1 ?! \3 N4 g4 [, |8 q, o$ C' |2 Z- W5 }+ v- Q5 C
D& s0 w4 M3 i. |
运行结果: - 8 Z% B6 p# A* t) l; Q: l" ]- F
6 Z- R7 s/ H/ M$ W/ u7 J( s6 z- ~[('num', 100)]" u. L. T4 s5 c* t, L9 R h9 C( A# [
) b+ v: w, v e& ]. ~. ?$ @' G
- $ n) g' D+ ^1 J$ U7 U+ @* t
& R; q5 J t8 o( @: ] @. ?[('num', 101)]
+ \0 D% e$ e0 d3 `! x( W& c' M3 G* S3 ^. `! R8 G
+ D. z( ?, A1 ]: p' b- L2 a/ r" ~% ?- z" p0 z* {
[('num', 102)]
2 n$ c6 p! p% k% ^! q7 C' I6 v) ?9 k; y$ r5 @& F
- ' @& k: \. S! z
- Y) O" r7 k3 _& R8 W
[('num', 103)]. ^8 \) _; ?% u9 R( ?) V. ~, X3 g
1 X1 l- S4 C& Z1 H2 A - / f, s/ K' A7 T& e
5 k. J7 ?3 t& |" H# I9 O[('num', 104)]
6 d* S& U1 k2 l+ h, M1 v3 B) t2 W/ a
& U* d0 f. g% v; V% X3 C1 m q' } Y$ `6 i( H
[('num', 105)]5 i/ r! W" P6 M, z
0 X s& G: C9 \6 |) C
- ) E) r6 t& u* g" Z8 D) ~
. D: o7 g- `4 D1 O3 R
[('num', 106)]
/ c6 _ r, b, b2 e
7 ]3 d ]9 k# M9 S- S& ^ P5 E- N - 8 S# V" d, Z% h5 p1 W4 n9 N
7 g, b. [7 a7 d w) o2 {% {[('num', 107)]
& o% V: Q @7 _8 G1 N9 K
5 w- v4 Z3 D2 K; C: w4 z
5 A4 ?5 v9 F2 O8 g9 u
: [1 [* O* l- k0 \8 N3 E[('num', 108)]$ H0 ]( `+ {' S* C1 d; D
3 s3 X4 ^$ b% v u( O3 x+ I; _
3 ^6 ^# q$ l1 V! K$ j* @
5 V( E5 }/ t. @! W. }0 Y; L3 z[('num', 109)]
7 y2 ]8 ^% L5 ]4 q2 I- l& ^8 f" I+ G
. d$ d" Q! i/ s* Z& X! e
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
0 [6 C1 u G) R9 ` e7 ]) q! L, X) u* p3 f. B
import multiprocessing; O5 W) |4 q. V& _
( |& l* c/ o' d7 F) I+ ^+ n
- ?5 y( Y7 M# ~. G! e4 s
7 j7 t4 Q R/ U2 I, lfrom multiprocessing import Process
4 \0 Z* B( z/ L2 ~2 {/ t' Y: F8 j4 L
- # W! D3 F1 v/ U. l2 |3 c9 W
, y5 i& c/ V5 |- w. i, {
from multiprocessing import queues
) _5 C5 ]( s/ J3 I5 V
7 G" s3 R3 L! M0 ~) F5 R1 L+ v! l
! l* S- n& H# { o% {6 O: A# Z$ q# j$ V. u7 c1 V4 ]
1 ?- {8 M3 p" p+ N) y6 `
9 P$ o4 d9 L# n( n1 X% I J
7 D/ ]! U/ ]: |0 J1 C9 {$ j; u
# V9 V: E& v7 H1 `& {3 \" F2 t4 g1 tdef func(i, q):
4 F/ L5 x8 h* O8 }9 U. [) D& Y$ |$ J" ^9 z. F1 I* Y" c1 e |. _
* I: X( T2 `- E. ^8 R) L% ~8 `5 r% a {8 P
ret = q.get()
! ]- e- M4 }4 A$ J7 W: w6 i" B, ~
3 s6 c8 s& U& v5 {( ^2 ~
# l) C7 ]2 q$ Z8 f' d/ }" g5 _
4 O4 O$ l) [$ s4 v9 w* _ print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
8 X9 T2 B" L9 R# i1 j5 U8 X5 d6 y7 |9 |( y# r5 t0 v+ o% S% J
- 7 r7 n. x* V6 M- C( s
8 J+ J9 n1 V' v9 y9 I9 Q; u
q.put(i)$ l& P) b( b& d
5 T- ` z$ t/ @. {2 |0 e
- 6 ]& P0 R7 o% Z# W* a6 Q
7 f% b b9 A5 c' B/ s4 }1 x- E, J& ?2 j, n& ]
+ E0 f" O3 C' n) N
% n' n5 ]3 b3 J. K2 i! l! {4 y+ C" H+ H4 O
if __name__ == "__main__":. N/ ]' _- R! A0 i9 ?
& v; X/ y' t s; W
/ a: o7 V- d. X0 c) ~4 Q) o- S8 S% |7 X
lis = queues.Queue(20, ctx=multiprocessing)8 s9 Y: V3 M# f8 @6 J3 ], M# }
. C$ q i$ s) T6 g$ D- ' w6 a9 w; X, t* k% ~! C
( x- q+ @+ C4 N
lis.put(0)# o" _+ ~' c0 m! [& q' I0 ^
( ^9 `9 B. D% w2 |( c1 O z - 1 t. c; H) L$ Q5 N9 z# a
$ l' N+ I- U4 i) z. e* d
for i in range(10):
" w2 ~$ a* z7 Z+ [/ w2 r5 h' c0 Q' o- A
- : q- ]% T' [7 @( }
% b6 b% [4 V; e/ ?
p = Process(target=func, args=(i, lis,))
. y6 N" ~+ r3 D5 L( g$ _7 ?& E4 K4 X! e) I* }% l+ G5 [) ]( \
R, [! k7 ]$ h: P/ s! N2 R4 T+ h% s2 k- ~9 d, h' Z
p.start()
. W$ z, C: e( f( K) x8 P% I3 A0 M* u; m# ]
# ~" E0 r" x, Q$ {% M
运行结果:
5 T# B7 ~+ r/ f* P+ X0 b& g& Q {5 ?4 Q1 t9 x" `: g3 W- i7 e: j j+ K
进程1从队列里获取了一个0,然后又向队列里放入了一个1/ L1 ~1 F; ?# ^) A
, P( Y( ~$ B: D
% h3 u( T% ~( C- y7 B$ L
9 u# d) v7 i4 E+ u7 C* b/ S( ?. I J进程4从队列里获取了一个1,然后又向队列里放入了一个4
* u1 X4 H" K/ Q2 v5 t6 {$ C7 J* I: M/ m' R" i% F& p+ m, \( ~
* B6 }! M9 X/ a7 I1 l% H
/ S% W' V3 j5 {+ {; n* R7 R( Y进程2从队列里获取了一个4,然后又向队列里放入了一个2, T2 W6 Z* l+ v
; v. s: _6 {( S0 A1 E
& C$ M- O+ }$ u- G9 c, N) C! n2 C" p* ?# w C! W$ P
进程6从队列里获取了一个2,然后又向队列里放入了一个6
# n0 t' p- \5 y/ L. q) a
( \" p+ \$ E% J4 O
" Q. q, B, w! l6 D/ M. x
2 p$ D: |& U. ?; g# @进程0从队列里获取了一个6,然后又向队列里放入了一个0: \2 H( m; {/ x* b; @ E
$ T7 J0 B7 [2 e2 N. W
3 z) H5 O6 ]7 m# {
: o8 Q* ~ E& `进程5从队列里获取了一个0,然后又向队列里放入了一个5
3 C1 @6 x) a7 T$ c8 k0 b$ T
5 S8 H+ \3 n9 |0 W; a" F9 y! B- 1 c' C, s/ k2 @" }, _
# _0 T" ?# ~" `) \8 R$ O
进程9从队列里获取了一个5,然后又向队列里放入了一个9" @1 A- C9 X$ r1 E, U# ]7 O( K
+ f! B; P% \' H6 S& g2 n( Z - + P( V/ X) a3 s* G
" j# G7 s1 z* _* o
进程7从队列里获取了一个9,然后又向队列里放入了一个7
" @) g5 d) }6 V$ ~5 z1 ]& \6 ^# i7 A2 o* b' c; B0 U$ c. G9 w
1 l) _: b0 S1 V* q
6 l$ a1 f7 j3 `6 i+ O( s进程3从队列里获取了一个7,然后又向队列里放入了一个34 p; D$ x9 Y% }( S+ x" }- T* |
4 ]' q* _3 }, ]
- . x3 _6 e/ _ f( [& p
. ]) a8 |. ?& g X进程8从队列里获取了一个3,然后又向队列里放入了一个8
! C g* K2 m+ F& v
n: R* J1 |8 z
9 Q5 L" @; @; R: y' ]. @
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
H8 R5 H0 ?3 p- b1 ^; R# A' Z* M8 g4 `9 E# P' f, `( a
from multiprocessing import Process
* f K% W' T5 @- x- Q
* [' P5 T5 U7 f) D9 x% [. [- + y2 Y8 ]1 L2 |# y j/ v1 v! u
4 _$ Z6 x' c W9 ifrom multiprocessing import Array- y( ~# J0 l# b' o& c; Y) L
7 d( E0 q. H, b5 ^8 l' Y1 j
0 C& ?: _, a5 B$ |: W4 o2 `- a% o/ R4 Z
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
/ V8 z" n8 \0 I. D) e5 V
2 Q* N1 L" L2 B' ~, r
& |3 o1 w) @7 Y
- m/ F, m. i% e9 Jimport time
0 a: A5 Q! M4 ?- z0 b) g' X% p' A& D6 w5 E5 o1 q R
/ A5 _! r* p, X4 {8 A. I
6 @) w3 I# l/ v
$ U6 G! h! p& v
3 ^6 M& _4 P: o( L# \3 N- O
$ c( v+ d" V& q3 V) Y! h5 F) D4 f0 E# t4 e+ Y
def func(i,lis,lc):/ g; N" c* a2 V: E0 v* D2 F# N
4 C" @5 S% M7 q7 X; Q! ? c
8 [2 v, i: j2 o' }- L2 h+ \+ o+ E7 N' f; Z" N& M( w" ^
lc.acquire()1 [& _8 U7 h" d4 M: S5 Y& u6 S
- w- t: M- i/ I& @8 `; z
- - Q) p( f( L1 F0 V7 L8 z; P8 n
5 ]8 |/ z2 B% j1 f
lis[0] = lis[0] - 18 K$ z7 k) @5 G
6 T$ M" D) l3 n- q# `
- - {. f: |7 Z) s" B% z
) R2 R c% d2 L7 ~
time.sleep(1)( j; @+ O* b1 m$ Q
5 v4 n `9 Q( x$ m1 U
/ q X* x2 H5 e" A% |+ L1 o `4 b* C$ e) Y- R% n% t4 \
print('say hi', lis[0])
. q1 V" {8 D, @" `7 r" G' v9 V/ u
' u& b9 ?( d; V# X1 n8 R- ; p" U5 l; I& p) y1 W
- i9 O/ a4 p+ N3 O lc.release()9 X; j" a* j( {1 r* t' `0 W
) z: s! V0 V2 V2 _
- 9 l0 ~7 E ` `1 `. C( I
$ A* y2 y4 Z+ c1 ^7 C4 w3 ~
, A3 N, W- M$ x' c
! T# J+ E3 a5 u8 o/ P, h - " Y0 O9 N K* H$ o
* p5 Y' r( y/ s) {( \
if __name__ == "__main__":
8 c3 w& E; A' p% Y
/ [6 G1 V& n8 |9 K# l6 v2 E - 1 z) L: n% J, k* g' E
, m! q P$ _! V: Q array = Array('i', 1)
! m3 {: }4 X, _7 L; u: {! F! z: R1 F$ P' v4 `
8 l# ]/ B" s }, B1 `
/ d3 C7 l# t4 Z% k array[0] = 10
4 h$ O7 @; U. a. b# C0 |- w# T Z; a, D* `3 v: D" s
* L/ J4 c0 F6 ~0 T2 X: A1 t! w- |8 d$ w4 e9 H* W+ k+ x
lock = RLock()
& o: x' z- [0 ~ l) R' g8 n1 Y1 m. q4 L6 {7 k$ f3 Q
3 k7 ]" M! Y; n- J" B3 T/ Y% V5 \! g$ f) }; G, @/ X; B
for i in range(10):
; q3 i2 Z2 F0 s6 K8 U" D2 {
3 X6 v, X/ o0 `: _/ x
; @% a+ U+ G; I( H6 R# e! v* A, y* F7 x- K# [* p* I x B7 g
p = Process(target=func, args=(i, array, lock))
# ]& ?7 y7 n* \5 t( S$ ?% t; ~( {+ k& F8 H' H
- ! l" _9 Z4 S& K. ]% N9 h4 s" {0 e! h
% n, a9 ~: a2 x S' Y
p.start()5 I S6 u1 ]7 G% J
9 t+ C/ \0 @8 S* Z# w
" w V) `- R' @# a, Q; t& v1 P
运行结果:
, ?& L) x2 b8 T
`& w6 d4 z: p7 W0 y: m& H6 i7 n- jsay hi 9; e5 t+ J' B* b' c
4 M& E `: X! n1 p3 V/ Z6 t
- ' }1 `% m+ Y0 }* E3 B
! T* U' j# N {; Fsay hi 8
4 k# @9 E# ]* N2 s9 E3 ]$ ]- n
5 m |* |+ p& ~& d1 A7 A, a( R
' j2 h0 B T# \2 m
" {6 L# `$ I0 I% W1 \( U" \say hi 7
& k+ B* c [, l' J7 s O7 ]" m* t* N3 n$ D2 P
4 P) I+ S4 x5 ?) ?! g+ [+ }1 g# \$ J. |( |
say hi 6% H6 J' n5 F( _0 \" @& o
3 R- B7 X" U r+ e- ( @+ a+ A( }% n- C/ V+ v6 q
0 \4 K& D* P* V* m; d& j. g
say hi 5; N J+ i2 I# Y: K" S
/ k+ {. l C" V6 t/ H3 q; F( w! q
- 2 l3 Z( i9 |5 J; G& W! T4 m
# x$ ?$ s5 g# ~2 e# Msay hi 4( A+ U( P- i8 `1 s1 [; [
2 X$ [4 [2 Y" j+ W
- ! H. D( `! g9 l' f) F6 x2 U
( x5 }' V9 e; a2 M( jsay hi 3
) h* _# b2 F# k, G* a
; O z" A5 `- ^ - 5 _% K$ e V; P/ k6 w
1 {( {6 @/ B0 k( _7 H
say hi 27 w* h4 T0 K" @
( c! P4 \* N/ g0 i9 ^6 a
6 z" m0 y' S! Q: m+ I( F. X: w/ v! l6 n/ b
say hi 1
' v2 S# l# V. C. B; H4 e
4 h4 ]9 f$ V n2 T
: j3 R4 h$ c3 K+ a
( w5 F" N$ b7 T+ e5 Isay hi 0
. I6 Y2 }: ]/ M7 f$ G' j0 k1 }4 @" O. R2 \$ D, V4 M1 B% R
/ M* q8 P% ]: K- H2 q6 ]% E 3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法:
8 N1 p0 z4 K! _; y# `* I1 B4 U E6 W7 E! c Y) O4 p' Y% ^
from multiprocessing import Pool8 ^# d- X0 u5 }3 b+ G$ H( I9 X2 E
) v G$ A9 y/ h. M9 D; J: S! D* U* \* ]
& Z- h1 {5 v9 Z# `# r
' h( g, l& M" V# A; l3 K# c* @import time
. F* Y9 Y- ~# H2 u5 i" i2 [/ U6 o2 Y
7 y+ |$ p0 t, z' h+ m9 [8 x
' Q' h& O4 _4 u$ R2 @' {
, Y0 t# l( B. F* ^3 V# F- j* L( o3 _+ m, }
3 C) M) A0 z4 I0 D5 `' u. X6 N- ' k. ^3 o2 m3 U2 a/ Z
. x( d4 ]* V& Y" g5 ddef func(args):. F* w U0 o5 C. y3 e& u
! @4 u5 Y" ~/ K4 F: O
: }+ t6 _5 c2 v# x0 I& Y& v: V) W5 I
time.sleep(1)& `2 K/ s4 A/ o$ ^. ^, a
, @% }. v2 F7 @ l v5 A3 S
9 ~% j; r/ i6 n( L' c8 _4 ^# k8 c& r' V; f6 a7 N6 |
print("正在执行进程 ", args)5 j) W( k' }. S7 i+ i# N5 f
1 f5 Q$ | x4 R; O* t6 o% q
$ H4 T+ }0 b+ t( q8 n$ f
# T, |9 g4 S( J9 V" W1 z
) L' F+ Q! T9 F& V0 v1 m3 Y( ]6 w% O% Y
- % Q4 W! l8 D l/ P3 ~# G+ G
% X. y: }5 n! q' P" J2 X. Y2 C
if __name__ == '__main__':
' J6 N; Y1 U) n4 p7 {# s" e; a1 Y) ` |9 D1 u. i2 x% ?
- r- }. J% R- E8 w8 V% e
3 C& y0 ^1 }2 k
, O& P& Z' M5 D4 u
; \1 g" C3 R. w2 Z
- a5 P/ \+ E' w# _4 M) I( Z) {+ k# K7 t
p = Pool(5) # 创建一个包含5个进程的进程池/ V4 o2 ?& Y4 u2 b4 ~7 j6 d4 H
+ `. L, H5 |+ e+ ]. | B6 l* U3 I- . o) L7 E" v' Q
( [" i3 B* ~& i0 L3 \4 F9 p0 N8 |) D8 j/ z6 H
: ^+ y+ p- F, _1 z
6 e, G' E# A0 D% f- X3 O3 E) Q! t% R3 {
for i in range(30):" H; W/ {" k# i* {# \2 x
* ?' p. n+ a- `4 s& o8 q- ( R& O% P7 I" [( f& q4 c" s
+ e+ j9 r' h2 P6 v. m
p.apply_async(func=func, args=(i,))
3 B+ z% q, V# y2 m5 R
+ g# d& r8 n0 X8 y, k4 A0 s - $ \# {; W% C: P' {; X( y& f! C; a C
) t3 h* O: v8 T9 ~4 r, m- F
) `+ O% C% R8 E3 O2 u+ ~( k; a# Z& u. ]" u
- 7 u; z! U6 |; {. p; t
7 F$ d8 h% x& t$ V p.close() # 等子进程执行完毕后关闭进程池8 Y4 [3 g+ y) p2 j9 `( d M* q
2 J2 C) A: D# e* F& q, D
7 A$ W& j3 Q- }! L/ R6 q+ k
/ D. p1 {6 a# {6 U2 t # time.sleep(2)3 n5 i1 r# b7 p0 U% f& @ Z3 Y: p
1 A: B v0 Y7 S9 E: }- 3 m) m5 H: ^) P" i
6 G' d' R& ]2 V/ V3 S A # p.terminate() # 立刻关闭进程池5 |5 n* o, _' B- i) t
& ^1 q5 r, \. _9 \/ P) Y: R, `3 K6 `, y: } - 8 |7 m4 x; Y! h. t" y! L* D
% v0 P& I* n1 O2 w& m p.join()
; Q, ^+ y- z5 K4 U3 R* n' m4 K/ E' x( y4 a. {
% h; y: I( y G( p* w s; X , `: q4 f8 s9 y$ A* {, Z
请继续关注我 * _1 X$ D6 [' J1 _
4 M; m* O$ O$ V/ L
|