1 [, L" x* d0 F+ e. o
爬虫(七十)多进程multiprocess(六十一)
1 R3 N; r3 p6 J9 X2 [7 T- ^Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。 - 5 C( s8 C7 O; U: m: ]# O$ k3 P
* [/ @$ `( Y' m" T
import os
8 y% E; V' n# w* _
; |' l: h+ I; h* x
- p% f" ?$ k3 W1 [, H* z$ A7 S* I! H, \
import multiprocessing
0 w9 n( {3 p2 r6 W; Z! g3 s `
) g: }; n' W9 @2 p. C9 l& w- / q5 L. G# L, t1 V- K3 l. @9 ?9 \
3 S1 W* X5 s3 x, m* o4 Z1 j- I5 Z+ s h7 j, W
& I% f+ ]+ S4 ?( p4 i
. g) G+ [1 L: h+ O- B- I
% l9 [) K' G, ?) vdef foo(i):' M' @1 A" J3 H
5 O3 O( {" U8 T! T T! b- @8 E: @' W
1 p* H/ z+ b/ Y# s$ `; V; z
; y: g- b+ y# T+ N+ ~0 t9 h # 同样的参数传递方法
/ c' p1 e# Y M+ h7 O E" b) `
3 s! L9 u- ]. m# u0 y
d& `1 l* Q+ t# W. R
! `/ F9 G; J! T0 b- x. e6 u/ E+ ` print("这里是 ", multiprocessing.current_process().name)4 p* y5 G0 a6 S% t; ?
5 R+ L+ @% s* C) m
- * `- v1 ~4 h: A6 C) L9 G
$ G0 J. |$ J+ M8 o2 r
print('模块名称:', __name__)- p' t8 Q% H& C. l
$ L, D7 p+ [1 Q1 a - 9 L# x: j, G+ J" B) z( @' n
- |, b. E/ T( `5 F# x
print('父进程 id:', os.getppid()) # 获取父进程id- I. K7 e7 y# v% L
& n: v, {& I9 b6 U
- # {9 G3 L' E; \% p, c
N* {" p* @2 @' G, \
print('当前子进程 id:', os.getpid()) # 获取自己的进程id, c7 j3 v9 a, {6 |1 E
) o- J! U) T# L9 }% }. w6 N0 t% Z' u
# l0 x7 V6 q! c7 H, p2 B
+ R8 s0 g* X. r5 W$ T5 Z- d, G* Y print('------------------------')
2 B4 ?8 j8 o' t5 t3 L0 @
% ]; L( t e5 D7 a- . _5 J% N }3 |2 [6 k j
3 w: D: ]* R0 v- m5 Q# |
, s6 \$ h8 Y' _' X6 R; M
0 J2 {3 l- l3 j0 K# w
- + [1 t- q' q: j7 y, Y8 |
8 n6 O; b! g, L9 w3 v' o4 T; z8 @if __name__ == '__main__':
$ c/ M( F, l3 R
- P, d& i7 q9 G - $ p7 Q+ e. P" C
" w, y+ m* H& Z; D4 G$ G$ r3 p8 ]( v; U9 Z* Y$ S/ ]8 k9 G
. C' G. X6 N& R9 @# N - # x9 \+ B) O& e- p5 @
8 D7 s# C# q0 [) V1 {$ h2 F: m
for i in range(5):
% ?5 U8 w! {3 ^9 N8 ] W* r. L) m. E6 R
. g7 T. W- Z, m9 ]2 ?% ?' Q% v
6 |5 D! P; V3 C$ A p = multiprocessing.Process(target=foo, args=(i,))
0 L! I. R G. Q# N% J8 _+ t& ?! q4 O& T4 h* B4 V0 u) a
9 v# z7 W2 I0 O1 n7 f. v' R; D* m& q T. Q* X+ F' m! \
p.start(), A& Z) y) d% F5 Q0 ]2 W) V
/ q9 i) C& W, }8 O+ r3 W
8 K' ^# o+ D0 i" q) X: I
运行结果: - ) `5 X6 k4 V6 a( l+ A0 M
. W Z9 W8 R: a- J
这里是 Process-2/ g# u# k6 s2 \4 S( T/ F, \/ e; m
' ~6 m4 x7 l" s' y* l# A* \( n
$ k- r8 R+ K w* C1 Q0 Z6 B: g. o* D$ }4 a& `% P
模块名称: __mp_main__2 M& y/ B: V! F( K
. D7 `7 Q* I$ w) [0 T/ u
- # T# \# I( i* B1 |+ n0 i' t- \
4 l$ f: O9 U: f- R+ h/ q5 |3 c
父进程 id: 880' A! Z( G4 V" Z) P. T
4 G$ K, H( M$ J9 I: Z$ p0 C
- ! T( R% b4 o( p0 r2 W/ f7 y# }. X- J
1 [3 h: t7 b0 E. ]3 a2 P当前子进程 id: 5260! A. |9 c( j! P6 S. I0 j6 R9 W
- R6 K R% N ]( n6 ?$ J
6 y" k) s1 M, P# M3 ^/ J% E. V3 H+ U0 n$ v Y4 w K7 S0 W' _. O8 {
--------------; e) G% U6 n& |
) ^. i: P: V; U* r
4 G0 S' l: h/ Y/ B$ b* q. C( B2 Q. J5 P/ W% s
这里是 Process-3
1 P, N" e g6 z4 p. E8 o
[5 P4 g |# v8 c" T
- [2 o" o3 {; y* h7 J F* x8 Y3 n5 H8 v* O, P' [: z; Q# G% n# z' ]
模块名称: __mp_main__3 A% p, Y& P0 R. U
* O, X! z- a) L2 L; H) T+ ~
' H+ n" U& S$ W4 b* c8 e+ ?- r" _3 k. a8 m5 k
父进程 id: 880, K3 [+ g( D9 C. f
2 x0 T6 ^6 Q% O! n5 x) @8 Y
4 s3 }9 O+ w) R; `" m
7 A! W# C, e& G6 n+ L当前子进程 id: 4912
# p M1 @2 Y- c, f1 H
) T+ D6 m$ P/ W% d/ d- i
- T6 ~7 H5 C! z6 z5 C1 B. D( S: O) `/ j: g. \4 Q% {0 V7 ^& x
--------------8 V2 O& a x9 v' Q* ?
" h1 Z* j& N' Z) i3 V
6 a1 G @; d& [3 C! q4 G3 J- ^/ V" F8 ]. z0 u* Q
这里是 Process-4
, Q& r2 Q M. k( a, [8 G) V& c* k# T" D3 L b/ x, p W0 I
b) ?# t I; M* o4 a+ e: Y9 [) b0 n9 c ^/ z
模块名称: __mp_main__) t0 [. Z3 P; B
" Q9 @# C; Y; d& C& t0 C! R5 O' i
1 @) c2 ?3 V- Q O
5 D. k u; Y4 ^. y9 S5 g1 u! ~父进程 id: 880& c& A4 \% }4 T- g! K2 F8 k
; A# _6 t5 M/ w. Q
, m* A. C8 a: H5 e9 t. B! b1 |0 r @% }+ s7 ]4 ^5 z- }1 E
当前子进程 id: 5176, z1 c( y4 y. }# l% d8 G/ C
9 y7 W) I5 E- A# y6 J- 6 q8 b' Z8 U* u9 z
% p8 X6 E& S/ k/ Q, ~0 y0 W
--------------
& E0 S( e; h' n" y" [/ ]5 B! F, S% ]6 {
+ M6 i7 @; [+ h) O2 }) ^/ `; b p: x- ~; b- M }
这里是 Process-1
6 v/ X9 R( M/ a, |8 t4 J$ ?: e
3 t$ H9 \' ~! Y4 L- , Q+ z9 H: O6 a9 _2 W* s. |9 u# m
U3 r$ @( u' N8 j' t7 t" U. B
模块名称: __mp_main__# A0 R1 l! a3 _5 A2 J7 K# j B
/ F; Y% l% e+ U7 g7 n" j
- 3 c6 d2 N7 f! b2 n- `9 C
0 n8 d: |+ X5 {$ ^父进程 id: 880$ h: l& A" T% d8 k
! g1 z+ w, ~& U" W
6 L- ^- c+ }3 Y; Y# K
; F8 t$ {$ s7 z5 J$ A当前子进程 id: 53800 N' S1 K) p) n ` D! k
0 r& h: H" i, \" C
- \' M1 `: d) S/ u) ]
: c) s1 h* s- T+ V+ f--------------
6 }; ]( {& x% j2 A1 v# \5 j; s! ]" s4 b
- l# B4 m- u2 k, s/ X6 q# ?1 t2 i) n! a5 o
这里是 Process-50 u9 `7 g. z0 ?( m6 L) s
& W; P" ^2 `! K* Z, L- 4 y2 X% T* {/ l7 q( |- b s
6 ?5 E$ {/ x, S% u1 b
模块名称: __mp_main__
2 w! l) D \8 j& @: [& R
3 s7 A: H% A4 J) q2 R: a - - v. v+ p# t3 M& E' F0 l
; ~4 d) y3 p7 e, ~7 v; {父进程 id: 880
. f; ]; x7 J- \; t. e8 v; ^3 M
! H% k& r/ Z4 `/ w5 q( A - ' B2 R, V: X- j2 F) U) x9 @
% h! ^+ i" q4 e Q当前子进程 id: 3520
, {% P" w% M5 F$ [% A, j! B" j* Z' |: {& R' F" H
- # b7 [3 v( G4 x4 F% o
0 } M8 ?" K% Z( q8 [
--------------
( @* P8 M0 n/ ]0 c$ `
" Q4 y2 m; T" [& c2 ^, Q+ B# @1 s+ e# g
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享: - * ?$ ^& n( C1 |% B1 A
% V* U0 l9 T1 s. q: V# C
from multiprocessing import Process
9 u" G5 M' X' r/ s3 T. ]% P
. d& I6 Q; P* P; m; K) `. W - : c( L2 ]% m$ z* V+ O7 A
% m4 X1 U4 e8 s7 i
7 T. q: ]! `3 u Q9 V1 x
* X5 C3 ^6 \5 h5 U# q
& r/ s! i7 V8 @1 t- _% A6 C+ S+ @6 d4 j, I& s* a: o
lis = []2 a+ a3 U- d! y: O
5 F3 r3 }& N: l2 o9 o: g& z
& h. ~5 T- l o( y; Y; p0 [9 W
1 U# v& J3 m6 S" D. j
9 p8 X" @2 v9 A* t3 \5 N2 i: j. l0 Q6 z4 @
" w- [7 D5 E$ p7 D# M
, J4 D o( F1 ~+ T U) f) O% xdef foo(i):
0 j" b" O" @1 f, ]
) h3 t8 T4 c' H# V$ Y. `1 q; Q- , L7 y: M! U* ^
" s" Y& E$ K- R% j lis.append(i)
3 ?+ P' m/ X, B
2 p; Q8 y: g+ Z0 Q! ~# n, ^
' ^& f5 {- ?% h! V0 n+ I7 e: }7 f' B6 d) a! L! T5 p9 p( o
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))
5 o& G% h3 t! L" t# F/ |+ `
: G/ M# c) h4 U* N) P* {2 D- - {' e2 }" z8 U. s/ s: F6 n# L
% @- M4 ?8 X6 G2 c6 P. V; Y7 }. t9 W$ {5 U h3 v `) u
; b1 b. e- q, F% {0 Y+ ^/ G
; k, V% j: Y: a9 Q N6 @
$ \, n% I# `+ a& L, m4 k8 V) wif __name__ == '__main__':
2 u/ ~ i! j- k. g- v s! F) \ G
/ E) |% A7 b3 A) @
5 ]; F: i0 a6 v3 @! N1 Z2 w/ R" N' ?9 L/ n
for i in range(5):. @/ Z3 w n9 ~( [
+ F7 _: C" _5 N$ b$ e6 D; M* E/ Q
1 g5 i# O/ c' o# C
! }' Z0 O$ ^* c' v9 ~ z2 i9 ` p = Process(target=foo, args=(i,))1 A5 _ |8 p s7 V( g
W# @; K0 \$ q$ w& U& i& G
- % R7 B) S# ^* T
' Z2 f+ ^! B- R! t' b m
p.start()
7 @8 ~ ?0 w9 \" i* o, j, q I1 R/ l' K. B
1 ?" J. `4 d8 u. [3 r9 U
$ E$ n9 J- r m: K print("The end of list_1:", lis)
" p7 _4 Y3 \: ~' }" p* ?2 o9 d- t9 Z$ B* R* j
9 x3 i* G9 e5 c; j7 n( b! B" u
运行结果:
2 Y, c( t/ h+ S, |: g9 ?3 h+ r8 g( g1 c# r5 c+ D
The end of list_1: []
4 H2 t o6 H0 w. T2 J
% c' A3 R5 q3 S5 |2 `' D3 o. d+ H1 n2 z6 |- + ^( a2 D6 ~% w2 ~ L* j3 [
8 Y. i8 I7 k P$ x5 E3 }. D# t
This is Process 2 and lis is [2] and lis.address is 40356744
; Q% L- W' s* j* x& X) a- H) Y$ W0 H1 m G0 \
- 2 l- _; d4 u$ v/ Q+ X7 c/ P2 g
) q! h/ V t9 |& N0 Q
This is Process 1 and lis is [1] and lis.address is 40291208
7 j: c- t+ Z% x/ I# n' {" V9 h& r. K0 D
- 1 @) \2 Z7 ]( u/ p
% \0 P# @$ v( g5 b
This is Process 0 and lis is [0] and lis.address is 40291208
( l% ?- r; J0 L' ?: q5 `
/ h/ `. Z% j1 G B+ N/ Y8 W - 2 G8 S5 u: ], S8 P
1 w* k' j& B/ v5 b
This is Process 3 and lis is [3] and lis.address is 40225672
, c: D4 b* m4 n5 G- g6 g
1 H6 Y4 W( F( K1 v6 i5 n
( d- ]9 B7 v( ~) ^) k+ F' W0 Z3 q7 M+ z6 U8 q) N) B' R" ^5 s
This is Process 4 and lis is [4] and lis.address is 40291208! }' f2 h+ X1 B2 B' j' j. c7 R
1 N4 H' b$ k9 |. D( {* Q0 ^" \- J7 s+ S+ T2 l) v
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系: - 1 {. K9 r b/ R P7 z" _
1 D( {9 [8 K/ M5 F& A# Z+ `5 w" g
'c': ctypes.c_char, 'u': ctypes.c_wchar,
- w# V2 a! ]( d# I0 l1 u0 _1 \) A
- # }9 _/ @' M$ g* k# V) r6 L
0 d P( Y) q# Y9 P5 M6 b* w; Q
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,, d) Q* L( S2 l3 y' x: l; p; h% a- X
- j8 C7 Q- M% q4 M7 \9 l0 l& v
6 @! B1 g6 B; u. N9 L- I# g, K j5 M
'h': ctypes.c_short, 'H': ctypes.c_ushort,
# c" J$ V/ Q+ P: w3 e( d
% ~" O( Z$ L- [. t) U5 r7 m U
H$ Y- u( Y9 s& c
/ \+ Z+ T& O9 p& |' j) z9 X; r'i': ctypes.c_int, 'I': ctypes.c_uint,
7 @. N8 U! g' s+ D4 B& ?+ S _
1 F& J; L" n5 c3 H/ K) r- 5 U! q* m3 k, ?7 }* u
. V6 v# l- ~9 C5 B: i7 k" b'l': ctypes.c_long, 'L': ctypes.c_ulong,7 ~# K9 r; ~ M$ F# V' y
9 u! v5 D! M& @1 o0 [/ V - 9 k6 Z; U. @$ `4 x
8 ]: d0 y% ]6 M g4 l4 N. S
'f': ctypes.c_float, 'd': ctypes.c_double
' o& w" x. @1 _) {+ _7 t
; X6 G" C' N- N; `4 X" D& q
% V$ e$ v4 P- Q6 ~- o
看下面的例子:
" t5 t' C9 F, m! c4 C. }) q7 {+ h* e7 f7 g9 y( x' \
from multiprocessing import Process
- r. L X+ F8 t+ Y; J+ J8 G
/ I7 X9 W4 G1 \- }- ( e o, ~$ P1 @* M2 x
0 `" R2 i6 u2 Mfrom multiprocessing import Array1 g. e. _' K0 R) r8 I6 X
( B$ ]% X6 \& C
1 L1 I/ @$ J( B# f7 F% W* ^; `9 E+ u( ~. a, \- ?
& p4 ]" e) t. |1 G2 g0 V& B" k ~% Z+ @, ]0 C
$ C; r( J% d) b2 Y* S/ k7 g& s% a" Y# c% l" P
def func(i,temp):/ M* C+ p2 f: W N$ C
) S2 N3 g, f5 t2 A/ V/ e- 8 A( N, f" S! O. O
5 Y5 l7 _% E j" n" @. y
temp[0] += 100
4 S# v' n# |& Y- I% I
" j. r, h" f* n2 z/ P. Z7 G1 c - * N) Q2 L. O* q( x' i
1 q0 [* ~6 r( l$ j' h print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
: y E; ^( A" K+ d* W9 T4 s
: R; v0 s. _3 R9 r( j% R, Y. t
# ^' G8 i# h5 L- ]+ a$ J$ `/ w s! w/ N2 i
' b2 @3 P) _; P# B" b$ N% e, g. |0 C+ ^9 [
3 U7 G+ `6 m2 _' _% k
' m9 L7 w* `- D, i; F! }+ w. P' [if __name__ == '__main__':
3 { T5 t) @+ n7 C: k
$ V* s Y6 a9 ^: m% c- ( s. U& L+ |( h& x t
# G8 p0 D3 G2 m1 a temp = Array('i', [1, 2, 3, 4])! |: l0 ?' v) |/ i5 r
& I# p+ o% h6 R6 P6 _/ z
) F( w" ]* t1 ?% O( z
" w0 j& `( R" Y1 T2 [% W, {* s5 l for i in range(10):8 U! d0 S) F4 C/ p% d# Z
. y$ C2 q7 ?: \% v7 B z- 3 J6 m% x8 d9 Y0 ?* w7 ?5 `
2 z, x6 L W9 N# x/ E p = Process(target=func, args=(i, temp))
0 A& t- w5 S2 ]- ~& H s% m- y
! s& i% a3 K- i9 ` - ) \. C; P; u8 r; q( c2 x
% H" r$ |: o: j# r3 g2 b& `2 e
p.start()+ R* u. {5 m. e' I
! y9 y. I* x8 N3 E/ C
; t1 d' O& p; s" a! J( ]% g+ g1 ?
运行结果: - , c1 X1 C8 K& p( z$ w4 W8 E+ t
/ [7 K5 B8 {; `# Y, G) C- X
进程2 修改数组第一个元素后-----> 101
1 D, I1 \ N) q- E" D w* R& q5 x6 s0 a
- v: G" o. A- J: c
1 b; f! R) h' E6 u2 L6 C/ M进程4 修改数组第一个元素后-----> 2019 s8 N# ^' ^% h+ ~2 U# S7 Y
8 x* K, T2 N0 q( W
/ t; N/ i! L4 t2 A+ @* T* j% @
5 u+ d+ R( D, r7 W" n! V进程5 修改数组第一个元素后-----> 301; b; v, U' r" q M
$ k8 y" r0 Q b8 f% X
* ^7 F4 Z9 q: _/ d/ ^ W/ e" R
& v E/ ]4 l& m2 O8 D进程3 修改数组第一个元素后-----> 401& r" ~# n2 ^, l9 } N
; g1 B4 U1 w# O+ h; c4 r- 1 @$ H, r. _" d% s7 b2 f, |
2 ^0 B$ i X3 H* J7 q, m9 ?# Y( r进程1 修改数组第一个元素后-----> 501# I* z! K) Z" J2 }) v; o- Z, K
5 d6 @5 s C" \+ A) r - ! {8 @; [9 Y# s( I
- Q6 q7 Y: S& @) _/ g# [
进程6 修改数组第一个元素后-----> 6018 r) _5 X6 q8 l+ m2 P
\7 w& O. J. q; A
, N: `! q _. E$ z& D4 Q" M1 g! i! Y/ ?1 T- S8 G
进程9 修改数组第一个元素后-----> 701
2 y) X9 F) X8 |7 g( J
! T' Q* J& v/ X3 J; v/ v
* Z. w, _0 W) z: V8 j
# |: W7 a$ K; @8 s4 Z进程8 修改数组第一个元素后-----> 801! R. `; _$ n1 y8 G6 k$ K2 U
- R6 `" ]+ M. Y/ O8 d) C. L; w4 q- # o6 p. a$ m. P6 L
# V' G+ J8 A! p进程0 修改数组第一个元素后-----> 901: b7 n; Z9 A9 E- \3 i- s, h9 @- _
+ s3 ]* Y; \9 g. k& g4 Y v
- , D* O9 R) J+ Z9 @0 ^
: {: d3 B5 Q! H& V
进程7 修改数组第一个元素后-----> 1001
8 M2 V3 S: e: Y! F. z7 B% w" Q3 d3 X# M4 V- x( {# Y; k. p1 `
3 b$ k u8 D- m# s# T
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
8 ^' [2 H3 g$ B
+ d6 f8 z4 A, g( Ffrom multiprocessing import Process9 r+ U, g2 C3 s! Q \
% k1 r! d) \& c/ R. ^! k w
8 N, p% I, r6 X
# l$ P3 @, O. f# W: h3 ^" c# pfrom multiprocessing import Manager
+ x' e* I- S( T6 K; D! n
' n; U/ J+ L) Y z- ; k1 z; W9 s; i- G' C$ [0 I# j- j
0 X# i1 D4 V. i, B! T5 [* F6 I# o1 M$ V$ A) N
) n9 N6 v& L! @+ D - 8 S6 W+ s- v: L% ]% o" e
# d! H) R6 r5 B* O8 o9 Hdef func(i, dic):
4 Z" |$ w! T5 w( ~7 k: o( ~9 P- _4 I& P
! l5 r' |( a/ }2 O6 `) @
4 _- g9 u9 n6 v/ t- v/ A* V m2 l& |; I dic["num"] = 100+i' B# |" w" L) o
& k6 b; ?& O( n2 K& J3 I9 d
0 a! k# C, M7 q1 _5 ?3 o" B0 g, l) A& {! Y% W
print(dic.items())
$ n9 S7 b1 z. P4 ~9 L; R7 }5 M' q/ ^; a0 U' R3 F; L1 C4 t
5 [& i6 Y2 ~- N/ g( r6 C1 W/ J8 w, j# p3 W$ \
6 x! U( @* p; ^ m
! y1 k7 v @5 M: L3 d+ ^, v
8 t9 U- R' V3 B1 ?6 v+ B/ `& s- }7 I H6 K, K2 }
if __name__ == '__main__':) x9 v7 }5 X5 E% T5 D
6 }1 M2 V, `: T+ `6 R, M6 t7 [
. L9 g+ I; b4 m; ~
" F: M8 A i5 K/ O6 |1 \1 o- y7 m! L dic = Manager().dict()
$ m: h: L3 k# K, x) b/ y4 Z$ `8 \7 ~0 `, F9 U( F! o9 _2 ^& D
- Y- W% m* Y% B( K* ~0 f. ^% i Z
- z) `" e* q- v2 n0 I* A for i in range(10):
- K3 P; ~& s8 d& }; z5 R/ ^
0 ~; P6 b' w$ n t4 D2 u4 X! V - " l" M; e: S. A1 r2 Q: d4 X
1 [! z7 F% b. V& k& j* v
p = Process(target=func, args=(i, dic))! {5 c" f( P/ q" x& Z9 o
( n- O3 |# n7 \- s: H# ]# h2 q
- ' Q2 f' e# s5 r- J8 e& f) O5 P% A4 ^
' |* c# u' T# @3 r# a; R
p.start()
7 ~) z3 w1 J1 ?& {& ]( o5 Q' Y# ]. q/ u6 z0 y
- ! Y' n3 _0 R- J* ~/ i
+ U' B t$ w1 P' }9 M0 w. T+ u
p.join(); c! i: U5 C1 C1 Z) v5 A; B
3 B; d0 \) @3 [& m# _1 a9 i2 O0 v9 G4 h9 L
运行结果: - ! Z. Y6 @% }; z, w- Q4 p
# N8 i& i! N5 o) }* n! ], n2 E6 N[('num', 100)]4 z! M( ]* Q+ q$ N3 S* R: g
0 a* V U0 P- G- f" h: s! E. N+ ?* ^
% J; {+ ]* q3 @5 `; O0 g: a' U' _( \
[('num', 101)]
8 F2 Y3 J8 B' _: _
% q* ]: Q( Z$ X- X0 _% y
: n1 j3 I9 ]! J; k. J( f% p
# s. N6 M4 ~0 S4 g[('num', 102)]6 ~+ U' V7 @) W" `8 n4 @ O4 L
) M- } l/ K+ O: i5 ]
( \7 h0 D! f% Z6 ]* m$ V1 R- O! d, E9 C
[('num', 103)]
2 g8 ~; W) ~, O2 M- ?0 v7 f4 N4 y6 z0 E, n$ J4 A
- & v# N( E; y. x
7 @& g9 ?: t1 r: y: F* B" c& ~% Y
[('num', 104)]
4 Y' q# T# V u$ B% n& V: R6 {9 ]7 y) Q; z
- / y$ {- w5 X/ P, M) l0 I4 x5 E
' Z0 j, G0 c% S0 I[('num', 105)]. \( H# }7 r- b, j+ Y# i
Y2 b: s* p: S5 H
! a- ?( a2 b- M% V- b! k1 h
7 c; z- a r8 h7 L; {[('num', 106)]% T& V8 m- d# f7 m8 a/ [$ U7 D
4 d- W: D$ ~7 R; e" y6 s6 J' r
, @/ I7 F& U/ a% E7 N5 z; j0 U Z9 s( N# `4 s; V" @' l
[('num', 107)]
4 \' d& L- v, G- G# |$ a0 H
5 d& ^! g% d$ u3 C. H* g' T K- + W% c0 z" U7 I" }( q
. f0 C6 {* @- Q7 h" C! | ]4 d[('num', 108)]
* e3 V, W6 D, `! N, O
* B- } W K( I6 Q- E: p - 5 S+ X: j4 G C' [5 N
& W4 b6 G+ S/ N/ Y: m- {8 J
[('num', 109)] X) s# |1 Z4 p
* g* F1 H, x/ B) q, y
9 \8 ~/ ^# w- G 1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
4 D8 E$ U+ a+ o
2 F. J" K' `( N; z3 S* [( vimport multiprocessing. @; D# @1 O& U! a2 h1 m
' f6 L- ?1 @% n) |* }# d, Z9 D5 @
& i* z6 i! L; B( Q: _' y8 x, \4 i1 q+ Z5 M9 Z2 t
from multiprocessing import Process, H0 ~2 ]: q; q/ X+ U/ B$ ?( A
; n4 Q5 A! ~2 @8 q6 ]9 {
- ) N, |6 Y6 P ]0 i( q
5 H; c: |- ^% n* W' e# b( x' d
from multiprocessing import queues
# T9 u$ Z; ?' B5 k5 t; b- S% m' P8 j" `8 `+ B) j$ O z9 G
- " u( X e" r0 h; r. d: n, f
; _$ y+ O& j6 N& o5 h0 N7 b6 y* c. A5 |( A" D9 D
5 z' @+ W0 k+ U9 j: G9 A - ! @6 n8 ~5 w- R9 _, Q
( {8 w0 O ]' H+ odef func(i, q):, f0 I$ Z- _6 v! Z) L
& A Q# o& J0 B1 l# ^" s G
' Z3 X) B1 t5 s H0 f
: w7 b" A, e( H1 D5 ? ret = q.get()) X! B: z" A6 _, _& k! _; I, A
6 q0 m9 q2 F; F/ }3 |# o2 { N
: G5 ^ t7 i# R/ u
5 b7 v7 u. E5 @) g t) X print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
5 K1 |$ o8 o& p2 n9 s- F) b. R \) F1 [2 p$ k4 K0 W& Z
' o7 q5 c, l( m6 f# p* ~2 l" ^" y
q.put(i)
- @) W' z9 }. K Q- ^) ]4 O
$ f5 N: A. i: }$ _. i
* [# p8 q8 Y$ R; g9 e
! N8 i5 C! K% O- J1 X5 _
9 z; x( \7 D& [: A7 ~1 t2 `9 q9 @/ W8 [; q; E1 M* ~
- $ ]& ~+ d g# h2 C0 R
# \5 `: V6 }. C2 y4 F1 ^9 Sif __name__ == "__main__":
. x& u: z, q& k" W" a" O% d: g1 p1 f' F5 ]7 v
- ; k% u |$ W# s9 E
+ a! y5 T/ r& x+ V) W5 z4 o
lis = queues.Queue(20, ctx=multiprocessing)9 h, N" K* g' ]: h
+ d# Q1 p& S0 h8 k
9 `0 k% k( m5 F, O/ p
, z* x6 y% o0 |8 p( W. w7 p' q" D lis.put(0)
( k$ v0 T. Q/ A3 T: o1 H8 M1 T- c/ i! w1 T+ { t% q
- / d2 s1 [& P* ~5 ^ V3 O
0 n/ S. R$ B; s9 f& @; \. h) _ for i in range(10):
$ \ y' ^' I( ^$ {+ M2 N
7 ]: b7 U4 N6 y9 t, n - 6 ^3 R3 i' w8 Q) }$ N" l% J J
4 v9 h* l$ `3 t4 o9 S. H3 B p = Process(target=func, args=(i, lis,))
* K/ b0 t1 y! p5 s. x5 \
8 l: f; O/ E1 W0 t U% B' r% c
) l# W6 x" [: ?4 u% a
; G! L8 c5 R4 @* l2 ?$ q p.start()
# O6 F' O. d# Q/ ]- \
/ R: C7 K: `! X! o+ b$ N
3 B" A2 O0 k8 c! c9 ]
运行结果:
2 [! E _6 }" Q/ p) V- ~7 y" z5 y+ \+ N$ }! r& r0 W$ _
进程1从队列里获取了一个0,然后又向队列里放入了一个1
- j; R' c6 N* r6 z
4 |$ n; Z2 r! g3 t4 {( N/ d- ( G6 l) }2 k v: @
O# t. B% i4 t ]
进程4从队列里获取了一个1,然后又向队列里放入了一个4
z1 l `3 i1 I9 r0 q# |) m
1 h. a2 [2 `8 R @
( q, j5 s; O( n2 y; B: g6 c
/ V x( i% y' ?8 u进程2从队列里获取了一个4,然后又向队列里放入了一个28 r5 n3 ~- g. Z' F) {. n5 ]
9 w- f; Y2 A! u( r
! c( ?" D) e6 @8 |( g2 K
4 X6 I8 k& J! [% P% x3 l1 o进程6从队列里获取了一个2,然后又向队列里放入了一个6
* z, z2 L/ G3 a+ P9 c4 N1 p; U
3 u, u7 R$ T. d9 b. Z; [1 Z' @2 o# b
8 _5 P/ f8 r# N/ U% m8 l5 G0 h: m! z
3 P6 Z1 i+ A$ N进程0从队列里获取了一个6,然后又向队列里放入了一个0. k$ n, q3 H7 c7 T2 n" V
C' B* }& c% Z% T0 Q9 \2 F
5 P2 ` K6 W6 Q. t, S3 M) R; L: o0 {( E
进程5从队列里获取了一个0,然后又向队列里放入了一个5
. N O+ c6 A9 O$ l' b! H$ A: s& d9 ~% m( j
- 3 q% o9 d1 z* ~0 W) p7 Z
! o8 q5 x6 \. y5 H: r( U! u
进程9从队列里获取了一个5,然后又向队列里放入了一个9
; Y& W1 B) L9 b7 H
Z8 N; X- C* y% s: S0 J! f - ' v) V8 L& K6 r# o) _. p
. M3 Q+ x; o# C! o U h7 _进程7从队列里获取了一个9,然后又向队列里放入了一个7) n+ }' `2 v# q
- @; J! o5 d3 n/ K$ X1 F
" ]9 \ M( X, n
5 z; c5 `8 v! R, G进程3从队列里获取了一个7,然后又向队列里放入了一个37 d& c) i4 g' ~& L& @3 h0 g' W
& C+ |7 U# }$ a$ s$ Z0 N% r4 [+ ]
- ' V- `8 u0 {7 y
3 K. q- r% \% l) {9 f8 `0 x
进程8从队列里获取了一个3,然后又向队列里放入了一个85 l3 Y& ~( b1 y2 r- p
# l: {5 s2 l; ^4 f0 d6 Z0 D8 r
; @0 o$ G) ^) o/ c0 S7 ^- A
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
' M& H8 e3 n/ G
; k* A) ?9 Z4 [& _+ Kfrom multiprocessing import Process
7 S+ W4 v ]& |. e8 g( v
# D# C# p& q1 p4 n! c3 k/ h- 1 z1 |/ n0 \+ c9 ^: u! u1 B3 R* f. ^ J
1 ^ d& b5 W+ z/ O3 Nfrom multiprocessing import Array% [7 M- a* i9 x: a% O6 N2 j) y6 y
- z7 f7 T# w* F; U* v, G) `4 b% c - & M% x# p3 W6 U
% g- K) h- \) V' V& L
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
8 a! Q) ^/ U9 z- m4 K% w G O" g$ r: n! I4 p( H2 O" D. o+ o
& ^2 [/ p" M; X1 p v# q5 `$ z9 P) ]1 N2 T$ Z
import time, A" Q; n$ w r6 t& y$ k2 B5 Z
$ J) | C/ s. p& a, V
5 i# `# ~) x/ K; u9 f2 ?# I/ _$ z$ O% X F' o- J- E4 J
; b! y Q- p, B/ P8 h9 l R
# j5 x( X# k$ V; P
/ H9 X. Y5 p4 B1 E5 ~6 a i4 D4 `# c' M8 {
def func(i,lis,lc):
& X7 g. J4 k7 f* X
2 G8 {7 X0 h& ^
* t b \# e- ?2 {9 p3 n+ o, m" A/ s6 H- [3 T6 K! ~
lc.acquire()3 b, a2 P& }/ ~3 u4 s- `6 Y
3 `+ e5 T1 Y f' c% C8 Z
) a7 i4 a6 `+ h9 b6 w$ ?
' T; A3 a( D6 `3 M( u4 { lis[0] = lis[0] - 1" L8 I7 H; h0 D9 G: H- Q. w
7 ~% e& l, n. R7 @ C+ h- p! Q
- ! x8 `- ?9 a) J( S* b+ b$ B
! j- w% G4 u, K$ D7 k+ ?
time.sleep(1)7 j7 {0 B3 w: t. o1 N7 g3 |
. _$ \! r" m @$ V" F
+ N( P# L# x4 _1 T8 }$ ~1 z( A
0 K' j7 O, g# z( j8 k j" C print('say hi', lis[0])
4 d3 i5 b( y; i& R8 O% W
7 | l) P8 j% ~% r' B* [9 \- h3 U' `4 J
, y# r; J s0 |5 Q0 c9 Z6 B) Q. L
( ]1 f0 @2 C2 f3 L3 X+ i6 G lc.release()
! N5 t Z% @# K6 Z
5 F' T9 g9 N# X* a8 |6 r
) O" }/ h9 A# Y9 J V" Z4 d
/ s) T6 P. O( ~$ ^! L" E) `
7 a# a5 R" I! c, L( s* K O
: n1 B! b! b8 h& A; c
$ _3 Y# k* \+ w; x
! s0 m7 o `8 F0 l" W! w. Jif __name__ == "__main__":
0 R0 A5 B5 ?6 n8 h4 ~" h4 Q
) `" X* V3 J7 n. J0 r: R- 3 [. Z- Y7 C0 h; J6 f# c' H
) i. d+ O% S( E4 r8 R+ _$ l array = Array('i', 1)7 b. V5 B% L% @$ z# h, F6 O' r O M
$ n4 I, ~0 k6 Q E
- # A4 T3 M9 r# a( @* f2 X
0 ]" g9 B0 l6 f9 z+ z3 [! w
array[0] = 10
4 ?% ~- R% s, v8 b$ z- a
! {; E7 B* k* ~5 I1 ^ - 3 h& T9 h, ?( |0 E6 N
: X* d: V4 B, O8 \, g# e) w, ~ lock = RLock()8 N B2 r3 e6 @8 m, A) g
3 C( q+ J' M) P% g! r- n2 C
( \/ ^, u; {. z. l) S6 a
1 H8 T/ A$ U* U5 h/ C% y, m for i in range(10):* b4 y0 \) l8 {7 Q
( ?2 l+ p D! `, u7 c# `
- ! u+ T) b& p, d' E+ J) }; X4 m
- n$ c' r5 p& x! g4 [ p = Process(target=func, args=(i, array, lock))2 ~% F+ X( g/ L, d* S7 s1 A
5 G( y/ n6 Q! \0 Y* Z
" ~1 |, X+ f; d# [" T
# S3 V7 c1 ? N; b O: E0 v p.start()
, ~9 C) R! o% o8 o) B( _0 B
4 [# O' [9 A$ ~' s
: t2 f" P" u- |6 k: a! F; \9 B
运行结果:
8 b- M/ Z5 _: C4 O# G2 R6 r- i8 h7 H+ M5 }( [
say hi 9
' A, t' ]/ ?6 D( N; `7 w M5 u; E4 J3 _1 G: j! }2 Y r& {9 B
- % W; j# R# z& w' a
5 M/ j9 u0 T \1 u6 O6 Hsay hi 8
( E' H+ x' M& j' z7 F8 g
0 P( ]3 l* u8 W1 V; _/ V3 ~
b4 z) Q3 B+ ]& A+ U' ]
, [; U0 `9 X. \. Zsay hi 7$ F$ ^5 i# y3 |; @, K* d9 P! h
k) X8 _$ |$ K8 U
7 x% t: Y/ o2 @) D3 o# U4 |
2 G" l# H( l$ t" ~& P! d" Dsay hi 6
5 N; d/ C) {4 W+ q( ?( K5 l: g- b
- - p4 K& @. F1 V" N9 p" s
) v0 q, x2 H6 r% D5 Qsay hi 5; r0 g; t% j7 G, Q# B$ R" H( q# \
2 l* {% I% `8 T- h
- ! d0 J6 r/ v6 d0 @9 H
5 `; x, F! D9 Z* N# `* ?say hi 4! M5 {0 f# `! R. _: ]) Z
* j$ K) Z" J, N+ f$ S, Z! v9 T- p1 C
% J# B$ d8 T& {9 O! `! f2 f q4 x0 b R
, k, |4 Q8 N! _, L7 ksay hi 3
7 \' V/ p$ v7 T2 i0 @. ] _
4 l3 A5 Q8 f* E. _1 X! y
+ e& a5 |3 \) T1 V; M8 a) L9 u$ U3 `& u
say hi 2& l7 ^) V% j6 o6 z, r
% S$ D0 D z I+ S8 i* d V1 M- / q4 z7 G R- m( P5 G5 W
* v- h9 U* I6 j! u# _( ]& u$ ^; X% Tsay hi 1* q9 M2 z" R! G+ E
, S% j( e3 |1 w, O8 [" s
* y9 g1 m) X. e; L
6 K. m8 N" O5 _7 V+ asay hi 0 F' Q' B7 q) ~
e7 ]3 i& G( O1 u! h
$ ]" Z4 d5 v( i( H7 n2 [( K 3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法:
: H4 b( D: h/ J' d+ X0 F! @; u. _* {
from multiprocessing import Pool# U% k' d& B1 ?. M, s* T
8 _1 R2 o" _1 o
& k. O: q, ?4 h- n* S3 [% R6 S6 E+ y+ k/ _( x9 S$ P
import time
2 \9 b" B5 |- A) A+ i% e
- F: `8 t6 w3 y$ T! [* G- 3 F$ Y# A, E0 R
. o9 c; h* Q+ G7 K8 r, I
, O9 ^ n+ T J- R7 Z- ^0 P( L' o5 h: R1 C+ _% j9 q1 x
7 ` m& f( x* J* U2 r
8 \0 Y- X$ @" C+ k2 Vdef func(args):
0 C: I; b2 e* Y
' v/ ]) i0 q& ?, |. l& m- 3 Q" l: Y: J8 X8 X: b
3 C9 E9 T' W G, c
time.sleep(1)
& t; A4 }" B6 z& R8 v% W* l" ^0 v8 O$ ?
- * A$ T. A) b0 H! z+ k* @9 S C
: T* b! G, b0 O R8 Y6 b
print("正在执行进程 ", args)
+ H) A+ `) T; u4 [4 U; q0 ]! ?. v7 U! S2 V) V
- 6 U1 M# [1 D2 W, X! s. ]* F
/ u( ?4 x k: ^2 L2 a& a
' ^, U' X7 M: w) }5 v- x4 j+ z% K# q
( m. M$ L( Y# A# M/ q& x1 p7 E9 ]6 w+ j) c4 K# L/ K
if __name__ == '__main__':& i |# \2 r+ U
9 }6 }$ K7 e: z
- 7 s, r$ U* p' ^
, d- e0 e+ w7 v2 X& M. ^% J. i) {. P4 ~1 A+ f9 C3 E0 q% J
* W" H* h d5 {; X; T/ n$ a
- 4 [6 a3 p4 E6 H5 _$ I! D
2 I) ?2 w3 G9 } p = Pool(5) # 创建一个包含5个进程的进程池/ q# F2 c) P( ?
% n u, V7 ^7 \+ c" B
- ( `9 t9 b8 G1 v( K9 A
: n% S9 I2 Y6 P0 T( L
4 f4 R2 A4 Z4 W- e7 D! f' j. ^7 R7 `: l' i( K: m, B
- * X7 I& o5 Q% m
2 ~7 C0 m# y3 @$ n for i in range(30):
* c% f; H1 n2 C) L/ T" M) B# u
! b' B! _* S+ a - ; B+ o- y' R! q4 A: E
9 g7 b( V6 O9 J0 @: q8 p
p.apply_async(func=func, args=(i,))9 U& c- v$ s2 ^% }: v
( w- e; u( `1 L# @* j - & m5 y2 n# p0 q
9 V7 }; O$ y; @# [& v+ `
5 M$ E7 V6 U2 \" r2 F# E1 w+ s% L+ C" x
- % `$ t4 D" X# _' T) L- l7 d
8 l8 |& o3 ~# N/ [, E
p.close() # 等子进程执行完毕后关闭进程池; c( q8 t+ r/ ~2 M& Q" _4 P8 u
1 X/ ?9 X9 l- u: i% U
( p% F1 ]9 m! `
$ u3 }7 u$ X7 p% }% N' b! e. W # time.sleep(2)
" w8 a! O+ a1 R: [* C; k. [* H/ F# @
2 T' {- t6 U7 U+ {7 o B- X# {
9 M: ~7 O% @, o" `) f # p.terminate() # 立刻关闭进程池" f3 R8 x( W& B) _5 S( B: O0 G5 {
9 F3 m( O6 z' { X
- & F! Z) F. R3 ?
; s: ^. C; U g/ Z; n b
p.join()
6 _# B4 t: O& _+ h7 [6 n
7 y7 ]8 n$ a& D4 ?2 Y3 S% H7 R k5 L M2 m! J0 N1 B
2 [. [! Y* ^1 Q. Q- ?) R& }( R- h请继续关注我 . d, y5 p4 b" L
# j4 C. N- ~4 }( e4 z |