4 Y- C6 h4 F1 n4 N) j6 |& g爬虫(七十)多进程multiprocess(六十一) z2 J; V7 T5 U3 o6 [4 R+ ]) Z" x
Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。 - 4 Q: f n) X& a; i" \& f+ |
3 L/ N; `* [1 A! t; e$ _$ Vimport os* s* X% S. v1 g) ?( r$ T
3 \7 p/ o- M e; g7 U7 p
- ; h! K1 O; Q8 F6 y- e) \$ b' s- `
w- @0 L' y" v) P7 W9 E
import multiprocessing
3 y Y7 L9 x4 }
, j- W2 n$ ?- Z - 7 R8 [1 T. N( U# U! t4 G
$ q$ \6 b5 L# `( f% K3 ?
: ]7 L7 m W2 P0 q$ v+ H5 o: M7 [% {- |' S, B$ z
9 r6 C+ h+ ?& O: `# F% C$ V$ n0 v# b% A" E0 I0 F- s3 s) Z
def foo(i):
; L" S( l; h: E( d
" x4 B1 X) j* @6 D- 2 T E5 r& D+ P1 F. W
. |" j2 P) x' s% v$ X" |0 M
# 同样的参数传递方法- Z3 ?8 Y; k5 J# ?3 W+ M
8 H# Z" ~3 ^" \) Q5 i8 q! C% Y
( g* y0 @/ O1 |9 Z( X7 q4 g0 w9 R; Q2 A2 d0 v
print("这里是 ", multiprocessing.current_process().name)
7 @; c; ]0 w1 D3 w' w3 T' V
( g8 h+ T' w5 Y1 o- ; R) @* C3 P2 O/ N
5 p0 l: X) S+ v6 {4 A* r7 r$ G9 ` print('模块名称:', __name__)" k3 z. w! k; @! e& X6 k8 v
8 C, r8 r" G8 Q: `+ `. Q& w/ h - 0 v; r, ^6 g: [1 ^
9 w" P: |, ?, n, X$ k
print('父进程 id:', os.getppid()) # 获取父进程id
) y2 _2 L4 a2 g" N2 A0 D. J1 y5 t: D
- 5 z: W( ]0 l6 M
4 H, t! C* p9 {0 d' a print('当前子进程 id:', os.getpid()) # 获取自己的进程id
" E7 k1 ?0 D S/ }" T& H# D. X. Q! U9 q- e) @) I( O! x
- ' I+ U/ o/ E2 n* _) M/ y5 H6 q4 y
F3 n: S1 X9 A. e6 E$ t- z print('------------------------')$ }1 u4 Z5 q; R i9 H3 I4 I+ ^
+ Z0 g0 j2 B% I+ _8 N
8 o5 Z6 Y8 @& U% v7 i( A6 A1 ]8 Y+ Y9 n
( D# @. V& _+ t" z2 f/ T
$ b% s" E3 F; L: C& U- ' I) j; e) L, z+ E2 `3 |
P+ o" D$ ]- U; I- q
if __name__ == '__main__':8 b; d/ C& O( F' T% Z4 v
0 O9 T3 w* Y- d+ ] q2 Q
- 3 U! K! k9 u: K/ p, S8 m& V# V. k, m+ ^
, B& S8 ^. Q; X {
( T8 r5 l5 t1 a7 I$ Q0 \- a# d1 J) r& g$ D7 w5 D6 |9 l+ B! h
6 [5 n. @& @2 G% n. Q* x9 t
) K, s$ B' ~7 ~8 ^* V* m for i in range(5):' z" n- I3 p1 R3 a' w0 T' @/ R
0 p9 Q3 @: e" C8 r( i5 L- + P. Y- b! Z0 d3 q% k: {
# t A Q) I! |( h b8 H2 l% q7 g p = multiprocessing.Process(target=foo, args=(i,)), E4 h& x; d( W c" x* l! L8 ]
1 d* b# ?( `0 S* A) ]9 b - 4 f4 o$ [( p1 n4 P
' Q2 Q+ k" p7 |* D3 b
p.start()
+ A! n- u9 p9 P) g
8 D& Q" r3 X, z6 `" ^# h4 `1 j
- ^6 |4 A" l# K% \7 d
运行结果:
( i1 ?2 C! J6 Y( f( C5 ?* Z' o8 C. N) f* j
这里是 Process-29 v; z0 {1 P$ v
, N4 K( J( S$ i" S
- ' z+ A& T# C6 P; Y# k3 E* } Y
5 x! H: T$ C1 p# \7 w+ m! {
模块名称: __mp_main__
5 A2 h: T8 v+ W9 C1 A6 N% G7 Q, Y7 t
: i4 t- G: f' ?9 v, W - ) j. n- B+ n% R5 I
3 [4 H J1 n& w$ \9 l9 S7 i) j) {6 ~父进程 id: 880
/ I4 M. O: {+ B; d$ y$ F, N* I4 ?$ [1 K, d2 O) T
- 1 H3 z( r$ r- d: G( e- D/ ^
8 s& \7 X) |5 y' M5 Z
当前子进程 id: 52606 D# P5 B" |! F- Z$ A
- H$ }# v8 d1 w% A& Y4 H9 A - - I; q, `9 i1 k0 k) d
$ L7 K' j: @1 k5 f% `
--------------6 |6 t( ]! [6 F* R' s
6 {3 o4 y M$ |2 a5 {3 l - 7 F7 b% l' @. U* q+ ~
3 H9 K U2 `; ?6 c, p8 z& d0 E- B2 D这里是 Process-3
) V1 c/ O9 J9 i( G7 J# x* p4 A
9 f, Y9 C8 w) x# n5 U7 t) h [5 M - 6 c9 {' a) u5 [ o9 x8 `
* w: L* H1 k1 R/ y4 s模块名称: __mp_main__
8 M6 A* C, m: D7 a5 h; a$ C" g5 w# J4 U! z2 Z8 W
4 _% H2 L0 n, c) R& K$ E
6 `6 l2 m/ f0 q5 U* d2 X' ?父进程 id: 880
( I9 `7 v& U# z9 B, E O6 Z
( n" v, V, U: D6 |7 t
0 Z1 x3 n3 F- ~8 z
2 l. D# L& R* a6 E当前子进程 id: 4912
Z9 W* _& ]$ j/ V/ c
0 J9 H6 l/ o) F4 A8 w: D( @- # f+ N( |/ V3 K6 }0 `* ?7 l
* R0 X5 t) L- k
--------------( r; p2 n1 H! m( R7 C. W( C
* z# y3 b/ H' \6 U3 B6 B( O2 t - 8 y6 v% D/ B l) S9 ]
0 w6 J0 G. [6 L5 t; }
这里是 Process-4: {& ? \6 G. h: Z: d( e5 M
7 _) Q! F2 I* t% _- n - $ Q' [: ? E$ q0 I: q ?
+ K) j" K% e5 W" m2 H" |
模块名称: __mp_main__; I' R- w5 P7 f' a, ^
6 t) A% {/ C& p, B
- 7 _! `' O3 x" g8 a6 K
* e+ o9 S) n% Y/ H: z( f
父进程 id: 880$ M/ [+ G8 A* L
# Y* Y u% P# [+ E% V3 V( I
0 ^: N7 p. p1 ~, Y0 q) A' a, P% ?3 V7 z; W0 } d7 e% r
当前子进程 id: 5176
& Y% K9 o( b; `* ?9 q" u# A0 w3 t( H- ~. u. |
- - {' [+ k% o8 q8 d) j) a9 O3 v
# ?; O# J* L; l! u0 u1 Z& y+ J) Y--------------) h$ E+ W$ t: y0 `
C; h9 U" N5 d0 [' ^/ k
( s2 d3 c6 ]& h1 R) H1 Z* _& I8 o& }* f+ e
这里是 Process-1
! q0 p9 d g% [9 {' X9 |6 b! E6 V5 Y3 y5 D9 F- u+ Y" ~) \( a3 _9 \
- : c7 l1 W- d+ `. ~* l
; a8 _& v7 Y: e g
模块名称: __mp_main__
' ~1 H' [7 v0 M& @0 `; V; `' O9 d1 D
- ( U3 V; z! r. C2 F
( y2 a! M6 p+ h% T父进程 id: 880
2 @2 F1 ^2 W" U5 Q+ L$ A
" D3 x7 U4 d' G( } - / w2 g2 v `+ O# G
" x) V' c. w# a O当前子进程 id: 53807 @" e4 U1 l/ }9 I# g0 \
/ ~( b* J+ k' A5 i" D, ?1 r
' V z- U) x+ I( h" h3 P
; ?0 X$ X$ y" b! `& P: B--------------# @6 X: _; H% ]$ O9 K! y. j
+ |! H% ?$ j0 V, _9 [2 a5 h/ g
/ ~1 v* o- H& s! K* ^( X V( ^4 D" f& `* W: L
这里是 Process-5
, F5 j8 p l' n% V* H! ?
/ \* d, R: Z0 d. ?$ Y d' r
# Y4 r' {3 k% S S/ y, S, m2 d% t) w: P6 u/ W+ a$ l4 s
模块名称: __mp_main__3 X8 \( S( Q0 }- y
- d: A2 D4 K% J9 H5 A$ x0 M
7 W+ o# `/ J! q
# g7 n- o3 K: ?4 p' V父进程 id: 880
" t( c7 }: K% D+ \$ g T8 Y4 o. h2 y( a1 e+ E
- . \3 h( s/ r( }6 y; @
4 v7 r; k3 Q3 [6 e* r$ \/ p当前子进程 id: 3520
( [7 `( ~( n; [, l, |) ?& \ C+ c. g# C3 d1 x6 p& s( \* L8 W1 E3 \
- - |, ]5 W/ f3 j) E
/ p1 o- w* o% U$ F4 ~# x--------------
" x3 F* @( y' B- Y8 ^% H8 u+ [% D8 Z! e& w$ s! V7 D# w' Y
: D$ p. ?0 ?5 X S 1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享: - ! D3 g" U( C/ n8 x" w5 o
. N4 n. N& Z- m" I) P
from multiprocessing import Process
& X* }; N B x* g, Y9 ?( w+ y6 j$ O1 o4 A g$ {
- 5 d8 k" Q$ ]+ {
9 p0 \+ m. F4 w, j$ j/ _
- r ^; s' ]3 s& G; ^( h# Y4 S- I$ U( P$ g1 F
- " M/ f2 c2 a3 O7 m
" t, l9 L) Y4 H* x
lis = []! Y1 u7 l Y; ?4 d6 R8 h
& r' ?; m' t2 ], j ? A! r" Q - % e( L: E3 Y2 q3 P8 }" `8 A
. M- p% D5 b* G' T, [
& Z4 r, C+ }2 }* [8 b- j5 i2 {* ^; `! ]$ y& n4 t) f
) e1 ]7 Q& \+ h' J; o1 }9 y' u8 F3 e) @8 C
def foo(i):
9 ^2 f, E- |+ D7 L. @* g
( A( _, Z7 }( s! F
N0 L! u, x6 m4 s* w* C$ R$ m1 p% K: H* R
lis.append(i)
; ^! N. N! n9 c3 E) P0 S3 D/ c) I* ^( S; C/ g V
6 f5 V. o8 ]) G( Z3 X8 L4 k5 U0 J4 ?1 u4 B$ |
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))4 K! b" ? \. k. J2 C6 B
1 o3 x; @+ S9 J ~: ]: a/ {
( ~# c) z' {% T8 ]0 l+ C
& q- p# p5 j$ \0 T O& L7 M
9 ^1 j0 A' `6 e. }& t+ L7 L# B' }# `; D+ F7 l5 m# ~
) {/ W$ A- c( A0 c$ w& G: D! _6 V* q' [
if __name__ == '__main__':' P4 Y8 i9 r7 j8 E, ^* r* g" N
* k* ^& y( d7 v' z# P9 v- + m+ ?( _" D' t
( I! t3 H( F$ m& K for i in range(5):
6 i. D6 y4 m5 d* a2 X
" `1 _3 A6 N3 R1 V
* b- E! `5 Q! G H$ f/ L* R" q) ~7 f: ?/ i
p = Process(target=foo, args=(i,))
: j s% ^& ]7 G4 L, @ [
7 y; I, P5 d& \7 D" Z& c4 H0 b' Y- a. t5 V7 y' R5 Q- F
2 F6 X/ Q8 x% ^7 ?' b3 K
p.start()
o$ W8 u( o+ L1 O) s
7 y x9 M+ w. J$ J7 Y* B7 `3 X
* d! v( w; X T0 @ K" o, Q4 h* z% v- G/ x6 I& c$ \! ^+ r
print("The end of list_1:", lis)/ K# ]" I+ G* s. d$ Z( K
. L3 R' J# [! G8 Z- O, v% J
6 X4 i" I% G1 W" K
运行结果:
N, |1 W' U8 C- v7 o
h6 ^, B7 c: L* _' O" |3 o* `6 zThe end of list_1: []
8 y8 m6 s8 h$ d
9 U) I$ X! n/ S9 H( Y
d9 L$ B: q! F2 \) I& f
" c$ O, r- ?0 z- UThis is Process 2 and lis is [2] and lis.address is 40356744+ U8 z& q8 z5 U
! a8 D; y7 i2 z/ N5 V7 o& o2 T
( U5 I2 c% V! z. M) y T7 k( j; w5 t
1 [" r* J! h0 `- {This is Process 1 and lis is [1] and lis.address is 40291208 K! I; W1 z( j, f1 p- O5 m
' j; e |& X* g; x- 3 x' r- f2 G% F6 h9 Q3 t5 Q
. K& |6 I( d5 P! h- O
This is Process 0 and lis is [0] and lis.address is 40291208* Z( f# |9 F+ X
& ^5 R) V6 J. A& F. L' ]# h: W( T
$ \$ n6 ~3 j- H0 S. _+ ^ S/ H9 T% E9 F2 p
This is Process 3 and lis is [3] and lis.address is 40225672+ H6 d" X2 I- H
k1 V. n9 g# a, @
- ' H9 @5 ]+ s6 h+ m9 |
+ C3 z5 r$ l6 [$ e* f! ?2 QThis is Process 4 and lis is [4] and lis.address is 40291208
8 l. j/ l# z: [
; Y1 V, X1 T# t& _6 S8 T4 @0 g2 l6 j: g N1 b; h' R5 z$ P+ m' U
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
5 S6 z) u' J2 E9 a5 J0 Z# ^7 h1 y
# |5 \2 M7 u8 P3 }( R% X'c': ctypes.c_char, 'u': ctypes.c_wchar,! w y& N2 g3 h( L' {- K1 g- t
5 C3 r: O/ z9 o6 }' j
- ! f6 R* s# R" M; V$ b- @9 F
) m) L: _4 B8 g& A. U# D
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,& T0 A$ B, h( f z' X& Y
) w( C: x. F, Z2 R/ I- K- f" G - 3 Y/ x0 f& i; m% A
6 T V4 y; d9 u j- G9 R( l4 r'h': ctypes.c_short, 'H': ctypes.c_ushort,. ~ t. ~7 s/ w' {0 C8 M' k7 N
7 K; z4 `" N' o+ s% F. m
- + o7 @7 R4 W8 B
1 \2 Z1 o5 v& d# C8 d
'i': ctypes.c_int, 'I': ctypes.c_uint,
1 R# U: ~9 I5 r O/ P3 _+ J+ \ r3 H( Z; T7 g& g, E2 {: d
- 1 f6 n6 L6 F, G4 e3 ~. k2 x" y9 j( V
; m- Z; H X, x1 V( w& b
'l': ctypes.c_long, 'L': ctypes.c_ulong,8 h, Y O5 {( U. x) ~
! b/ S6 A" x+ a$ n# E3 a& A9 J
8 @4 ?% L" L- ], _. c0 j
! s5 m# W0 {0 A/ M$ f'f': ctypes.c_float, 'd': ctypes.c_double% Q, c. S! N% F1 g3 M0 n$ q
1 N" v# G% k# |
: I3 l/ F' J. D% Q+ A1 z
看下面的例子: - - {4 [# V/ A! n! o, ?2 h9 o
1 g5 ~$ y) V& p, x- I6 q( ?2 r
from multiprocessing import Process
. C5 f& r: P9 y) ^! I0 L
% `0 E' s. U% ]1 E" P1 d - 2 |2 l/ `, W; l0 u" G+ k V, J
# p3 R) X! H* ^; s0 n7 V: \3 v1 B
from multiprocessing import Array. ]1 J& Z# _0 X& Z' g% Z4 v4 V
, _ |5 M8 N" n- X# @
5 W* n' F& ~3 H. W6 l& p5 x0 @9 G* y2 f9 @1 R: h+ R+ Y: x
" W) R8 |, V6 z0 E
! V9 N; B `) y& `' `/ ~+ u8 B; V( T
. a$ i4 A9 @' w+ F( j |1 L- Y! D5 L/ d
def func(i,temp):
( Y4 P. ?3 x1 z0 U/ k/ g
, t6 a& N9 A, O# F
; j) @6 i# Z. B
1 b" x# [3 i2 U- b _1 f temp[0] += 1007 b/ D _5 Z. Z, O! n* f
. y. l# a3 t3 J _
- 6 a, A4 E$ @) ^- P7 K% L
, { o& A2 ]. V- n2 [ T L print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
+ F( _' r& d E7 n: R y; F7 I- G, I
* D4 t- T5 x5 M9 P& m" K
# v- O3 n9 e z O$ R% P d. t; w l6 Y$ _& n
' b& g6 T( U+ J- w/ ?
% W/ B4 ]/ Z; v, A
. ? ^' j: }' d$ \if __name__ == '__main__':$ n6 C' x! f! Y3 X4 H/ H& B
1 p6 }8 K, l3 o9 l. b
- 2 P! S' D( S4 ~$ P0 i9 o
. F* R& P7 ]% L" d( e9 I' D B+ U2 ~
temp = Array('i', [1, 2, 3, 4])
! }+ z# [* j2 z+ K( P# m1 ^ z1 ]
- 9 `9 [1 e9 W5 m) ?% H( t
& g! ~9 U" o3 q. L for i in range(10):
& U7 a$ e" J3 B& i% e$ d. _7 R; \$ k/ {( a
# b. q& q9 v* N( G) V( ?' s8 D' `, T h6 a
p = Process(target=func, args=(i, temp))
2 L$ Y! x0 u/ x" k1 r3 P+ m3 r1 q) W" t4 [) C0 t) T& @7 F# k
- : x- A z# q7 X5 A# O
. }& z& v2 m: T3 z
p.start()& V- }- O* \$ S8 h2 q j' B
& ?: v! g- R; n) A' @& z# q9 q0 J3 X- \. d/ ^0 b' S2 q! u
运行结果: - % L! L" v' p, G# {' G
+ {9 l% W# O6 |进程2 修改数组第一个元素后-----> 101
# Y7 T3 Q9 l+ K0 ]8 ^; S( u! n. ^* R) [+ C' L; l- G1 D
+ z: K5 p/ X7 O" w3 `! U$ t Z% K U$ a- l0 _
进程4 修改数组第一个元素后-----> 201# R3 v+ {& g8 s t2 Y; f. n
* A, o! q) O' G y1 C- 1 r% y8 _; \2 P" X6 y# z
! u8 u$ R, L. f6 O进程5 修改数组第一个元素后-----> 301 o" y8 P( p& c! A
/ _5 ~9 }8 o% K. A( a" n, I3 ~
- % ^& g" O: {& W+ D9 Y$ ~
r7 R# E3 W& v( L' y8 G进程3 修改数组第一个元素后-----> 401
# C# \* Z5 h( l X+ p/ X, t; W& W- L S
8 B; Z0 t+ E4 N2 v& Y6 o$ B
2 r6 G" P, x5 j; D, g2 {进程1 修改数组第一个元素后-----> 5017 X3 z' m5 F6 ]3 S7 z8 }
7 m2 Y% M# n8 C- O% T
3 Q: o* v7 @3 ]/ Z% F
" J" _5 l0 W8 I' p进程6 修改数组第一个元素后-----> 601( e* B' ~4 y6 n1 }; L2 L+ Q
- c& y/ F: ~6 g# D# x0 Y- U# G' R: h( X( F3 _
6 `, m, u- F3 B
进程9 修改数组第一个元素后-----> 701
! L) C3 ^( y" `4 X6 {: b: R
8 I+ K/ x' o" J% a - 7 u% x7 B" c4 J+ T9 s
8 K3 S3 l! u' Y/ |! l0 l
进程8 修改数组第一个元素后-----> 801
5 M& C$ d! |" G& t1 I% N. b9 N+ c
: P8 ]% [$ s2 C M - 8 N8 `; E- A3 J3 |( t& |2 B
% ]7 M: ] L- j7 B
进程0 修改数组第一个元素后-----> 901: y( d y* W/ Q5 C1 j4 ?
( P8 ?1 s; y- Q0 B0 _; r; ?( v( `* C
- 5 ]9 S1 _; [( c
3 h% F& @* L" Q! u进程7 修改数组第一个元素后-----> 1001
& n7 K6 B+ t' _7 \1 D! @3 {- U/ |# M8 ]. _
, u2 X, a# U+ `- w1 O
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
: @1 @: y8 {. d* v: t" t, I& N4 H) \3 j# ~& o9 T8 G
from multiprocessing import Process. f; E( ^4 b, ]0 v( K1 _# g# J6 ^
" J. D+ L. S/ H( F2 x6 M; v
* z+ a( U% O+ [' U/ K! m6 v1 H& }3 j3 {( z: v* w$ K# a
from multiprocessing import Manager& F5 C% K E+ p% F' }) s' i
5 w9 @; w+ v" \/ E. B! h! w
- 2 u2 c& u+ _0 h0 [) x8 Y8 r3 o5 v
- c. T. N0 z/ P t. e! `3 v: Q* g1 O* f K1 p0 |% b e# d6 T" L. L6 L
$ T% s8 g u. [# |: A: Y/ Z, c - 4 i% t* o8 b+ j
& _+ u; y7 G1 R, J2 Z) Ndef func(i, dic):0 M; O6 r0 Q# H* q, M
9 x8 T( r8 v+ V; _6 c7 K& E
6 v: O* [: o; c5 b3 e
6 ~* d [$ Z7 \( H2 {) l9 Y% @- f dic["num"] = 100+i! g' k; z: | _. |* J- u
: a% [# j f& @' H
- , N% l5 {5 r1 {
2 n- d1 g* u0 ^1 v3 w. S* e1 U2 V
print(dic.items())" t* r) I" q8 C
/ B6 g) `, q' u3 Z& Y6 B. f - ; c0 A( @, U% }. m0 ~( ^) Y
3 ~ P: z& D8 W7 t/ P) X$ B
3 f8 p; K. D- X. U
v. J1 q# j2 `0 d - 2 b$ k, @& m5 r& H
/ J& c: D7 J; G2 ?
if __name__ == '__main__':3 k' d! d5 n+ Z( D1 X
, V# T& e. Y. U) B6 ^0 |
) G0 Z) W3 \( ^5 @/ N. Y: c/ O4 o) i) H2 v7 j7 I# d3 E9 y5 z; d2 `& `
dic = Manager().dict()
7 M$ `0 O9 O2 c
0 R# y0 h' M( P1 t' B% `
* c# d7 {& X. _8 e* s E3 E! K; q8 U' T
for i in range(10):
/ `( C' t( Q9 @+ I: b
L, @+ A9 D. Q' k; X" C* U
, }2 Z, ~4 F! G" `9 \9 M+ V' x0 b( h' L3 t0 f
p = Process(target=func, args=(i, dic))
6 u3 j- g g2 e6 o
7 L& X- b1 C- ~0 m0 |
% a& h# f. j4 O$ R; b4 [
! h, \5 {- ]3 [7 F1 K5 t' G( ^0 R p.start()
7 ~2 v! s2 [1 ?* P: r) p
& u9 ^% |- Y; j) F
5 c- T+ K, c& L" S8 A7 b4 G
" r- p: s( E; F, ~; F! X p.join(). {; ?# P0 J' o# a# I1 l
8 E# j! F1 B5 p1 l, f% \8 s0 r6 [/ x3 p& c' D
运行结果: - & g* L3 X5 p7 H+ N4 B/ g2 F" _
0 }, N! c. u3 f7 x. _
[('num', 100)]1 Q" z: ~- w) L$ i
1 V( W) X& Z; y! k2 W% c9 D) A
; T( ], p7 I" q8 g3 c& |3 x& P* L, g7 G& C8 v) f! B8 t+ S
[('num', 101)]
" J9 o# X E. s8 J
" o7 m2 Y# ]4 R- ; Q1 T2 M) V) _6 Y5 _) L
0 }& V6 |" ] B$ m, y5 `
[('num', 102)]2 L: e4 d8 U8 [; p
# C* F6 f; k% m$ X# X2 D! t, l - - V: t( q6 v7 C/ Z e* O: ]
+ t! _5 E% P- q) g3 E
[('num', 103)]( t7 O# L1 X, Y. _ c( s1 \
0 |! O! H. M. i& H4 ` - & K4 V8 |' m) E: s8 V
) n; Y% {9 r7 N' ~
[('num', 104)]% }: `3 p$ [9 c: [/ E: e+ W+ {
- y% x/ `, X9 Q- }+ i4 j$ a; a
- ' M1 C4 B4 V3 P0 W) c! J+ ~
! v/ _8 b: w, E* V9 O[('num', 105)] ^ ~# j9 T" G" I3 V
; `3 |1 H6 m9 q8 P8 y
& n0 i3 L8 ]4 ^ X2 E; g% H7 ?; j4 ]
[('num', 106)]6 L2 _4 o# A# d
8 ^+ _# `& p! B; i4 @
- - R3 h9 a) x! ?, J' O, ] @& _
* C4 P; u) q- J; d) d9 u4 a8 V
[('num', 107)]5 j& }* O" X0 V5 k t
/ M! X- Q# o Y' J' G1 l% o- }
3 Z; [# w' Z B1 \7 E$ a
" b, t6 |( A& m7 s3 @% S[('num', 108)]
) t* ]7 S/ |2 t4 N% s
3 h6 Z; {2 k+ G
4 e8 i3 h# `. B9 s) u |
& G: z& u B) e4 Q[('num', 109)], [6 o3 w8 D0 n1 }" ]4 U* A
' D( w+ W, R3 G9 F( p& l; `9 U- n& Y q
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
, e* ^ u4 w* i7 T% n# I3 k+ o- R. v9 u4 B+ @- m* Y
import multiprocessing7 T" t6 t" r) S8 p- ?4 E8 z
! b/ ]% U: c4 Z- ! I5 q x. Q a1 H9 K
& E8 m7 E$ S) x3 }from multiprocessing import Process* F, r2 z6 Y, r
+ t+ h* A7 g M* [0 j$ o% Y - " u) m+ w: X5 H
! M7 z v4 E* Z3 Yfrom multiprocessing import queues" I" _4 R- }% h! c+ u+ A4 r
* Y% c- r+ x7 C* ^ - ' q3 y5 t) d& I! c) ~, \ B! ^
' i' z+ l5 b% z
" N v, x0 a' _* n
* \, @- ]5 J# r- f! Y, t - ) C$ {1 i$ J& B- d" ~
1 c* s8 D! G' ]9 h% r& Y% C1 V2 ]
def func(i, q):
7 _' G) r$ ]" d. {; f2 |9 w9 I! e- {, I2 j0 \( p. \
5 { g- \2 L- `- C9 r1 s: O# C1 Q
ret = q.get()
: Q. v- P1 i" U/ Q$ \1 x: H
& X- b& Z7 }/ _+ ]: l& V
8 Z3 z8 d6 E( J6 n: ~# W, [2 F+ h i5 u
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
+ @0 l. Q( o$ t$ D, @9 v7 V/ n% K1 z. m- j G( X5 j/ v
( g7 X. V9 k: v9 L+ g, X; H' G5 H+ r( e$ @; O4 K, Q
q.put(i)
/ l4 s% T1 u: |, n8 ]6 T: v0 D( _5 G5 S7 Y( l; v
# i' S. g/ x/ f4 I$ c* n
9 {# o/ o% U/ A% W1 ^
% n: S( x+ P; ~, f- n$ @2 _# i7 J, P0 w& |
8 u* \9 C% K; ~. M7 Q! @' h$ x1 d" h4 Z+ Y* W- i; D
if __name__ == "__main__":
, r0 @4 u; n+ E9 Y: z6 o Q# A2 s! j4 [9 K* H! ?! F6 e- p
& H, @& S% R( W5 M
9 A% F. S* X/ a/ |( h2 V2 W" D% F lis = queues.Queue(20, ctx=multiprocessing)! ?& a- ?* H# p
4 \; Z3 v8 o9 X- 9 X6 C9 s9 N! V6 p# Q* v, \+ I% V
0 {) c3 G! X! j lis.put(0)7 \* s0 h ~, X
8 x* w/ z: I: ]# Q" ^. t
- 4 I1 h$ b3 U" A3 ~- y$ ~ i
4 U+ O" n$ ^1 } for i in range(10):, A; g6 V* B; H+ H
& ~( |# }0 f* o/ m1 Q `
+ G" ]; v' Z: v H4 m" {4 x! I. ^5 }* K( [+ e3 u4 S+ Q" P
p = Process(target=func, args=(i, lis,))
% ?+ Q+ d3 Y" q" _% y B7 K7 V7 B, d8 h, Q5 [5 a% b
5 g% }- r$ c/ O
+ T% `; @- n: ?8 M$ F( p p.start()
; Z( o: U6 V u0 B) P! G: ~
' T$ ]; u/ v1 G. c% n: q8 b3 `3 J; f9 J' @
运行结果: - % s+ B1 U+ `( R1 V: B1 @& A @5 ?
" @# ]5 P9 [: M% f% [# d- j' t进程1从队列里获取了一个0,然后又向队列里放入了一个1
B0 _0 t) Y9 l' h7 M, h4 N, V7 w) \5 w8 {9 n4 H
- 5 i, d) X6 x2 D( \1 R4 `! f
& P' e8 o! w, W3 c9 |7 [4 O K" l
进程4从队列里获取了一个1,然后又向队列里放入了一个4
1 T& g' B. a# p" N$ I2 \/ \$ f, Q8 B# I
; u5 }4 e: e! r/ S, l) F0 ^5 C" T5 W/ v+ y- E& j# k. x1 l/ f2 `/ K
进程2从队列里获取了一个4,然后又向队列里放入了一个2
6 T% W& l$ ]' o4 ~9 @3 I6 K6 [ E+ a
- . q, Q) O9 _5 @
! |: V9 @1 b* C8 D2 G; P7 T' y进程6从队列里获取了一个2,然后又向队列里放入了一个6. G0 ~# l: J3 k. o9 M& g: j
) W' Z# O" e8 t; ~3 z# {, C; L6 k
2 J4 g4 K' }/ E+ l/ z! `, Q. }! n
5 z" C0 o$ u4 o8 T进程0从队列里获取了一个6,然后又向队列里放入了一个0
8 X6 t: b1 M, e/ s' }/ f; @7 ~
" q0 i$ E4 V8 f* Y' j* b
. C( r5 ~, ~; m2 _- z& x/ l! Z9 q# X& K# S6 A! F
进程5从队列里获取了一个0,然后又向队列里放入了一个5
! q$ {% s9 p6 [& X1 G6 g. C
( q4 f9 `5 ]8 h B+ ]& E- : f" f& a$ S3 D- ~( [1 }
/ G9 C* x- W- r+ v8 Q$ @6 p o
进程9从队列里获取了一个5,然后又向队列里放入了一个9; t! q. M# {4 o- i. C, U, Z
% }5 G. R4 N8 K8 o/ o( r0 |, O( M' V - 5 x5 L& y' H$ V8 a
8 J( I, _+ k7 a6 K8 e( U7 g7 G# ]
进程7从队列里获取了一个9,然后又向队列里放入了一个74 ^# R6 t& H3 [4 O! `6 o
/ i; m; u9 ^0 v( o2 @ - / W- w6 {: @9 o4 V( L
- M% G. [. B8 q- @3 s+ u- P: s: E
进程3从队列里获取了一个7,然后又向队列里放入了一个3" y# Y& G$ Z. D. b: |
) S; v/ k5 n" v8 l* m. R
- 2 j1 W7 V I( I k+ t" O/ l
3 |: s7 a# C1 b! m6 l L进程8从队列里获取了一个3,然后又向队列里放入了一个8: ]( I' y& e: F( s1 [. R# Q3 _% Q# [
! X7 u2 W' ^, `# h, S* J
6 Q5 o" m5 G; g0 p/ c) j
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
4 O( ?/ O' ]6 z4 t; ?0 \& z; ~+ Z
. A6 ?3 M$ }0 n! _% m hfrom multiprocessing import Process
( w0 R/ B4 [# g% [: ]
/ N" [, q3 k5 j Q$ |/ Z
4 R# t. Z1 ` M4 O g/ \) H% ^# p% m5 J: \3 l0 Y+ t$ r; J
from multiprocessing import Array- M7 O: E1 b% V
e: u# Y$ p0 V. B) S `! E8 L4 h
4 u9 i3 y" ?6 @. Z, R. _0 ], r2 s; _+ A5 s; \
from multiprocessing import RLock, Lock, Event, Condition, Semaphore7 [5 I% @& n+ x: @3 F3 ~2 C6 Y
0 J+ w3 e* N. }$ J
- 6 u1 p! z0 A# T( V8 O4 `/ E
' p& e- e& o* k' I+ ^
import time
" h# x. J/ ~. |8 ?0 u$ V! ]. S- s& A
, K% L5 }; O/ _% u3 C
. v, n x" Z- l: x
6 K8 k, D0 L& w c
/ M' x: o3 B9 x. Z! H$ v& ]0 Z" }. T' q6 `/ v) @# |5 P
- s% Y% `% f# ^ [& @8 U
4 J4 E0 H( H" y7 k: t
def func(i,lis,lc):6 z) [9 {" v/ E8 P7 B
: C# X6 |' u0 L1 o& n6 o
- 5 q( Q, \: m2 g- N& I
6 m, @. Z- J1 C- d3 ? lc.acquire()) q5 ]: D3 V% i& ~, o
' E, H/ W, ?" r0 U' Z/ w
6 p- C+ Y9 Y7 y0 Q! |& i) C# J1 J' z& y5 t2 n5 {7 f
lis[0] = lis[0] - 1
" _) ^( T; h3 ]/ M
) U2 [6 D$ y! c0 c4 n1 ]
) w E( I6 _9 t4 Q4 K4 _$ F/ G6 p: O0 J
time.sleep(1)
* G g8 }- R+ ]: D; r8 Z, w. p' F' v
- : s; K0 N: ~6 G% i* h
# d, C5 I" W. F* b5 P3 \
print('say hi', lis[0])
1 h/ t- D- k( D9 u" l* c* c6 n; g$ M9 g/ Q9 z" V& N% a9 W8 p6 l; J
- " ]) |% G$ n" I8 r2 v# {
0 D) s$ T/ `& v3 E lc.release()0 ]; V2 b5 C7 f: _
8 j0 ]9 i6 J, J2 j; S
& T4 G: Q2 X: ~: o( p& m, p& ?* R1 T4 j( }8 d) i! C8 X0 i
: C! f( Q7 z- t }1 u# ]1 G5 }; D1 r b7 P6 M( {
/ t3 s3 K! _- w3 m( Y0 |8 b! A3 [0 P3 z+ x! D2 C
if __name__ == "__main__":
1 ]8 y8 M4 {1 W' V* g" c5 |7 W7 U/ H/ p1 m/ ^
0 H" @/ B W" } `. J9 N
; ~% }1 |7 X6 ^4 c& q8 v array = Array('i', 1)$ T9 O5 [3 g8 C/ P+ G" a$ g4 z
" _4 r! x9 |: K+ a) J0 i- 3 |7 h/ A# n7 ^- L# w
" j* K! w" f7 r0 T array[0] = 105 x/ B1 q. h n% a8 u7 z* b
j) y$ m2 q* A0 w4 m G
0 ]8 R) o, u5 o' B- q+ B+ D) A& |7 x/ l7 J6 C& z+ B
lock = RLock()
- k8 H% u/ c5 f4 R9 h7 s& l+ P9 j0 ^9 a' e$ R1 V
- 6 m. g2 e" `+ l/ R9 O
/ F$ _+ U4 c; \) b" O for i in range(10):
/ Z( [3 x( F4 U5 \8 h5 E7 M: U! `: P/ t4 L- V
; l9 o5 f; X3 z0 Y' C# _+ \
2 `, y, v$ [$ c l, A5 @: G ?; ^+ }2 Y p = Process(target=func, args=(i, array, lock))' }' R3 p0 W9 P" K3 `, |
) c2 U' ]1 N" x! v: b! ?& }
- + E' ~$ v& B% b) u: q
! y. I# v" J+ @* W O1 d( M2 v
p.start()
9 j0 G: Y H7 m! I# s0 P/ r6 i: P4 B, \7 f) Z2 R
, q4 G. J7 g; v6 s7 e% ]3 i7 E0 a
运行结果:
6 {; ~7 e B% g9 i, ]' q1 L5 F7 H( ]1 R
say hi 9% f4 u1 X; j% m
. [2 G! A% ?0 D& J/ `1 }% v8 b) J
- % L3 y4 |( g" {! ~4 B% N7 S
+ Q6 e7 U: s- G' Esay hi 8/ Z6 ~7 \: y, a4 N
$ G$ g. R6 Z r! k# Y$ V - 0 {* ]4 X; k5 K( S' E
) s. v ~" a2 i5 Z
say hi 7" y2 h3 N- G' E' k& |8 v! E5 y
- W( M3 g9 k: T6 z: P - $ V+ y8 q" [! w
0 D. m4 R" w5 ~! b
say hi 63 q( ^8 D3 s! q
5 P! d, v3 ?4 T/ l) ] - + K' u) m0 r1 }5 o, v' u5 Z
1 P6 ]- E0 t, ^4 d
say hi 5
7 o& ~) Y# J' }! m R, p3 p5 t2 O
- " A; z# Z* s6 W/ ^, M
3 y- v/ K" h* n- N" Y' c: u( ]% Q5 isay hi 4' i8 {5 b6 z0 k- @6 H
+ J7 W) D' D/ x. ~( G2 z8 Z( q( g
- / K9 ^: ~0 T6 C( ?+ r
1 w( A& u. Y( ^. _say hi 3, z, s: v4 @2 e
/ ? J: i; z. E, g! u
- 6 v+ m7 r+ J$ z2 N4 J* G
7 I6 J/ y) a" q# z. I% b
say hi 2
8 s; [0 {7 C- r- R3 M5 f! x
( ~2 l- a- i8 \# B5 }
, y \$ J5 V0 L: |4 x5 P' H3 h8 @1 A1 a" A
say hi 1% X+ |9 ^0 f5 b6 [' @
6 ^; H5 R, C# |" p2 C/ g6 x
- 2 Q6 t X. I! F& n5 J1 M8 B$ J( w
* M" \" D7 p. z ksay hi 0 |1 s3 D6 n" a4 C1 [% Y
n7 o" p3 h) w" _' L5 L
3 w/ |: n; k, H4 m: c
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法: - * P3 ~: W; U, o: ]: ?" p
1 G0 E3 E6 B% I/ i' z* x" ~
from multiprocessing import Pool9 k3 ]4 ~9 @' y) @: b
7 [1 C6 b% R/ u! g4 ]! I1 m - & a3 [- |/ U" }; i8 B3 M, x
9 D. r6 _$ h9 t+ O; T9 o" kimport time
* n2 H" @% S9 F$ a" M m/ s% P! o2 z* y" Y2 p
4 }8 I6 L2 B+ @) v
: B3 U, u/ r* x2 D; e4 L& L5 `
6 c: D/ u ?& I" k( }3 g! u9 E& I) V5 Y' L1 [* O5 H9 J
- 8 P- v. ]- @6 N) `- m8 j
0 C& Z0 L0 h& C( v
def func(args):# r- S$ w# c( \4 B% o3 i
$ | \% _+ \/ _ h) L - 9 B$ Y0 \: h' z n7 @3 G8 B. D
5 Z) Y6 c) s4 N
time.sleep(1)/ D3 }1 n$ L* t+ {$ c8 J
) `" m u7 L/ ^% R: t) @0 G
- 1 O# `; B6 { Y8 d( `' D
w4 l: k% z5 J- b/ q6 w- A print("正在执行进程 ", args)
+ E5 m* n$ F* T0 v3 ?! B' F( o+ Z: W( \. J" y, r: R
" n8 G2 b% n" g1 ~! @. ?- R' R3 S
/ i2 |3 D+ ~ X& {- s% i0 ]( g( v) Q% U
2 F/ |3 Y+ S$ f+ a, Z
: w( f, z7 ]1 u/ _$ J8 G1 D8 \3 ]
if __name__ == '__main__': E( g! M$ D5 `3 u1 z/ }
) K0 ?: B$ t' H; R0 l* G7 m/ ?
0 n( l; \4 l* ~$ o$ q
2 c8 A% M0 j1 y. a, k6 }6 W) L/ L$ N4 x5 C1 e
6 f# R( `1 u8 G; m* J3 ~- + {" x7 d6 C2 T" m# [, w! C
) e2 M" |, t( S5 i Z p = Pool(5) # 创建一个包含5个进程的进程池
8 S7 t% `. ~6 L9 Q2 u
3 ?0 W( u) e% _& N, r
$ q$ K; D4 Y7 Y2 L1 t2 ^8 ]( F, |& ]" T. @
- V4 Q3 J ^, v6 D; B: ?7 |5 d' p: g4 O/ w. w* m& U
/ M( C- H6 G! [) g. D& j# r( n0 [- ^* N' J$ U3 a
for i in range(30):- J% C9 B" `* }; ]" z, M
/ ?7 ~3 |6 C7 ^# o3 e# N( ^
- " O( E: l# x( o2 ?) ]6 o
5 o. J6 p$ t7 {( ~# ^+ ~
p.apply_async(func=func, args=(i,))' R" ?1 l; B& K8 y+ F- {
) j: J0 g0 W: C' S7 F; c - 1 ]6 y: o( }; u! l* }5 H3 B
; [8 l$ \4 v/ T* k2 T
, I" m+ ~' | N+ Q' f
) A+ u+ S7 x$ |/ o' k0 m - - ?9 P* r9 d; i8 N$ [
% f6 }& {: m' @8 l% @
p.close() # 等子进程执行完毕后关闭进程池
! v) A" q& U a6 g* T
6 a/ K% z7 [+ E+ B% m* X( u1 r - 2 i. ?$ M+ s8 w2 B/ d4 k3 R
6 m9 h) S+ P$ a) B& G: K/ h. `- y # time.sleep(2)
; ]6 Y: w, R: Z/ A! f6 f+ C6 f! J i
, F4 P3 N) i# o+ k+ ?) i9 K$ d( D8 @- L l0 c+ b: h3 A
# p.terminate() # 立刻关闭进程池6 ^# K% Y1 A& z- f, P! B' P
, y+ E6 @: u) b! H- ( s. c* A, J& @& e' F
. w" }- m6 M, j% U/ ~
p.join()
, R% h1 v' K p- w% W
- g3 F1 N0 F# `' _$ q
" H7 g8 X% }8 ^0 o+ \" D( l, k2 m
' ]# d# w: ?& @/ @$ t请继续关注我
' m+ c1 u+ P L ?7 {1 U. m: A8 d" M) Z4 ?5 m* n/ P
|