数学建模社区-数学中国
标题: 爬虫(七十)多进程multiprocess(六十一) [打印本页]
作者: 杨利霞 时间: 2020-5-31 10:35
标题: 爬虫(七十)多进程multiprocess(六十一)
# K" e; x! j C
爬虫(七十)多进程multiprocess(六十一)* D/ f' x' q) c& K# s
Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。
另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。
下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。
# m. J, I7 D& C, h. E
9 I3 m2 a# f" t; A# n7 w- Vimport os
7 [3 R# W2 H% {# J/ X! Z/ \) \; m- v. U: n- I5 K
- ; Y6 p" A3 M4 L7 T* s' S1 f2 e
$ E- f2 G! N W# T Q6 ^import multiprocessing! D+ [5 }+ {4 h9 }4 K* u' Q8 t
' ~$ _1 `2 ^5 K3 A+ H; w
- * L$ r3 G8 h; U% m( D4 h
5 \4 K( a0 Z- _4 n2 d0 |
+ v7 f8 w& _# F- v( d
; I1 |# `/ Y3 Y+ L' Y/ r( m
9 w- ]8 ^& s5 s: n
) e+ L. L9 _$ l$ ^# z: ]def foo(i):
2 Y+ S: z0 E/ }7 f3 X! a+ Z& E, q: J- x) c$ Z
- 7 W) L* \4 @1 E7 V
; T4 F! ?. q9 u1 b k& C # 同样的参数传递方法. R# M' L# K9 A; n; s" l2 W
* q; l7 ?0 p9 n! b/ h0 X$ Q
1 J2 ^& S1 W8 A) E
$ l5 C5 y, F/ q1 S' }& @/ x9 ? print("这里是 ", multiprocessing.current_process().name): j( }8 P5 n7 v* D
7 X/ b; T2 _$ K; O) e9 Z- * k# K9 _3 W& P* [' V4 p
9 N3 ]: Y6 ]* q! o* q8 d print('模块名称:', __name__)6 t9 U# X9 s# L
; c/ b5 l4 B7 y+ }
- 0 u, J3 V! C' z9 X Z! p
8 l' s8 ]3 W$ x; {8 v9 K) \ print('父进程 id:', os.getppid()) # 获取父进程id' U9 l6 u1 W, ~! n. u. |# }9 G
) R" Y- d% z6 ?0 k- X - & {! v) M% X- z: p- X
$ ]7 ^. d+ \- _. }) g print('当前子进程 id:', os.getpid()) # 获取自己的进程id
' k; P2 q7 T$ x
7 l, q! c' n0 N; t) |
+ B- W. ^) h+ R! E7 g/ U( l! b/ u" F! C6 h- a; r' v: ~
print('------------------------')# ~! U" n9 a& i% T7 J; l( Y
& m, J4 b. X* d5 T9 k- w
- $ j4 Z" h+ P! g. E- N
8 D7 A/ m4 n7 p
' g- {/ m' u/ y0 h9 c* N- F0 y( p5 y' v+ W5 V5 g
- 1 g* j/ T$ g9 F; j) I
/ j1 m4 I( P8 }) c9 vif __name__ == '__main__':5 _, i( ~9 ] E; U: \2 D( ^, C
0 d" p; ~, F9 c
3 N: K9 Z! f9 L$ s }$ w+ X; h
" n; y" `; ^. i5 k5 `2 r8 z, \ A: r* e$ q$ Y7 K8 J2 j. z
- f+ i4 o/ D# Q' @. v7 l/ k
; L. {" |+ r7 R @9 b5 D& ~9 K1 X0 _8 c
for i in range(5):
6 s+ }+ f9 {: A
, B0 a5 s; G. z; _+ ~- / q1 } z; x8 x" N
5 w W8 r* }8 g" r! t6 I
p = multiprocessing.Process(target=foo, args=(i,))" p. r- d% I6 t, ]0 z
& G _5 P4 n# V) _6 H - " h2 y) [7 `0 r9 i
$ Z" y; C2 P# w
p.start()/ l: s$ n2 P- V3 G
2 f' r9 }& B' ` k4 r* |, r+ O, D2 b4 p9 g
运行结果:
- . I9 l0 p( e& [% ~, b
9 G, K) I- A5 ^& f
这里是 Process-22 |, N. z1 o7 R
4 H! E: Q% U% Z& C. |" x
- 3 T1 F5 X/ o$ y/ b* o
8 d, X7 u9 s+ O, a1 u7 ~/ U1 f) C模块名称: __mp_main__
- v- w' F% p5 f. `
8 N' ~3 a& ~4 ]3 a! ~& v5 \/ Q1 V
# f; o' y I% X" b1 V5 Q
$ l4 y7 K- I. Y! i4 K! J, |父进程 id: 880
: ]4 A1 G8 ^6 D! W) D( c7 M+ Q' \8 Q# ?* ]) a& m0 W1 f
2 }5 e7 O* ~$ s6 F, ], G2 \8 \ Z0 a& g
当前子进程 id: 5260
9 Y0 V4 }$ p8 ~% F1 h X2 E" {" w& G% P/ P+ w& s
- 4 @" t: ?, z6 U+ n U& [
, E$ c& @3 F8 g$ T% q5 I
--------------( i c; e" E9 \: n: p' Y
% b5 L5 e3 \' N# Y
- " f/ ]' W4 R5 ~% N* ^; x: b
4 s% K7 F4 H) e5 u9 g$ ?& `这里是 Process-3
' e0 [8 ?( r. v/ l" j3 ?/ f
6 X' Z5 `7 H; y3 c! @5 l - / `# i: U( A" c, e- h3 i
: B3 W9 f1 |' d x% ?
模块名称: __mp_main__% r! n7 C& R$ H W5 _$ |; ~( _/ G
2 m9 I- \# w/ o8 U
* X' g: @5 u% ]1 {- B+ T, f; F3 p" b' P2 I% m1 B1 V
父进程 id: 880
6 Z: s: L% F; |9 k# l; Z2 k, v3 l, m- o) M( U
- 5 ?, ~) ?( {6 H( H) u: |
" q U0 w1 W( k* y* O5 `8 Q: E% }当前子进程 id: 4912% M% X7 f+ ]9 p4 y) o" h
/ m) i! h9 }3 K0 h
- / F9 q. O2 M& c8 i4 ]) J @
$ P6 K8 o; x' Q4 {
--------------
' b3 J2 K6 y6 `8 m* L& e1 j
, W( c3 Y9 O* X9 H - 9 }4 @$ p' y9 h$ s( C$ k% f0 g; H
; a" P! V- q0 W$ y' j$ e- `这里是 Process-45 H2 O- i) s* w# E
; H2 O$ l$ X/ G9 f, D+ O R - ) c/ D, q1 A9 `4 x
: N8 X: C9 H$ { [2 I1 a
模块名称: __mp_main__! `) l' B* M' b3 E
& M; d5 F$ f* \% @* y& [9 o1 h3 h
3 u) j5 V$ W& _
% b$ a7 W# T# S0 u) |1 l父进程 id: 880! @" {' {4 i( j/ q
* a/ @2 c: G3 B
- 0 {3 J7 \. L8 c3 h3 I1 F: A, o
% ^' ~& t; k4 A+ f1 V1 W
当前子进程 id: 5176
6 |4 ]* h0 Q& j4 G. v7 l& d4 `9 M" T
. n6 h( e( W E. g( @% ]
" w' x# h9 ]! w: v7 N' R( ?# D--------------1 s# Q2 H: Q6 u1 l2 F) R- a. y# |
( T0 [0 W* P/ G2 C; z( x4 G- c0 G3 Q- % Z9 K3 X- ]) |4 }- @ ?
6 N! N3 P9 q0 s这里是 Process-1
& d7 D' v3 Y+ u$ r4 f. w& t1 e+ J k& H+ M" C# X$ L
- ; n R7 W& f1 E" |, a
1 r! R1 r6 l; c4 T5 H模块名称: __mp_main__5 j2 `, \: e4 p. F$ ?' w
6 f: s) X5 E& N+ c9 N. d0 J - : y$ B! g2 P9 }( J+ {5 {
+ H, a- s: u) ~
父进程 id: 880) @4 f$ d% t# o' A
6 h! _9 E& u+ e5 z
2 V" d, p% r5 j) X
4 U) i, F% V$ \* i! m当前子进程 id: 5380
( n% v* S# \. A2 V( m3 K+ F
, M0 R2 b# b' P0 Z4 p- : z, h7 T4 I0 ^4 R/ ^8 F+ |# |7 X
- X |7 l H6 e; }( N3 O8 n--------------
( S+ U2 p( n/ _+ [% x; M/ U3 J: D4 K* S/ n. N c; z
4 q6 m& s/ _: t t' I+ z8 l
1 N( u0 J9 r" M; q这里是 Process-5* M+ _4 A7 L7 I# R, S) A
8 M! f! E3 R* S' U! L( K
) C+ F! Z9 ~" q2 ?: w
% \ h1 D! n! \! @0 h+ P/ `. d模块名称: __mp_main__
# B; R$ b# r0 [' X0 M1 q
) e( k" t9 X4 U _+ `5 ^
5 { D' W D. r6 E. u* k ?: u9 K2 g& n1 a T8 b
父进程 id: 880+ T# S. C8 ^+ A! ]) P7 s2 Y
* [4 K: g8 q$ u: }: z' V; s0 w
- 9 n! A4 _1 M& `7 x7 b5 I
" O) [: g9 E( L/ y' v& C
当前子进程 id: 3520
. N* b, ^' v/ b% S" @. c
% ]( a1 n0 {+ X
* f5 e% K6 d# I9 f- ?8 |0 w% d d1 t) z
--------------
& n) L. s- x4 \+ ^4 H; a
0 S& t; N3 \7 h/ H7 \( c( B: A/ {& |% ~: V5 ?8 F. D/ E
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。
创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。
下面我们尝试用一个全局列表来实现进程间的数据共享:
6 \! i5 B7 J; y! N- p% P. ?4 P( S
from multiprocessing import Process5 J$ }6 ?5 y0 _$ d- b
4 W; G! v. h t1 H$ v
# D3 l0 {4 u9 R5 J
( a+ S6 z; }% W) k
( V$ ]5 B& G* {8 \4 N' T' @0 t4 u
* I. s- A6 T/ r% v6 i, M! N9 v& S* b
" _2 b$ y8 b4 [) @( v2 \; o
9 d( ^6 g' K1 i" A% Xlis = []
/ Z& j5 R: s& N
! L3 P/ M0 @; }4 T, g
; J- n; g6 m5 F) w$ _( ^
2 @: t3 ~) O1 k' [8 R; n' W# `
+ f0 |' L$ t7 O. H4 z1 X
0 r& B4 \; C6 z: J6 W* ~4 x5 z- ) r- ?) Y2 {1 M* L3 z( G
0 }0 l; E/ p4 @3 ^% _3 h1 ndef foo(i):
; f5 a- g& k7 V& y1 X x7 I' u* y& r7 T K) v
) e5 W: B8 H1 Y% C" ~0 \% y: D0 T# {) c9 d4 J* g
lis.append(i)
' H w* o0 b% H. v" ^4 e
: Z8 e: W7 Z; J4 v
$ S: h% L4 @6 k7 `( @5 U- q8 u5 E$ A Q, [6 G
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))0 e1 ]% P0 I: k2 g/ u) S9 f/ F8 v
$ }5 M, \7 W- ^ Z. w6 e+ u" M
- 4 K5 x+ g' k2 a- a! j- C, g
) H; j" `9 u9 B0 `
3 E. t2 h# X' t" C
" o! _0 u. R$ l7 T0 U8 k
' E& |- j* I' |/ P$ P- D D" r9 _) _, e' d
if __name__ == '__main__':
% k/ D* p/ z+ P S$ \5 a8 W0 a3 T* P8 m0 \# s! s6 O
% c* b" H, F8 ^) {4 \0 Q9 c9 _5 K' u8 y. f4 C/ z, D; X
for i in range(5):6 ]$ O+ M# \$ ? N
1 R5 U; N2 y3 G" g2 S- % _0 W8 o& N" \; ^8 A+ {! h
9 R8 c y( @$ P# P
p = Process(target=foo, args=(i,))6 \8 b! ^! Z4 X) B4 m6 ?* S
; X. ] P' i8 ]
' m+ M3 n* _0 y( }
) o0 m4 u2 k, d3 F1 |$ }3 J% a: U p.start()7 l1 c1 M6 R6 E( O2 Q$ m2 T5 n
" L$ _* Q+ e* Q2 W, A- ; I9 S4 g* H$ q' p+ R+ ~
3 x5 V& g# W4 h: @1 l2 A/ g% O, f
print("The end of list_1:", lis)
; p0 \( F( \: g' ^" s& i+ K1 t, |2 w; \/ x4 b
5 z4 y, G7 @" Q8 E4 c, z
运行结果:
1 B& w" a0 l+ [2 W+ d+ D! Z# c! z* H6 {, N+ K
The end of list_1: []7 X H2 q8 B! v- @* m
: o; I Y' n" G" u# A
( _' O/ k7 q1 R/ G. @" r5 x. \# e4 l8 z# h
This is Process 2 and lis is [2] and lis.address is 40356744% l" m' [5 k& o& p- G0 S0 M
4 v. b" f7 X5 i$ o1 i
- 7 v3 [% V' x, {% |/ V# J
, Y! y8 }) `* ~ R+ k
This is Process 1 and lis is [1] and lis.address is 40291208; E( V6 K& Q o4 h* x0 ?" j+ M
0 r; f |/ R7 V( `
* ~4 g" l' {7 M% v; N; P5 ^. l4 x& s9 t4 F& w
This is Process 0 and lis is [0] and lis.address is 40291208% I! D3 A& u% [5 @% |" ~5 q# b
/ C. }: f$ i; ^) C6 R6 H- 3 u9 u% q: d# }3 \% Z& m* M
2 F0 W! |/ U- N6 x: K
This is Process 3 and lis is [3] and lis.address is 402256721 q. S- ~2 m+ s6 ^
4 a0 Q! `8 [8 {
~3 b7 J' \- C/ k2 m% k. V) y
$ ]3 t$ d& T. bThis is Process 4 and lis is [4] and lis.address is 40291208( m/ R' ]4 r# h7 G
% L0 s! f+ N. {6 x$ X7 K3 \/ q# n& v
( _$ ?8 K7 a$ ?: O
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。
想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。
1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
- ) J3 E8 [5 `! ]1 X9 d
! Q7 E2 }4 R: g( G- p2 a
'c': ctypes.c_char, 'u': ctypes.c_wchar,- [6 L0 s: M# L1 s8 U& f/ x
& W1 f" L& x b% K
* ]) D; N0 r4 R, P! z. F5 j! | k7 o5 L5 g! F9 q/ }
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
0 @8 L& }" p5 `/ m$ E0 W3 b2 k( ?
8 t G+ U \* o
3 w1 h6 r$ g F S; V( y5 @5 d- ]! w
. `' f9 s- r1 k$ Q/ S' f'h': ctypes.c_short, 'H': ctypes.c_ushort,
! Y& G. ~$ b/ f6 c) F0 M
! ^) }; q4 j& N- 9 `4 ~( _5 m0 q0 z7 A6 `4 ^$ z+ t6 x
6 R* ?# {* d( t/ r! T$ g' R
'i': ctypes.c_int, 'I': ctypes.c_uint,0 H, j# e; x" L7 t* Q) }
: ]% \4 B: L- ^" ^- S8 f! ~
- , I4 J) ~* c+ v! g
6 G( i% z2 s' D- p: r
'l': ctypes.c_long, 'L': ctypes.c_ulong,
6 B/ G# X: g. N7 v8 m8 ?& L1 `9 J' G3 Z, b. S
' N# M# \* |0 P4 ]$ Z J0 Y! s% J3 g+ d& H( c. z) L2 U
'f': ctypes.c_float, 'd': ctypes.c_double
1 v- h- V9 A0 N6 n' Z( N, ^$ ]& M" P* o9 g
) A6 H& g& V R! r3 x
看下面的例子:
- 5 X! c9 e9 e' Z3 u- _1 z
0 w" D, P n& q/ d9 Y2 yfrom multiprocessing import Process
3 S( _, u) T- C+ |) e8 j# r e P/ }
|2 b. n" d8 W+ k6 ~8 a
3 h5 c( l( h, T- f6 U& E- B' B/ D
from multiprocessing import Array* @+ I' t, @* M) Z8 s4 k4 r
1 v9 b$ `( K2 g; c T
4 r5 G8 D8 t( X9 f. k2 i, } K6 z* X) a1 K
! v( T; G7 ?, o% m( L
- V9 e7 B# e" r: O& n
4 @7 |- B6 I( \
0 b+ {8 ~" U9 Cdef func(i,temp):. [4 [# e. O1 Y5 P: w
& K) z5 o3 N: ?( p3 w j" L' B
6 P; H" K; X9 m
7 e) o+ M# t" d% T o, z temp[0] += 1000 R1 o, R8 o+ J! ~3 A! H; S& P
, h* d. @$ I% {" h! E- F& P8 r9 w
7 x& h7 B' u0 U: e D9 v, w0 g) G( @. e# K
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
4 ~4 |1 I, p) c5 ?6 ~9 [
; j9 M+ R# f" z! M- 2 ]6 G1 U# V( y
/ R9 f, {# ]) @: _$ T" O
! {1 v! B# X" u
# ~$ G. r. `: E$ ?8 S; y
' L: a) F- }/ ]1 U1 u2 Q% C/ r: r- M% ]# y
if __name__ == '__main__':
% O% \7 N, a0 N4 R. N3 V
d. O' Z2 q* W: S! P
( j5 \5 L6 L1 v+ H4 `* g! R9 R8 j/ T0 Q- K
temp = Array('i', [1, 2, 3, 4])8 ]1 E! {, x# q; a
8 j+ a0 l* _' @ q) _
" A" h/ |9 D# R" S5 u2 k# w7 c- }" K! T' r# K* v
for i in range(10):
' Y2 i$ x0 t1 u5 ]( G1 k; s4 W$ I
* R/ A7 h2 y) x& }' P3 ^" c; i! W4 m p' |$ R
p = Process(target=func, args=(i, temp))0 P6 D& u! F' T: ^4 D' G
. K8 J2 R% O0 e0 W f0 F
7 I( t" G0 D& w% V( I; e
2 _! T, K0 E; `3 l p.start()
0 N6 P& H- Y4 N' s+ Z
9 Q+ r, L0 w6 M4 V& o$ t3 r: G! u5 e, L' L% `/ |
运行结果:
5 U: l0 z4 d! E6 q$ `( }* |+ ?" a/ b7 [0 n! T) O
进程2 修改数组第一个元素后-----> 101$ r. R- @# e% V
' }) Y2 D7 y5 C2 i4 T H
) m. D3 q. G5 |8 T/ }6 N' r4 @4 l9 q- l: U! {/ ^$ |: e3 u$ K7 e7 E: D
进程4 修改数组第一个元素后-----> 201
- \2 |, G- c3 v3 _! y6 i' G4 s. e! G6 L# I9 L
- 7 G. z! w, X* _: P4 b
0 Q) G* Q7 S' {! N* Q& C7 {
进程5 修改数组第一个元素后-----> 3011 a% D& K& ?9 _3 z
" f) H9 p. O' r2 ~- ^8 O: ?$ J - , q% d% b* ~* k
8 w; Q i' ?' i, D
进程3 修改数组第一个元素后-----> 401
- q& ~2 v" r0 u0 u8 d5 a+ f1 q; j
- ) \# g# ~. s8 q! j4 q
$ _. K" ?1 ]2 H. r" j
进程1 修改数组第一个元素后-----> 501
, c* B5 ~- U% e9 c i6 _1 L: E1 N4 ^/ `) G% B, D i; P
- " ^& [' f' a' T# x6 i3 o
! b( {4 D$ _ b9 W: E
进程6 修改数组第一个元素后-----> 601
2 W+ |+ Z2 ^4 K' m* o5 E
' t2 |8 g7 x$ m' E - 7 M1 w$ W9 D1 Y; {7 r1 p
" {( ]$ m# c* d# A
进程9 修改数组第一个元素后-----> 701
' L) \1 W; e" \5 e* w, ]' M
5 P3 u& I3 h; g# H& [
6 D) \6 S5 A/ }( S" T6 A
6 S* p7 a- z7 n! R进程8 修改数组第一个元素后-----> 801, v! v# w, e3 Z" g" T
" X5 m) n" [1 k# R6 F# ^' g
/ H6 i1 J# Y. b; w
! l, w1 j4 A$ b* c# U( b进程0 修改数组第一个元素后-----> 901
2 w( e4 p; W" c4 c$ @; r* M: s
V: [1 c' O( X- 0 ^, }$ w, s1 L8 u! _1 M$ H; k
7 G$ s* k/ C( C2 }4 q' Y1 q
进程7 修改数组第一个元素后-----> 1001& ], E l q, e# P
: I% k M. |7 H4 \ Z2 @: q
) m3 J: V/ h" k& `2 G
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
- 5 E* @' x! z7 X- A
2 K$ i7 E3 G* o8 cfrom multiprocessing import Process6 ] D5 d5 r$ I/ P5 I& t; ^
9 r, L& }3 M+ v+ N
- 2 e; |' O/ x6 {% o" J
+ c. g# ~! c V: c3 K. T) [: `) ^& afrom multiprocessing import Manager
# g [. F1 n1 q/ o" q! T. w$ Z# Y/ O. C" ]" I
- ' q/ @8 \3 H! k" U: d% l
" ^5 { Y/ P, W4 H; `. U
, I! n" Z$ T9 g9 `0 e, G/ e# D6 [& G1 R! \
- ' u' _3 s) k( W: Z1 K
# a9 g$ n/ [" f3 ^) Fdef func(i, dic):1 v/ a9 N4 {; X" F4 Y/ h
8 l+ X, u2 R" ?8 j
. j9 ^$ v" c- B. x' }. Q! e1 w& ^6 a$ K, @$ r9 J
dic["num"] = 100+i! q7 H s; `5 B& l3 w; P
7 X: ^# ^; I/ U8 Y- ! K0 I0 }) a4 Q
r, J, }+ N1 g0 L+ ]5 @6 ? print(dic.items())
5 ^3 R$ X# `, [; W* Q Q
) }% j4 j1 [/ b - 0 k) Z( l4 X# C' `0 m
+ Z- E, A5 ~ v$ q/ o
. L0 T4 P: G% C- N- Z9 F' f: C; X3 {: g E8 r' {
v2 p: K2 ^8 G9 \ U7 o& h7 f
* d' h, d3 r( i1 d& M, P& N* e( wif __name__ == '__main__':9 v5 q, x. m) B/ `$ F- f- l @
1 @# o$ N0 E3 @2 e6 {# b- $ O+ @4 E) d2 b$ o. N
' u/ k$ g8 v7 a$ o, P3 ], k dic = Manager().dict()' }- B/ ?' n% j4 ^
# x# P3 c6 ~! x0 p$ L) O' F
- ' q1 `" P9 S: g. e+ ^! {1 z( F* k2 t& g
: u, l3 W: g2 s: I for i in range(10):
0 W2 K/ x* x; j" [+ F+ g+ r T% x3 f3 P2 Z; S0 H* Q: ~# p% ]
a" W: i# {2 E( Q2 d6 D4 _+ M8 {# y% y' l9 W" a& s, v% ^* C
p = Process(target=func, args=(i, dic))
6 h7 J0 L( H, ?+ `5 s. t4 A3 K1 F( T2 [0 X" \1 ?
; n, S/ _" i g, n2 \$ B- G/ _0 C$ Q {8 I% v8 b6 }5 d
p.start(), l8 q- n3 Z- U4 \$ R
/ T0 a! Q$ a# W) i
- 3 D* \2 r D% X
z* d+ Q _0 M8 _& t, R
p.join()
, n ]2 ]7 y! N; K! V- Z
0 O* m0 Z X, A! C% w3 J
& D* f1 C$ x& q4 s
运行结果:
- 4 c2 ^& }* l& N2 |' r5 h
/ c8 |/ k7 S/ P/ D' a- Y: |1 [& ?[('num', 100)]
7 M3 y/ I% O! J* o, u0 s+ _2 O0 z; J, P" P- s" r
- + a$ j- M" |" A# q+ r/ y6 K
, D* z6 v ^! R1 N' v; q[('num', 101)]- k1 L# r2 X, X' m" n$ S9 y+ F" q5 a
0 c+ K" R1 F- z) k3 H% g- b
) d8 F+ A H0 P3 A
1 M8 G) L7 f% V2 u[('num', 102)]
: x( d+ e# U5 R) F1 z: V) g# S1 {9 K3 B" q, c' h* [
- * d+ d. |- b; V5 b' E+ _1 j( Z" K9 G
0 B7 M2 C3 {* N, |& `[('num', 103)]5 x6 J& O( T" {7 y* z+ }
2 t; Y; A3 z. X* j' C
- $ T) q& _4 W- M% e
M- |% E5 J" W+ S7 N& f[('num', 104)]$ Y* X8 H, W4 S. b0 U ~, e
/ D! k, N, C( T9 F& k0 m4 { - ( T8 @- i) l [- [
2 r1 f) y! B- Q/ N[('num', 105)]5 D4 s& b& ^8 f* r
! p# a! x. N# q9 I, J" C
' Q' G( ^6 I3 O, Z n- s2 c
9 ?! A- L; \ t, y[('num', 106)]% J: |+ u0 V; y2 h! \9 `7 E2 t
3 u! e9 S [, M/ u: `( l8 l' I2 G
- ( z; b9 ]" o# X! [* X5 h- X
5 p. ]" F& O2 E9 g7 r[('num', 107)]
" @* {0 Z( T* c* P/ v: X# [) F, \3 J3 X. K, w* m+ q7 G' ~
9 m2 k5 e, C1 v* F, b$ ?% L. t: g" o
+ U( c' r! }: D) ?' K[('num', 108)]
5 N9 j% A$ h# @/ F3 t1 i4 ]. V) [% I" K: G. \
- 3 m" e8 U) ~" L% E: ?; k
$ Q! f7 b4 S/ e8 [
[('num', 109)]8 i+ s/ g3 s5 v& T _0 L
" _' l9 N/ Z: P& i9 ?: J$ A3 h! @% a" A! ?- q' {- Q- {+ [ r
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
6 h! h6 O+ Y3 I% I* v
% G+ d2 j* F9 @" R Eimport multiprocessing
5 Y1 t; @0 ^+ @- e9 D" u! U# ]. n' ]. Y- ^' d
- " e' ]- D) b' W8 H3 j; K
( R* R9 a' d# h2 w; a& j
from multiprocessing import Process9 b ]9 I1 \2 C$ B" _) ? ]1 Y
9 z, \/ E/ F% d
+ {+ Z( m1 D# b+ |( e1 H/ @
" c7 J- _* V8 }6 D0 {/ wfrom multiprocessing import queues5 M: n4 |% J' O1 a5 j
% I) Q' k# j; m+ I- U
- & v9 i, r ~! B8 s" z- q9 i: U
: Z$ E# I+ {: C: }& Z$ @
4 o( h# S( j* w' t$ [/ I& s- g* m
) }" I5 |9 o0 t" G
- B0 T- z0 A- W5 o# x& e. y" \" q: c1 P G( x: b6 D
def func(i, q):- U* L) L7 f V1 G, n
. E% Q8 d P$ M, y- $ f4 i" O" ~7 U6 o" V; ?6 O
9 M% o8 D! `5 o: S% V# Y3 J( b J$ x ret = q.get()
7 S+ b& R: _; d( \: u# j5 _" Q1 K: Q V; R' Q' d0 M3 m
- 9 \* p- q8 Z( s
2 d4 s/ {8 R* z% _' F7 N print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
: u3 k% G( ?9 |* h( { A6 \1 i! E- C2 A
" j9 R1 |1 @% K9 |% k/ |8 E
' Y1 L3 s; E0 D/ j& k q.put(i)' V- }* y- |; }2 J+ K$ S
/ I9 x6 @2 V: x( ?0 U- [, {- % Q" K; q0 {5 q5 N
! p) [) v3 R6 S
0 Z- i/ u6 a$ E7 h
/ b4 F# M- ]1 \. k1 r/ c
u0 h, w% a& ^- V: T/ n- C* ?9 b# h% L; S- U; G4 ^
if __name__ == "__main__":
; e/ {* Q6 t+ i' X1 X! r
$ D( i `4 T9 T4 u) k9 n
# ]2 u5 r' [# V
* k( o. u: M+ w- A lis = queues.Queue(20, ctx=multiprocessing)
& g9 L; v1 @$ ^5 J' ]' S. L+ U6 V/ U4 N$ }: k, r3 _
" s. Z+ F7 g/ _6 j& `/ M0 t
; O. g. C8 Y! Z8 C' y" T lis.put(0)3 o4 h- Q- U3 b& {- c' s" h8 b2 S; F
0 O# V4 y b2 P. C- , v' p G9 W3 k6 G- [3 f
, z/ O+ W" t/ F$ G9 k, X# @$ b. h7 r7 K for i in range(10):
- }+ @ H- B, J0 T9 e1 ]7 w% ^- ~3 o, x! q
- D3 o% A# E1 B) A& U+ @
% `$ v2 e% o: R p = Process(target=func, args=(i, lis,))
T1 U! g% D; p! x4 X W% K. f# Z
' |) c2 n, ?; e1 A0 V+ x6 [
U3 \: @0 H7 k; k# E) V/ t1 c {$ {; |: Z" e8 M
p.start()$ F) I9 r, A. ?3 Z
$ F# l0 F# x D; L8 S6 }5 K/ B/ u" [9 c
运行结果:
- * C( @2 l E. ~% K. j7 S6 ~
' b2 H9 ~; ~) X$ E6 u* `
进程1从队列里获取了一个0,然后又向队列里放入了一个19 @1 t6 z- C$ e
2 C; z9 H7 I* p9 ]
# n) S4 [ E% x8 p" ]: n+ X7 E2 m4 u/ q( V( B
进程4从队列里获取了一个1,然后又向队列里放入了一个4! L6 K2 c) j/ @2 q$ @
5 E9 S8 J3 l: u- * F* W, p% r9 W* I. F5 z3 p
3 c1 m% E& N- S9 p. [3 h; G) y
进程2从队列里获取了一个4,然后又向队列里放入了一个2
$ ^: s. K4 W! a; g2 v5 u2 d
/ T$ @/ v7 s- ]# A0 P$ h
8 l; s) A3 p1 p. c$ T" o" A; D, A. @1 \9 A6 H! {
进程6从队列里获取了一个2,然后又向队列里放入了一个6) ?% [0 x" B/ h! T% \
1 e8 [' y+ z' F) S/ W. _; g" ^
' ^1 D, z+ Z3 G5 A, D8 Y( Y
: X9 g) h3 h9 w1 |$ }0 ^4 @. ~进程0从队列里获取了一个6,然后又向队列里放入了一个0
+ U0 H, Z# V+ B- }
0 c5 P' M( y1 n! T" k/ X1 @
, p& _7 c4 \8 k; W- [( e% Q, r8 e9 D
; U1 F' c' g! N/ c8 l" o" R进程5从队列里获取了一个0,然后又向队列里放入了一个51 S; d% K& W( D
! X2 J4 j) g' G$ ]6 w% C
3 N) x: c7 t! `9 F3 q
0 f; |$ t3 c' h6 q# J/ X# Y进程9从队列里获取了一个5,然后又向队列里放入了一个9
+ ]& M# r" d. r, l0 } F" J% g+ @
% n4 y$ ~5 c7 R9 Q& O9 F; z- 7 O% @; N! V$ O
% M% i. o- m; z) V6 E
进程7从队列里获取了一个9,然后又向队列里放入了一个7( Q8 t, ~) G4 G* C& z& R& ?
- H9 Z& `, z- `% K4 }) m' n - 0 ?! I' x7 k& w; F) F7 k4 L7 ~
6 _* }9 h8 P- z$ P6 W进程3从队列里获取了一个7,然后又向队列里放入了一个3) f9 Y7 h; R5 J# d: s: k
1 L9 V! X% j4 \ - " h! p4 c" M% i! @" c5 T3 ~
$ {& Z1 x9 [/ j4 B
进程8从队列里获取了一个3,然后又向队列里放入了一个8
( I# t8 O/ v2 y' t; J- I( F9 n) ?' p9 s& l: u
' D1 T6 g V# X& n+ `/ K
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。
2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
- 4 a* Z8 _' N+ ^2 x$ N; u
1 H, F, T9 I' x8 a% Lfrom multiprocessing import Process
" J& V/ g6 l' x8 m8 ?4 ~8 x% F* t0 ~1 p+ T. W) L) ?) G& n% H' S
- 0 z' ?5 H7 C: ~& ]2 _$ C+ K
7 t4 |' Q7 z* C% o" O$ ^3 y/ v
from multiprocessing import Array; |2 \$ j* Q# d/ @4 C, C* G" l
% f: k) \+ b+ @
$ I2 j$ [' Q1 h2 F% l* H* r3 _# q5 \! q: @( z+ W
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
# ^2 h/ D# p# g; E: T! b
; c# z( i% ~% [9 W- $ f. {+ q( \; c3 W+ U
* a& m. y0 j- ~- {import time9 H2 K; e9 u% s, |
' H; u% R$ N: s6 s+ r: K5 y
1 E) I u5 P, N" T) h" _: Q4 K" S
( [# X1 ~2 J: @; P3 N4 l# o; i2 ` K# @0 [5 v4 V* L
! d2 }" q0 N8 K& H
( Q4 {4 U( }2 {( @0 t3 x
7 P/ F' w( k: \- idef func(i,lis,lc):
) O. K2 K# ~& B: A
6 _) Q' F1 L W9 \; ]+ l7 b# M) S
( }# k$ K; u$ G4 O4 A( {3 f! {1 E
lc.acquire()# q" V" v: I E7 H1 @1 o
2 G4 M$ z; h; ^. C% ~ k- u
+ V. x7 _. E4 z
6 Z- A9 A. l9 ]& p6 `6 t w4 d lis[0] = lis[0] - 15 l' z; X, w% U# m' S" o: V: C
1 t# C2 F$ x6 a
( D" W' x& J6 p5 B" w1 I) D0 n; |1 Z1 T7 N- |( k
time.sleep(1)
) s, `' N: I9 {/ i
! e/ C1 r% }" h" q- " K% \0 H. k5 Z5 E ?/ ?6 p
9 Y2 D* n0 e1 H* L, ?
print('say hi', lis[0])4 P6 L# y5 D N! i+ X& k
' N" s0 J6 q3 ]& g6 S# ^6 Z
, s# Z9 q/ k# ~& B/ q6 j* [/ r% B4 P& B
lc.release()
/ \, `( L7 W$ r- B1 r# D" n5 J+ P, v' T0 e
' D4 A, A$ V4 K# O- w) f
/ Y' n1 b; R2 i. x _
% s5 h" B4 t6 F3 S9 e8 ^% f$ T0 A) w; |
! F; z7 h2 g/ H1 M4 t) [2 U" P: i" z% ~
if __name__ == "__main__":" U; O6 A0 `. e7 x9 W
: s; ?! ?1 q) ~
9 d: b& s2 d W' O8 ^! C$ B# D) h" e0 w$ B) P7 t% B
array = Array('i', 1)
5 n# B% Q$ o7 v2 y% f2 r$ {' V+ ~0 X; g1 t6 |- F
2 P: [. X, D% P" A; Y, j' V, q
0 G! L" o4 j' I! [7 Q: B array[0] = 10
: h0 M; |' I. a; P
7 s4 T: o1 q, E" g& z- ! U+ g' M* ?/ r" [2 |/ N. o
3 h9 H# U2 ? T7 a; J5 ~9 [
lock = RLock()
5 V. N* o) m D6 u6 o# T; t( P
9 F' H1 \; O) b
3 u& h w/ I( S) l8 ^% B/ P% ~; [* v1 m3 e" @0 I
for i in range(10):
8 {2 M0 W3 ~3 S! _4 u7 P4 I. G0 w) k; Q- x$ C0 N
- 8 v ^7 i4 P9 `
# o8 `: J% @$ E& T1 C* L' m p = Process(target=func, args=(i, array, lock)). P" U; b4 ]& G1 h! G
( J5 M) k+ O3 [3 E - 8 B* }( N( p W5 H, o
, E+ U- P; W; J" d& P# w p.start()
: t/ n2 s. H, p
$ x- S! W% o! @
: O( n* \! _8 o0 V- }" d9 f$ F; E
运行结果:
- 8 A2 \& {2 c' c6 e0 B3 H4 N( s1 H
1 d( n9 d" b; @# b( \: P6 l( a
say hi 9; |7 L: p) u/ N5 e6 D# N
+ Z& z% d# n/ u. S t
- % P5 U' b8 b' t. C* ~
; b$ x9 {$ C: g. Y/ W8 z) O3 v( d
say hi 8
! ^# B7 |( S' ^8 j, v+ o) v0 w0 x$ u% k
- $ S# {+ y1 q1 J$ N: S4 F0 G
" E, w& h' u8 _: @( I
say hi 79 m3 {4 O' X9 D1 E
2 a5 j6 p: c1 U% `# V/ a$ V - ; s5 a0 h X) y* n0 q8 E: P
8 G8 C$ j$ }5 p# Esay hi 66 [3 K3 c6 \; i4 r
; a( L) F5 ]( v3 e8 i0 P3 j
' R3 Q, v) v* F; T1 `) P1 E
, K( ?# P: L* q% Q# }+ Osay hi 5; M3 l- }+ R0 m7 n
# u# V3 f) P; {! | |& z5 T
- . }* |( E ] \( z
5 j( |$ L2 F/ p
say hi 43 U# @3 R" v5 @. m5 n" m& B
t" t8 g! |7 S, k+ _2 j) V - $ M6 @! S0 }8 |' I
; F$ A% o% O7 w# m9 l
say hi 3
1 b2 e0 e( r2 a4 x" M$ r+ j) ?/ S- X
. f9 j6 a1 W& |6 s0 b
2 c5 S- r9 v0 c3 G% D1 B4 ~1 _
8 ~3 E- p, X- p: { {, `say hi 2
( W% g3 A) m6 ~ [1 W
* ^5 U" U0 H4 D1 ~( N3 |- B- ~5 o- j, w( @ H
5 u& r& V7 G' Qsay hi 1" m9 Z: T- L1 l7 ^, o8 l1 A
; m& m+ r0 k& n1 D h' t. H
x0 \. o' {% F) |) B# ?& @
4 S& r" w- v' \8 `say hi 0
9 m" L' q" N; Z8 z, y3 X
/ h J% w: l' f
1 j* t6 i- X2 Y& v+ z
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。
比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中常用的方法:
$ H! }" p( b- C
- i7 y, b5 N8 z) `from multiprocessing import Pool
6 u& O3 X2 N1 s3 B( V B2 t2 `$ M
* Y, A3 M4 f4 O' T. C
) Y5 ?4 e2 [# O) S! ?$ ^
1 S# D5 g J8 h* W( {import time
* \0 }4 z4 k0 D: z: q+ B6 m5 e+ y$ A; l3 {% R* h, r' c
- 5 [4 Y$ ]+ x( ~5 Z* N2 d
1 I# l& C9 I9 W1 `; M# d( @4 K! f
; S. |% y6 ?" ^8 x/ H
; M8 o/ G9 o \; q - 1 a' c; A+ ^3 G2 r
5 L1 Q, t& N% ?8 l! o0 m# N$ t# K6 ~def func(args):
7 A/ `6 T6 X" n0 i7 S0 E7 E% c2 }' p5 h5 o% W- J6 r
2 Q" c7 {# X: V3 e/ d, K
, w" o9 F. z' W( q& v* N! y time.sleep(1)* l+ U( p5 ]* Y4 a- W7 w; X. |
" ]+ \0 w4 b9 ~# O. k7 ^& ~ [
- B" w' j0 N' h2 Q% i! n
) o) o/ }* g1 C j. M5 E
print("正在执行进程 ", args)1 Z9 G, S' x' f$ h. e
0 a+ a4 {0 b0 o - - H8 ?7 w9 ]8 V W ?( z
0 M3 w) z8 P- I, _* O+ R# ^6 E
. r2 s* p" e1 }! s
' f' @# Y( y! j1 X: h# y, J - 9 i9 C/ c, L! v( [
3 [8 \9 m* [8 N. y6 _
if __name__ == '__main__':) j7 `8 J- ]8 }
. R1 _. T6 f- o" K/ ^ - : W% g% u9 z: y7 z5 h) g, k
Q9 @4 r7 g3 n) v3 n: N$ S0 o" t
% [1 n6 W( O/ q, l. f# P
/ L5 a4 v5 I$ E9 f3 g- a
% i# {, G4 K$ h, a1 b' t1 W! |3 R% L7 Y3 W6 R
p = Pool(5) # 创建一个包含5个进程的进程池! Z: `4 z8 M7 e c' G, N
# p3 v* I0 E4 L R. @ @' M
- 4 X, L+ ]" S& n1 k: L: ]% L+ O
5 |* U5 O# c9 q( x& M. p; a9 |6 @. L
7 P4 W4 l% L. `! A9 y6 E6 c
/ P( Y6 K% f: [7 f0 N( A F3 u, J% K X0 w' [, Z+ @
for i in range(30):# C" y0 V! j2 b! V' G
5 Z1 Z2 | M7 r
- - F+ a" ?9 X f9 o6 J8 _' A+ e
* F! s+ o. f9 W6 N8 p, v5 g" D3 E p.apply_async(func=func, args=(i,))' z( n5 y* l. O e, `, K
0 j. B& S" v& R0 y
! K/ A$ Z: `" X1 Z' ]# Y- g4 l* r b% [2 _9 {! w
/ D: b' x# {( g
3 n! J2 N" I0 r) k6 s- $ N( E2 G* Q# v- M3 i
1 ^& Z9 F& {$ M$ ` p.close() # 等子进程执行完毕后关闭进程池6 S$ V" d/ l: G6 F/ X# H1 R) R
# E4 L* F$ |( @) p' D" p# Q
* Y7 s) S6 M9 z# m# z8 m1 B w4 L, u. z/ z9 W
# time.sleep(2)+ r" i4 Y0 T( i5 ] n' |
7 k/ h# V1 X" c) z j5 ~/ W$ x
+ ?! @1 f3 |/ w7 X" E" L8 R8 m7 D2 ]% ~- B
# p.terminate() # 立刻关闭进程池
* |% P4 w9 N7 X" G) l! Q% S$ b% G. a! G/ Y+ ~& ~- Q; G! |
- 2 p# B, C2 {9 c" C1 H& \& y! n
& B( T/ a% l0 s2 g' j; g [/ m+ ~
p.join()" Q/ w+ k3 S2 ^* T7 R! ^
' C3 l* X6 K% X/ _5 y+ p
0 Y6 d( w2 H" w6 y4 A2 e, _
% z! W. B/ L3 C f- Y( q$ x请继续关注我
# o# K* u: G+ q- L! @& {( ^' ~
+ C# h2 _5 \3 L; O. H7 v
| 欢迎光临 数学建模社区-数学中国 (http://www.madio.net/) |
Powered by Discuz! X2.5 |