5 k0 ^, p, E! u( [爬虫(七十)多进程multiprocess(六十一) w4 S. @+ _8 C6 |
Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。 - 3 P% u$ s0 I. j9 X2 f/ V
5 F6 `" J, ~- d4 m1 `% W
import os. r$ T' T$ P9 A* I# f+ L0 \
0 M3 G' p7 ?5 O% l+ o( R2 s' s
1 E+ w! m: a( F7 z" N( k, E. T8 ]8 _$ O6 n/ E: y
import multiprocessing
% ?! {9 a0 J" s! U( U Q
3 f% x2 S; L5 o
$ E+ q! b, _) t% Z q4 n1 [
1 s! f3 w' _8 @4 \. J8 w: B' v- j2 h6 h
7 Z _' z# b( {# \- ! @1 K9 Y# x+ B b' A" r
: n8 L& o1 n0 X* A8 o+ jdef foo(i):+ v, k, _$ ~. P" @! ?1 `
" n/ J& S% \) \/ ~
9 i: V" ]) x4 v: O3 f6 B5 o5 i' D3 w9 E
5 n' c$ |0 ^9 X" e) h. U. }8 e, u # 同样的参数传递方法
. f% j7 P0 G- q8 }* \
' j8 J6 G; A. C* ` e5 B- 5 Q8 ~& h5 C7 Q- b: P
$ P3 _- P7 O& n: r8 X# O9 D. p
print("这里是 ", multiprocessing.current_process().name)
, \9 s, W n0 Q( m; N- H
6 W+ A8 N/ S% P( C. I - ) ] d* u/ W, V1 J) d. O: [+ P
; a9 c# P" a, m) k$ L% ^! R4 N& {
print('模块名称:', __name__)
# ^/ J, l& d- N n7 U8 K" j) X" K, d. L# K
. m: g. z3 _ B1 W; W# \% ]1 U
7 x" _7 v: n$ X print('父进程 id:', os.getppid()) # 获取父进程id9 Z, x9 X2 W- D3 Z
' N; \6 O. R2 G# I. ^; c5 U. ~) r
- 0 e7 {) m L/ E& h ?2 n9 m/ U
# ], B5 N0 E# @4 X print('当前子进程 id:', os.getpid()) # 获取自己的进程id
V0 Z* A3 {; p" C$ s) Z
1 }, ~, [: d( B0 r& _6 j - ( c. P, Q$ e+ K6 W0 M7 F
/ [/ e4 C' t# I$ o& e
print('------------------------')
* L6 q, s( ]* ]* N
) Q! B9 f2 U: s; W1 t# m6 o( j
$ s& }, G$ W* h' x
; G) Q! a( _8 E% x5 ^' L
& D% Z6 `2 j7 D2 T+ ?) i+ Y/ ]" k. z$ F0 \
- + [9 M2 _) z E
/ O0 {5 [. Z, U" jif __name__ == '__main__':7 j0 ]) A& |7 F. J( {
" T. G w3 G! I& o; ~# v& R
1 Q8 Q8 B! |" @: `9 G# x( A
9 g) g# d# l! x+ _( t7 H# c; G0 O7 m! K" c
* T+ u c `' k9 }2 f f* A' ^
4 m- ^: u5 m+ I" k! [8 E9 X! r4 ?: T9 H* a( F# f
for i in range(5):" ~( F) d# y6 |0 ~- C
- I' a# ?1 D2 k+ M* Y
9 B: {: i% ]# @, m" h/ R7 m0 |8 o, w6 x% e+ H
p = multiprocessing.Process(target=foo, args=(i,))
; e8 V4 n4 R' H! J$ |# F; _- m7 ?* T% U8 C; N
- ?, G( P# s1 G' ^/ g( _; Z2 X' w. X7 A w2 m
p.start()
4 a- g) Q2 d( _# O6 ]" ]# l0 U7 p1 i9 P; |* S
* K# [8 P) W8 s$ G
运行结果: - * ?2 G5 K8 L2 r5 O5 s8 k) }
+ r( m: Q6 w! M% f- Z, ?5 I这里是 Process-2
* H" ]& A' G3 b! e+ N& `' j8 d" }/ U% e7 n0 s
- _6 j5 Z' U, i& H/ U7 z0 q: V4 E
. H6 `+ M: @+ J模块名称: __mp_main__1 x4 I- _# P# _, T$ U) j
* }* _2 X8 {9 W( S - 2 ?) ^) u3 R( l8 K7 v/ }8 h; S0 K5 H" ?
0 Y$ K9 N; B. E
父进程 id: 880
% ]4 q6 n" A& P5 {9 d# Y: V. q! K. H$ J& m
1 H4 s. J+ j, k9 Y( j5 P- h) ~$ j# q! ]- ]9 X
当前子进程 id: 5260
. A3 d" i) c& i9 l" H& ~1 L; Z0 T6 e$ N: @1 y' O- T
' }: s+ c8 l) w8 s+ d0 f; d& D/ @2 C- z9 B
--------------
% f3 S2 _: L* q& X6 o$ Y9 T$ p
# O1 N& i+ d4 b8 ?9 w7 Q' w, W- # H- W" L: C" N R8 H/ V
J% C$ V( ?6 ^. X
这里是 Process-3/ c8 e+ n/ M6 z& F& z; ~& T
. Z/ Z0 D( V! G( z/ | `
@' _, [$ e: B4 v& z
" T: K5 @6 R2 q" j n% G2 g7 q模块名称: __mp_main__
5 x# ^/ [( Y, t* h" e
6 M: B& I2 U& ~- & u5 D4 j2 g9 c9 ]3 x$ B) T
$ ~4 q, [% j/ b/ ~) H
父进程 id: 880' U7 V! s' A6 p) v# @
$ V! e6 y$ p( ~( J l! ^1 h
- + Z# _; b5 i" _$ p1 x5 h: [
- k- E' L; b( d$ Q0 ]5 j: x当前子进程 id: 49123 T& R/ [8 u* u8 J2 N: N3 S0 M4 Q
/ D3 F. z* H( M/ t6 j - 0 O+ w6 M) i9 @! R
5 R* Y( j1 m1 r$ r( ?4 U--------------
+ G4 @ d: X; s1 A" r2 ]# Z9 ]8 M( w( j7 z4 K: V' N3 G; t
' G& t0 L# T/ \5 l s5 ?
+ Z+ a4 Y% O, e这里是 Process-4
9 T, k5 F. T5 i. W7 C# j8 g7 r$ d$ D; E
- 2 z: Q, ?2 i5 `) I0 L
9 f2 X7 V L) V( e0 ?
模块名称: __mp_main__% F) @7 Y0 j6 Z/ i$ M' L
; W; {" C( o/ q9 \; ^
- 8 V' `5 v/ _* m6 n& L2 y0 w
7 H# \& q9 D0 i" _3 y5 ~. h4 x父进程 id: 8800 z' E9 C7 x9 A- @: X. z& K
5 R5 _& x) h8 l0 n4 A
+ y( T# Q5 G. r# M) ^9 @; C, j9 D& h! k- W- m4 B, g, J# ^4 T! A
当前子进程 id: 5176# U* U' h9 h7 B5 f8 g8 K
; e1 i, e+ x. o# I# x+ V+ A, `
- 2 \6 Z+ I/ G0 Q7 g+ [
2 R X- W3 W% u& W--------------
9 e- |2 W0 o0 e9 }, T. S" I9 P9 s X. j
- & ]% J( s3 B* h1 A7 R
; f3 a+ G' S } M% ~这里是 Process-1
8 S/ o* G1 M1 J$ X- f4 A/ {4 M$ H1 z+ Q6 m
( B2 `& [2 v% }. d. t6 A& x, w' Q) {' X" f7 E$ n+ C- T
模块名称: __mp_main__
) m5 E* a8 C( y' l! H5 ?/ c+ a5 v1 p! \; X1 j& c! O# C$ [
& l; ^ c3 U& S1 {9 ^( t1 ]0 ~4 X' o L. k: Z4 H1 ^
父进程 id: 880
Q; ]; f) U/ U1 g b# L- v/ w
2 t$ Z6 f+ A5 Y
) G7 \8 B- V# O) D6 S: K* q. i% b0 g( n
当前子进程 id: 5380' `% i3 T+ F f4 J
5 T. P3 g$ y6 i: }; ^
- & M1 R3 X1 ^8 A$ I2 q
1 o4 j1 n( b7 c* m& F--------------
7 S! E+ |- U9 w; ^! h) M7 u) }! N. ~' N# J/ E& F
% u0 b% c9 C; h r4 @+ B& U5 u; S9 ]
这里是 Process-56 }: M2 Y, a D9 _
# E" p+ n+ {7 u& @- {4 A
- O, S7 l! i, J3 o* O O# g9 i
! y; R# T+ s; w3 X8 d; T+ C模块名称: __mp_main__5 I% s+ g" n0 N- ?1 a4 P
& y+ i" w- X% ]2 r+ A* x$ A; o
- + L" [ k1 \- r2 c3 o9 R; [: S
7 S# l3 p9 a; r- e父进程 id: 880
0 P5 i% F; Y7 R2 i6 M) K
8 O j3 U. S, h9 d8 Z+ } - 5 |" @# d( q2 U" `( I4 g1 h
' D6 L% D9 S2 ~2 t3 Y# N当前子进程 id: 3520: _0 G( O0 s" C: x
- t2 D+ x+ L% o' D6 H5 R
- * E9 G" v/ m( C6 B! q& q
& q- ?; V4 A" b0 y" _' r) [! a--------------$ N6 v# I: G$ ?
, W8 Q' S. Z' U, ?$ e& `. {
+ Q% J8 @8 \! Z5 Z7 i) p1 f 1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享:
% f& d: ?9 c: k7 Q+ j
5 a: r. R" ~$ F9 Q8 R2 g* Ffrom multiprocessing import Process
1 ^( m7 J7 [9 |0 _7 l2 s7 B4 k( p' R: P( J' K: G6 z! B: S
4 J7 N3 J# g6 Z K* k s
2 m+ ?) o7 z! x s' \6 i9 Z$ h7 Y5 h* V/ J& }" j* b, T6 P
; D' d2 P v* e9 f/ Z9 Q% ^; r
- ; v8 C0 m/ k9 _/ z
$ i6 f0 w! d5 K Hlis = []. `( c, l; b% e/ t
& @( f/ f7 [! E - 6 [& f# w* B3 M T6 m
$ W% x# Y2 T( k0 K
- D( A3 C; M6 r& Y, j
F }- b9 J( Z9 O4 F
5 H- b; N [5 ?+ U6 a& `0 l+ ^5 S
9 u3 E# z! M5 P6 _def foo(i):
! _* l7 u. G2 [8 | V( r U2 _. g! z1 k( v; g2 c* N
7 r" {# O+ N* i7 ]- L3 D# l% B) h
' E* g( J& Y0 J9 n& i lis.append(i)4 e$ ]* U9 o, R& U2 L* _/ f) e- J
% w+ a8 O, C" A/ J
- ; o6 G' u5 S# N$ d8 Z( ^& J
2 H( S M6 A. ~ print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))' ?* H- M% m) x* F$ Y& Q, E6 l4 e
6 b( e0 N! K7 Y v4 W2 h' y! m: h: L3 ` - 4 q! X8 G$ e& A/ p3 D
- Z6 ?* M8 B/ v0 L
6 m5 z& L( _- W# [
% B& V L6 ?6 ~+ n) H
f" ` ^1 d' K- r* f4 G! L8 x0 V8 f# {+ I! Z# t
if __name__ == '__main__':
! z9 ^& S! G& J7 |
" R8 ?3 [* n! }- 7 ]6 E# D; q8 ^* P3 C8 O6 T9 ] U
) e, J. x3 u, J& k for i in range(5):% G# D5 o" Y1 w( a% a5 @
l/ O+ T1 f& t! I: J6 u: J
# @% P/ U2 G* |1 F9 @' ]! H$ J T% p. ^! [& F' M8 L3 \) | K2 O. v
p = Process(target=foo, args=(i,))
# }: g' T6 Y2 b) ^3 T3 U0 Z% ]& O; F) E# F9 T5 ?; A5 x
- . _; d/ {* [5 [; s% A( f6 W
3 l2 n: r" K4 H2 k7 G. \
p.start()( b3 \1 z j h$ w Q
& |# _; g$ \5 b8 f" _0 Z
* n' _. g. A/ ?( K7 P! ]
6 q* F3 B. p* U/ e" F: p( I7 a$ |" { print("The end of list_1:", lis)
4 b# G0 y- ` k& q8 t; Z6 e1 z b( F( X! P2 G' Y
, b y1 b/ I- `1 O
运行结果: - - ?7 j& `& b+ ]1 ]3 E% W+ ~7 s; b
$ ?' G8 H+ h' p% Y7 P
The end of list_1: []: w, e$ o2 u( _* l* x; O6 l
8 k \6 @2 J9 g4 a
% |; x# k; R" L3 n
3 H3 f* m- f" M- k! MThis is Process 2 and lis is [2] and lis.address is 40356744
# f8 |. Y1 b1 }3 b
* S9 g2 X8 Z w. [
9 R3 n+ I+ \9 ^) R% I4 A1 E8 b& W( Q: M$ N
This is Process 1 and lis is [1] and lis.address is 40291208) F; r! b1 M2 y0 ?8 l
) L* Y- F& l" P! ^4 `9 \& ]% C( V) L6 x
1 X2 {" F, e4 D1 ?1 v$ [& {3 P* `, n
This is Process 0 and lis is [0] and lis.address is 402912087 Q p, l' Z$ i) s
0 [6 A- K4 D, c! j, _0 y ?
+ y! p' n# K6 K( `
# A. `% G: y2 [; C2 M! {This is Process 3 and lis is [3] and lis.address is 40225672
T6 ?8 w' L$ K) A5 t. O" T2 u
9 y8 ~6 X" ?% i# [: w- J1 j- / U$ @ n3 n" T/ e
3 e, I: z; y! A3 t2 R( P/ ~
This is Process 4 and lis is [4] and lis.address is 40291208
) ^. v) w6 T b& y8 |1 X, M$ |( C, Z- m4 x4 ?& h
$ G$ t% H9 k* y4 v+ h, N
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系: - . w% Y0 t' C2 R- N2 w1 z( H- b
# ~+ M- R6 Z# e+ r- G4 ]'c': ctypes.c_char, 'u': ctypes.c_wchar,
5 |) J0 {1 x$ `! X. l0 p
# M* ]+ X$ u/ v1 v
1 x& p+ s$ ]# o( g5 [
- r. I/ |2 Z" m5 {1 u'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
- R D; V# f- V% N3 b1 O$ p& P) m9 u: K1 ?% w0 K8 s7 y7 P
$ ~+ \9 f5 M% q0 }6 b$ d
) a' c% p# o1 J: c0 K8 Q'h': ctypes.c_short, 'H': ctypes.c_ushort,
7 z: ?8 @" I) N- z
& ]) q* N& D( w' F- ' @( [* f/ Q4 O* d" L+ }% P5 j
9 f2 E# z2 {; o7 N8 j
'i': ctypes.c_int, 'I': ctypes.c_uint,
% Q* r) j7 J( R% U# U8 y+ J1 \ S+ i* V: t! h+ o/ x [. |
0 N" p2 _% z) p6 U# ?, \' V5 @6 z, X; A9 W& O5 _, i7 y
'l': ctypes.c_long, 'L': ctypes.c_ulong,
& V0 Q* J/ i3 U/ U* G+ Z( Q' h+ O9 _0 A% T2 ?7 _
- 1 S+ d `' o# w
- `) y7 ^* S: I6 z0 k
'f': ctypes.c_float, 'd': ctypes.c_double1 E& h5 E+ e4 l* ]3 q0 h' V
E# I( Y9 y* K
8 K+ _( V, m- r# A
看下面的例子:
% d3 v" ~) o3 y- Z+ q3 B
5 Y5 f8 y3 a8 B1 dfrom multiprocessing import Process7 m* [9 s4 p; ?. b
8 @$ I! v+ c: X2 I2 F( X
# Y( u6 Q' [0 W/ @2 H8 H$ M* }9 c5 b- v" a$ _
from multiprocessing import Array0 ^5 i) W' p& {/ u2 h
9 y, V+ W! ^ m' q5 w# `# ^( k
; ^1 e z F6 T8 ] p x5 s: c4 Q! P8 N# O9 K! \9 ~( Z2 w! w
# L8 p; w2 G" i4 Q& Z* W
# s: N( [0 m" |$ n P+ l
- & u! m q" a. o; F. I$ O* \
" d. v5 ?9 h/ y$ P) d0 bdef func(i,temp):
2 n3 Q4 C' l8 H3 b
. K1 I9 k& J" X7 v% w5 S1 L& n
. C& ]! Y& P+ |
' A! g% G3 P* q1 I5 k+ q- ^ temp[0] += 100
2 _% P' F4 z6 H" r9 J; j% p0 d* O5 c9 I1 n, h
- 9 L# Q8 j0 d4 O- {* R
$ s0 V u F4 Q5 v
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0]). l( t8 d+ N4 k! u! @5 h
/ ^# _4 b0 H5 a8 b. W0 {1 Z
- 7 k# Y- _* }' k1 J0 ~+ j, x
$ n* n4 \$ \; v7 Z% R
5 @( | h) ~( A
# z {2 v9 k1 T9 m6 M: | - , _, R( k& ]( v; _6 R8 p7 C
% a( N4 q' \2 L X+ U
if __name__ == '__main__':
9 E+ X* E6 S; E4 `* D: ]$ K& L( [7 m
3 R) W8 F9 f$ n$ a! B" T6 Z( n W' A4 p3 N
temp = Array('i', [1, 2, 3, 4])
- O# g7 \. S" e/ P$ P& [0 P
. L( e4 C7 M% X$ M, @7 ~' P( q- 7 y# P0 A N. N
9 @+ R6 `& A/ D' q6 L+ S7 O9 P for i in range(10):/ y) ^4 q4 Z( q4 @/ x) y
6 f3 \$ D1 T, C/ @2 l$ g - - \2 J* R9 Z7 K0 q$ H5 K1 Y1 W
/ R4 w6 ~# l% Z) z! }6 n, e* Z* B
p = Process(target=func, args=(i, temp))
+ m4 W& h$ o' j$ P/ [8 F1 V+ E5 K- z* ^+ N0 L0 q0 }: l
* w! m$ a% E; o" C" @
9 J z: i* \! M, |& ~' S p.start()
! q/ }6 A' |( O$ { h/ m9 q: P! c
! v7 z$ N' g# p( T9 S) z8 g) D* g" Y$ `1 C1 c2 n7 j
运行结果: - 9 _9 W7 l- r4 y% p
& ~. ^9 J# M2 M4 V" k) A! X0 V进程2 修改数组第一个元素后-----> 1015 t; d, c9 T( ~7 d& p; ^% j
1 ~1 I. ]" p# M; x3 N- i( S. F; t
1 @! Q X" T7 F$ z: \
+ A# S' ~: g) Z) B7 P) s/ n进程4 修改数组第一个元素后-----> 201
- {+ N7 e: D. Y n& L5 F$ Y% v0 ^0 G$ C a+ K; c0 s8 Q
- * F* }: `7 S3 G
i* z. F3 S* [# E" m- q
进程5 修改数组第一个元素后-----> 3018 k( ~- B2 y; B( G3 x& L
1 Z6 M9 o! l! Q
- 3 P) s' d5 P! \% c" E- O
3 w S4 ?% z6 M3 U4 h5 [进程3 修改数组第一个元素后-----> 401
' Z4 n0 w/ w5 U* E2 k
( \7 \# U8 l" t2 \
- v# {8 d3 z* ^" B5 h" Y5 n, G8 ~. n- `6 a2 O; c) \3 z
进程1 修改数组第一个元素后-----> 5017 ?6 S- |2 _$ f2 P7 |( E
# E7 ^4 o& w& w _, w* ?
- / Z0 u! y( r n
! \, z6 A" A8 c u5 U, J进程6 修改数组第一个元素后-----> 6014 b! B4 f0 f: i5 s6 b! h
, K9 J+ u! O# W$ H4 Q# m
5 y4 p2 T: A/ O" W7 L- I( v( W7 N7 g0 G% x9 G0 Z( R
进程9 修改数组第一个元素后-----> 701' H- `* z; C1 E/ w3 [& J
3 S1 \: x2 y. D/ b
- 8 m7 e% N& a8 ]2 g0 z& m! Y; G
3 G9 M8 m( S7 u3 @7 G! d6 X
进程8 修改数组第一个元素后-----> 801
8 g% o% o7 ?% x9 b; J4 v6 l1 w% a+ S3 R3 |$ Z
; f( N9 V: U( b0 N4 F: t9 T6 k. c8 }
& l0 r- p6 |1 M2 Q( T进程0 修改数组第一个元素后-----> 901& b- v4 L. r! ?
% T2 ]. d& |' u& P3 |* v1 k
- 2 J% U+ W: t8 y7 | t7 h1 j
+ v0 m! K9 `# }+ \4 H7 o
进程7 修改数组第一个元素后-----> 1001$ x, e* {# z9 D- a# r
1 l" k7 M" ~6 v
* I3 e# o1 i( u9 g+ C" a H. ?
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
g! K0 z% h" r3 i4 p: b
& m; M5 k: A1 k8 g% wfrom multiprocessing import Process7 s) J( a: t4 m% W5 U
9 |) Z/ Y+ V8 ]. W8 R. P
- ! m6 X2 h% M$ B, T+ b+ B5 g
; L% P+ d |9 f0 {$ Ufrom multiprocessing import Manager
3 w. k7 [2 {( T: ? o2 V
& f( X/ U7 y" p
$ ~9 P7 }! m5 }6 a7 h; A2 u1 m3 v% E2 p7 i. x
- d4 P2 H2 X" V6 m
- D' P4 q6 \, Q& K. S+ Q3 E
. n O, b9 a8 a9 Z- |
3 T2 z7 p7 i+ o* p9 V% a- X8 ?def func(i, dic):: |9 [% _# a2 P. z
5 y3 h. h; e* D& n. { a, _7 D
- 9 |" Z! |& V/ K; o7 }
6 i/ f8 V% H6 b% r dic["num"] = 100+i/ z$ d% _; L" @9 o2 L( B
: B; r* v, W0 |, K# \: f: }5 _
- 4 R4 d4 q, P/ U5 c: Y4 R
! ^9 M( @' _) [4 _! A8 F( ~ print(dic.items())
' n9 O9 e! T5 N& S# a8 \* c% R/ [/ ]6 ~
0 w* g8 Z* l. F5 d
& }0 `* Q! ^1 D* B- i$ Y9 U
4 ^( [+ ~0 I, U! s) \3 w
u/ S) I9 p I9 c- H
2 ], _& i) N3 d( `& S$ `( L! V$ S
if __name__ == '__main__':
; ]7 x, ^ _6 ~1 z1 K
( C: S) `$ V& M6 c: V v7 Y* Y
: H6 L, E3 F1 P! F6 |7 T
9 I2 P' [" d+ }. V7 P v. v dic = Manager().dict()
/ z0 }7 x8 t0 u
8 K$ C3 Z5 n( }- c& r
7 c' w+ G S9 H. l" P x+ o: ?3 I& J( N1 Q& V5 w
for i in range(10):
3 L5 G- R+ L; q) ]0 [! x }4 R! K" A' Q' l
7 e; {% [ Y+ M# z% P; o/ n4 F; W$ L9 g9 ?5 b4 e
p = Process(target=func, args=(i, dic))& O8 y) T: k% q: a
3 j4 t$ B+ b5 _! [1 _7 B& _ Y$ Y
% U6 p b1 N9 g+ e$ o4 M8 b7 i( p* q) e- s
p.start()
! p7 Y; c) ^' A; x' q# I; A d4 a d m7 V
% M$ I: K4 v/ ^) L5 G3 i6 F' k0 I2 e/ @: }' [
p.join()6 I+ ]) b* w, C7 a7 H
2 ^& Y& N$ w' {* c
1 }1 P* R7 l9 w0 N3 K& R5 @
运行结果: - 5 Z" H2 N5 Z% T) {
2 K- D* M0 ~" c1 {! Q2 c[('num', 100)]! O* Y$ b0 N& g3 a
- ?" U1 T9 X4 Q- G
3 S& D# g4 w$ K, K. z& _& @7 S0 Y2 f, v3 j9 j# D
[('num', 101)]
2 D0 G- g9 M0 J. T, E+ ~; a3 `1 Z3 h+ Y4 T- U, @7 M
; t) Z# \3 e! ]. d5 e8 n z9 R3 L, r7 f: C% m
[('num', 102)]* E8 X* y5 W! b( M5 s& k, c
$ @- P% \1 F5 d q- $ O ?- \+ h+ O1 }% F2 R
; d& y% M0 F2 m3 I+ N3 i
[('num', 103)]3 n; ~ T/ g3 F+ S" J
6 g; d; S2 h, [5 L- M
% O# Z2 Q N6 N8 {5 s7 |3 U( {' T' C
[('num', 104)]+ b3 x+ T' V. g' }! v
2 O1 G, m) ~& p- y* ^
y, C, h1 M6 m/ F( Z8 y- m/ y- E$ v1 Q- r
[('num', 105)]$ A4 T$ B6 e& F3 V
7 O9 S4 X% G+ M: D& D1 A; F2 h( t- 4 ~( ~: ?% A9 x/ f
+ t, C# k( T3 q7 B+ K/ r* ]% e d! H) B[('num', 106)]" ]: p% W5 J/ l O- @7 l
9 Q/ n8 |# e* y+ _
& C8 |: h0 K' k$ G9 N3 _. V7 H7 ~/ w/ K+ C* n9 L8 Q
[('num', 107)]
+ A {& U% @; ^7 S" M, h" X& V' s) P& [ {2 Y0 p) Y$ G
- d$ H& W! }+ L: h
2 \! W' B' Q/ P6 T
[('num', 108)]
- C: J+ q, @; v" K q: l, E3 }5 T/ I9 F5 W1 Z' x9 m
- ; J" m+ j5 k" p8 I. e- [. d0 v& J
) X! r% ?8 b _/ h9 u[('num', 109)]
( B# @7 e$ A1 C8 ^( B4 E+ u& W$ D) C& F I& |" G m
# a# C0 |3 Y& G 1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示: - 3 e- B& ]7 F5 P# V; y
% _. T" T* T- K- W r6 `" c0 _import multiprocessing
^8 O7 L" D! N: j6 k* S$ w8 n- w+ P+ E7 U3 D( ^
- ; K8 U# ?9 G _8 D0 E" i
3 |) {5 C+ A7 @/ z p7 [from multiprocessing import Process- O' G( r0 S: V6 p
- H8 n8 G: A0 D2 z# g& c! b |: w
( z( X% H- Z# G! u$ s1 t% l0 i z' J4 C8 Y0 m _- C4 P
from multiprocessing import queues
- G% y( o% K' R& L6 x8 d8 y3 T0 f$ [5 K! h9 S6 R
( T6 W3 X( n6 ]
+ _2 k2 N w, p1 N
1 z; w6 O4 m: ~, z0 z# z/ _
: b7 m+ G3 D" h1 i! v
3 k& v' o7 L8 f
4 P" ~4 i% A6 H, Kdef func(i, q):' i6 v- ?) v9 B2 A* k6 s1 F
9 |( p1 f; Q/ V2 G- 3 j. k/ f* _- h" s, B
* f6 b" M0 [* I$ n7 }, W ret = q.get()
! z! w; I0 ?7 x' b: C+ S; p+ c; j6 }1 E9 J
- u0 A b6 r: Z- A4 V
6 H- j, h. [# ^7 h print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))2 G( E# q. h5 O
; [' k$ P0 G7 N% M4 R) k+ n - : e6 k" L2 P5 n
- q2 p1 n+ @- X% g3 x: M3 j1 t
q.put(i)
( U% I: c% Z6 z1 [+ K6 v7 V; Q$ q4 R2 T! A* A3 s
: P7 T* W1 l. U" w( ?/ b) [8 @+ X1 Q9 \1 a7 f) h: G* N9 z6 p% |
- C. V2 Q. [1 V/ m* h6 N
) T/ `% C: l+ o4 J- + N- T3 A/ ~0 p. u0 |
) |* p( f! f6 K$ @1 `if __name__ == "__main__":
2 e: p& H. w( R8 R; C6 D" T
+ L& y( u8 M. _% [# o2 G3 v, r! C
6 W" g6 J q" h2 `* w9 S n& i' F: X" O S7 c7 H5 ]7 ^7 M8 z) u
lis = queues.Queue(20, ctx=multiprocessing)
1 B+ [( }6 @, e, _" ?
' o! n0 I. m, A6 ^/ f; W0 E" k
- x) u' v( O! a6 Y) j/ V; X
, x9 [# N% T' W5 a5 c5 u: ^ lis.put(0)
* {2 M5 ]+ N4 T+ n) r
. w5 q. o0 n. {, T) z3 A
( N6 g6 ?& R: U8 e0 p2 F: j8 W N" z. d' k( O
for i in range(10):0 C0 t: J" `7 i3 Z- p, \+ E+ s4 ^
& s6 @3 d# a" j' Y- 0 f2 G2 \' B8 j
+ b2 W' s. K6 K2 k3 f
p = Process(target=func, args=(i, lis,))4 u( X# X1 c5 H- G1 |2 A
* W+ v O6 ?- ?- L$ J
- ; @6 I+ u! |6 y I5 t
4 o4 [ h% o& T p.start()
: j" u! r( _4 R" p: a$ F3 H$ H; x6 b2 K+ F7 K, B0 D
8 C- L; M# X1 }) n7 [* i8 U! g
运行结果: - $ @6 s$ Z3 `' h$ d) i2 u
, d9 p4 Y1 m* T) Z# }! B. W
进程1从队列里获取了一个0,然后又向队列里放入了一个1
+ y/ B3 n% Q6 i" r* d c, x
; w+ T" Z& N5 [: a - & U; X& T1 t4 E9 B( k p- @8 M
- F1 J* @9 @1 Q2 E- A0 A进程4从队列里获取了一个1,然后又向队列里放入了一个42 K% @, W& W; _" F! X
; k, D" x" c' [8 F8 A+ U
8 C# m1 `+ }7 y+ a! v, J0 A
" ~- l; ^8 t1 ^: \0 u$ P2 L进程2从队列里获取了一个4,然后又向队列里放入了一个2) E& \8 @8 [7 Q. t. l; @, \( r
8 _% c6 T5 }' I6 Y. U! G) }) t
+ f8 j, ~) G4 D1 y: l9 ~, d9 _3 s8 g5 v% j) l$ m! ?% O% a
进程6从队列里获取了一个2,然后又向队列里放入了一个60 v" c4 R9 [3 h
5 d. `# ^! r; C) e1 [; H
% t# n7 I: K0 d, R4 m7 d T
& u0 E9 j$ K; ?& ~& N$ S4 m, z进程0从队列里获取了一个6,然后又向队列里放入了一个0
$ p/ _2 o3 R& { Q( [# J& H' E; L8 p! {: n3 u
0 [5 V6 O$ y8 W. T
, T2 K# V0 h* x1 v5 Y( O4 a进程5从队列里获取了一个0,然后又向队列里放入了一个5
- \: C! [0 T. _ }4 X) X. U' y5 i1 A+ D& X% a1 A8 |$ |
- 7 z: k, ^4 a: M0 q1 Y k
4 w. L% }, X4 }3 M# R: I* o3 l
进程9从队列里获取了一个5,然后又向队列里放入了一个9
( w; Z% d8 H2 [4 S S3 o3 t! D" c6 \6 R2 C0 N
- 4 j, g- O3 {' Y4 k8 D7 w( X
/ `4 j6 x& V$ p$ S- Y
进程7从队列里获取了一个9,然后又向队列里放入了一个70 s) S) B* J+ \) V
7 U9 _9 j) f" G% H - $ U& i8 z3 W5 x+ ^5 c
) \ b: Q% o/ X- I, _
进程3从队列里获取了一个7,然后又向队列里放入了一个3
/ i# t4 ^) w. A( M
$ f4 [* [& T) l/ x - 2 M q5 C5 U0 j4 D% w C
1 }! \/ ] o3 R
进程8从队列里获取了一个3,然后又向队列里放入了一个8; V# a1 m- u8 F% {# j
* \" s- K/ g0 t" a
! w4 U6 g! k5 r( o, ]6 _
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好! - - w# I# Z: m8 G M. T5 Q2 C
) M# x9 V" j( u* H& Z$ C3 s, afrom multiprocessing import Process
( U) h F: R/ o9 s7 R" g! \7 u. e# b) J: L6 T5 G: h3 Z- N8 ? ^
! ~1 D' p) @7 p. z" K
% | H7 P V5 D3 `' y5 J5 {from multiprocessing import Array
" V X6 x3 ^: H2 M: F; I3 a
( W0 }2 r% }% s( _, L9 R! ?
! N4 Z, @, G- M6 S% p
" |9 `1 x5 ?' @from multiprocessing import RLock, Lock, Event, Condition, Semaphore% o4 j( Z4 ?( T" O1 P1 G4 P
$ c% g( y; {0 |' E; ~, Q# F+ ]
- I7 O1 ]4 d& a' z4 W% ]) ]& r) K7 }
7 V r% N/ L7 e& w0 y' m# ?import time
6 {+ {0 p/ f7 c0 V x
& `3 `- i8 o! v - # i' ]! x0 M1 w* [; X: ~0 w, c
* V. ^% H( g& m
5 `+ z, k4 G8 b" Q0 L2 a# X6 q4 C# E
- + f- [* _5 H1 E
$ n. d" d, y3 j% t2 r
def func(i,lis,lc):
' U G" i4 T7 J+ A. M, t5 c! W, J! a& L
- ; c: q# r" R8 L- u2 F( z* q' h$ k
9 O) {! E# R9 z7 P4 N; E7 S lc.acquire()- n# { G% u- e' D {$ r% z) q5 J2 `
8 A0 b9 S% Q+ b" E' I
- " m/ d5 H) \$ Q) g. w! g, E# ?' J
q) ~! C2 n& { lis[0] = lis[0] - 1; w0 R0 L' f5 j" [( m D, A
& i8 _: L6 v2 [ - / q7 x3 i& a8 C- g! m: }! {
{3 B7 I" U* C! c" W* c time.sleep(1)8 E% p+ R4 I: C7 g' H2 |5 y& @1 u' I
- N7 X5 C7 c8 X
- 4 i4 p! f/ ^& j' m1 S: N
5 H5 r: l5 P6 `! j6 d1 z) x print('say hi', lis[0])0 l6 F6 `+ b5 _$ j" s
3 e6 f4 J, k$ t- t2 ]' q( e7 L. g
) c: Z; l( ^9 h! F1 _6 V- T7 C; {7 z6 l( Y0 L6 Z" H/ r6 o
lc.release()% o4 E+ ^5 G# R8 Z( P9 ?
) H% t" l8 [+ |5 a' Q. W! @- v
' ^- `: {4 C, G- e" y# T3 ^
; u- P% T) `' Z8 _, w
- q# p0 A6 ~0 I$ Y; r
3 S: n; H# } N) P+ Z- ! w v, k2 H6 D3 X, c' ~% \/ Y$ ^
9 Y5 s( m+ K! L, U; Rif __name__ == "__main__":9 s# g9 h, G5 F/ ]+ B* c
; X9 d$ _" F: j8 P' `. t
+ O" C6 t. I7 D, P, ?- R. e/ z" h
array = Array('i', 1)
% \# m/ Z6 l" B9 N* D7 t' Z; }. L0 }
- + Q9 Z! Q% ]+ k$ K
/ `# R9 g+ r6 T- a; C& Q
array[0] = 10# g; z/ M2 | |% m& f+ L+ D% s- I
% B5 Q4 @3 x t. v - 7 t6 R. Q' x/ X1 K N# R/ S" G8 V7 ?
1 S6 b2 B6 q9 C8 {8 R. ? lock = RLock()
& a+ y$ k% }$ @) n o
) G& A6 M6 @) [( B- W- n" i& W4 t - 7 I. o+ P2 G- h# o+ D
1 e5 o8 c. n8 ]5 G6 i for i in range(10): x- C* K9 N7 B. E) r! u4 [
' o& J( W3 [# g8 S
6 I4 e( H4 q+ B, R5 Q6 D$ t* x; @$ ?3 _3 f9 l. X! u
p = Process(target=func, args=(i, array, lock)), e, v. {( x# ?1 N8 @9 A' m
8 R( R0 a. p' i8 [- + _8 B% K( m. N5 f8 X$ g
/ x6 F: n* }4 A3 E. n
p.start(), o* Q, M! \* k0 P+ ]) n
# p6 V/ z' U- i, C$ d' A! {
2 t7 i2 g1 s; _3 ?: |
运行结果: - + E1 e7 v. S) V& D/ ~8 c) A6 W
5 K2 [* {( L# n3 G B. Wsay hi 9& u9 z5 t9 i9 D/ ?0 \# j" ]
1 ^3 J" g6 W( e& x8 R
0 |- ~7 d, t) p( X1 \& O6 V! v( {3 f: Y# z6 I6 _* C7 T- M4 y
say hi 8
q/ o! F" R# u2 y7 g% E: O; T2 x- ]4 n
- ; |% Z T8 o s% n7 }1 c2 m% s
, e7 W0 s/ H$ m: i6 Osay hi 7
6 ^& _" ^0 g3 g" d8 C
8 g6 w |' [+ r3 m7 O( D
" @) h; M% Q( n" j3 ]$ u* n' ]' z5 H$ D0 W: h% U9 T: a
say hi 6
" b0 y. d; `+ D6 Q6 ]$ C
2 N& u; X! s, x: ?/ i( f
9 z- C* q- j! }: |
( ~) S9 m3 Z" S# D* M& W# X) Gsay hi 5( j ^/ |) w8 D0 `
V, z: X; J9 [" L. u2 b" b
) I0 C# |5 J- _$ N; R
; p" j2 p' C7 D$ S4 G; Tsay hi 4
6 ?' r9 a$ f7 Z/ K; U. o; L2 O9 {) P
- ! _9 [3 T6 _5 X ^3 _9 B
8 i! K/ S# ~$ e- _. p) [8 nsay hi 3
3 v, W8 s) v3 X( w# i
! Y% k& r; N6 `" T! C6 D - 9 j; w. e/ @' i2 W$ a7 k
) o7 s B! p9 u' a+ \# f( Csay hi 2
. J$ K1 }. D# u! w" K. f# ~( Z# |4 y" k" A3 W o
2 h$ l. [0 `- g4 C
- g/ u+ U7 U* E* _. W4 @* S% Gsay hi 19 F% w6 e2 `: ~5 l; [: G8 A. Y1 _
7 z1 O( [% U& K
- 1 `4 g3 u, _. D& g! [
* U# K/ z- J1 z% N' k
say hi 0
+ x6 s* v& R; V2 V" p/ `; o3 c
z& w/ w- P! @6 X( n9 I7 E, t* n& o+ i; w) U# i
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法:
, ^" F1 W) s- ~3 y- X. X3 m! {# o, V3 o& ~6 `1 s
from multiprocessing import Pool& W6 t0 l4 P3 C. X# Y) }# K
$ T6 b: ]) ^; H0 O# Z
- ! S% G5 I8 n0 u7 ^3 z- ?
# a/ w1 I3 \1 o# w$ R9 J
import time* t; b) \7 ~( S, v+ H! w
! n% W. ]5 Q4 ?1 q; ^
4 A6 [, G! v# \6 z- a+ L
% F5 d8 q6 H; P% M/ D
/ z$ k) Z4 c3 z/ }: w) ]6 p! Y& _9 [8 Y
- 2 h3 W; F8 L1 v7 F1 ?7 x
- l: S) E3 g/ R4 G/ L- q
def func(args):
E! W8 [# B. f0 Z
4 v7 t- c5 \* P5 O$ Q/ B - 7 S1 a6 j2 @" z) F0 w# p* }
( e$ X; F' U5 R+ c
time.sleep(1)% z1 |) m! Z; v1 [' ~% T' b
% W) ]) h$ `& C
5 y& w, e) z% f# _
" V o- D7 L. |1 W f print("正在执行进程 ", args)0 K/ T9 C( Y/ G9 j- k; {" k; G( d
3 v* G& Y s8 Y' b
- A. g: W: O" d5 K; L3 Z
7 B! I# e% @- G, J3 D# e; T3 ^& @2 Z( l
$ @: j3 c+ l0 Z$ u, U- 3 N6 }2 h8 v; N8 k1 r) F( W/ s
9 m& E6 E7 T) Z: A! F/ q- P3 aif __name__ == '__main__':3 j8 ~7 L. B0 A' L2 g
( B$ o! }6 F$ Q8 X, w
) e- b( x( y' l# A& K3 [; i9 z% O; \8 ]: r. x% `
& S" ?) A/ F. j2 c" e1 }* z& G& L0 F& x5 L! @9 O! A$ r6 O
8 _, F/ f- P/ G$ {- U( J7 @6 ]% F4 p' N" z
p = Pool(5) # 创建一个包含5个进程的进程池
$ C; R& P9 S5 h- N$ g% X4 k6 x; x5 w- `) A" ~6 i
* ]" ^& S5 @$ T9 e& Y! ?" u, B. p: h- Q: A: i* ?
0 y1 {0 ?; d; T/ a) u. w
0 u3 x, s5 g0 q; N! _2 p0 ~# C
: T1 n* }( y& Z$ a& l7 j$ I: x: B+ d0 j( R& p" x+ v8 H+ R
for i in range(30):
' [% _+ d: Z) u' O7 }& R0 V( n
# ]7 f4 f' S0 _9 v
; l4 z" Z6 g- f% r5 h& ?# \3 n X, X5 @, a6 m1 P# u9 d
p.apply_async(func=func, args=(i,))
" m7 Y) z8 J3 T g. A! Z5 J7 @
8 h( Y) z0 t6 H% v
- _' N4 g9 E6 ?$ ?, T! j6 ~
2 _" Q3 ]7 }; M+ D% e. R8 I$ c
! J+ c6 r' |/ l
7 d) `& ^0 I1 Y5 K* W- 5 M2 l! B! e8 ?+ E1 H2 ?7 h
5 k, l& K2 _. z p.close() # 等子进程执行完毕后关闭进程池
, Q& u2 |6 [) I% ]5 g. G& s8 O, Q- ~7 {$ ^4 @3 x- R+ } A
- 1 b' c! ]+ j( F. R
6 n! E& k' A7 _' [" A # time.sleep(2)
6 C5 E Y$ {+ j1 I4 H5 `5 g( r5 S) v' G- z r
# `- t3 |# I) u0 t/ p% L+ Q: N. v- s; J! O, L
# p.terminate() # 立刻关闭进程池+ Y s( B# b$ f! u+ W2 w/ H5 l
% g* k* h: m; `' D- I
0 ~0 G0 Q" U( [# }) M. }
4 U+ ^! _0 ^/ O) ] p.join()' L, Z" q6 o% o" Z$ V
5 l5 e2 c6 }3 F5 @2 c4 r1 R
" z' j9 F; |6 R, s# n5 W6 E
/ [: @( A" N3 V F' L8 K0 ^; ~请继续关注我 % J7 K& C. y2 |& H/ \0 A
. o' K; D" y% x" U$ m/ F; F- ]
|