数学建模社区-数学中国
标题: 爬虫(七十)多进程multiprocess(六十一) [打印本页]
作者: 杨利霞 时间: 2020-5-31 10:35
标题: 爬虫(七十)多进程multiprocess(六十一)
U% l- \5 b8 a
爬虫(七十)多进程multiprocess(六十一)
E* V8 e; O1 dPython中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。
另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。
下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。
- 4 ^' |9 U; g9 L1 y0 g
" a! O& m6 k r5 o. R
import os' X2 n9 V& T0 A1 L6 {
; h/ b# ?5 b5 ~# X. H, I' K6 b
- : o. f2 Z* p0 Q
1 Y' ~3 ^; M+ p5 f' P4 v9 himport multiprocessing
7 H' |3 o3 S* X, D
8 s' E7 @8 u2 ]3 n5 a
1 g9 B/ {6 `* y8 e" K6 \" `0 H7 y
4 o s! d/ J8 w4 E2 {
) A. J! J! b1 Q/ D2 U. `8 W
) Q# w, {7 j m, {7 d, |- [, c( H3 ?
def foo(i):% e8 H2 f* a& P! H' T& H: k; @# S$ p
0 [ b; y- @1 B' q1 y8 m0 A4 j- / `- Q8 b, s" Z+ k- T
( s* n" V! s1 |* z+ c6 t" _$ _$ U
# 同样的参数传递方法
L; S% k- x7 p, m0 y1 Q6 V) p, q2 J
- 3 g0 j! G) |2 k. X
* M0 H" P* k. s( d$ \! y print("这里是 ", multiprocessing.current_process().name)
- g- O/ ^2 P7 H6 x" e) q. C% ^& Q3 U5 a7 D6 j0 i
4 t/ X7 a5 z' P/ @. G& ^
8 W# a. }7 V8 C* y print('模块名称:', __name__)
( U8 b; p% P9 }* P
; H+ R: T. d: _: X! ~# B2 v- : `) o$ \0 S5 W( p" E
7 \/ P0 v# S/ K, k8 g
print('父进程 id:', os.getppid()) # 获取父进程id* i& C/ T. @% R5 l; P1 j! G
9 ]8 ]5 s6 m w' a5 J2 N - " O- }9 x3 Y& C! Y2 _1 S7 l& \
' v& o, Z$ o& Z1 U$ D( `' F# M print('当前子进程 id:', os.getpid()) # 获取自己的进程id
7 ]) T3 d. p3 f6 @8 ~5 e7 s3 i& s
! F3 Q; e7 E. f9 {. q3 j8 T - ; \5 T6 c7 i. G# T9 g# U
- x" I' E& f, N, Y% T
print('------------------------')
0 Y3 n; v/ }5 Q5 d, ]9 m7 h% S5 c: o5 f0 J+ f6 n/ u
2 C+ q9 w6 c2 X1 D" D% @3 S
. I+ B( R" t+ ]/ w( w0 ?# c
, |( H+ P [5 \9 K, @5 _: m9 l+ @% _3 s. U* R |8 O
- ; I. F1 @7 J7 M6 O* p0 O
+ p$ ^/ I6 ~0 e( ^9 @3 i4 pif __name__ == '__main__':0 f: M, C9 T5 \) @2 B* \, R
8 s5 V# F1 |# }! w s
" x* m2 L. b9 @& m2 b- Y" }" t, A3 u! u) o
6 d" M. c2 f5 P0 E& J
0 L l$ t% i; r' a+ N5 `- 8 B2 x5 P1 S" v4 u5 C4 A/ |
& U4 q% }3 W* G0 w* }! u& N for i in range(5):' D- D6 p, a2 p$ v
: b0 s$ H g) D! _
i" ~) E4 Q4 M) T. l" h9 o& Z3 C: }9 [, N$ @; A" Z* K( R/ u
p = multiprocessing.Process(target=foo, args=(i,))
; d2 v" {4 J9 ^: \* C) j% r0 v
2 F1 g4 ?5 {; F, [4 A- " \1 B n4 O _! u7 c
) R4 M1 Y" b* S4 b9 M
p.start()
' [) b7 t; K) W" f" w$ T5 j% O# S8 Z: o! Z5 U X" y
4 v6 H% F/ h+ b! n1 ^! j/ q
运行结果:
- 9 @( ~' Z2 K0 g( }
" N- y: h8 i6 x3 h
这里是 Process-2
$ X' l9 J b$ i- t+ v5 ?/ F' j/ u; @! e% y9 }
- 7 V& p) o+ Q% j& z7 Q: z1 ]+ k
) m" H) C/ x" F+ f- E% [# A5 l. U
模块名称: __mp_main__
, h5 P3 y/ e( F
5 E" j) l0 D; f% g" V: O- N - : z; M: |* P5 \1 G
; g" D$ D# l0 {2 G0 s父进程 id: 880
: e( z% Z0 l' o- _
" A$ L. s0 Z8 N( Q - , n, f; g9 g2 L- i" _
9 N* }! X# y! ^- x- C" d/ x* O当前子进程 id: 5260
3 Y+ m4 Y3 m" G0 u/ i% k% R, F b
$ V+ C: t, [. l% o! K- ? - * D; V* k7 e5 m0 I& H# _
! L* }6 ], s. K! n1 S2 \9 i, B--------------; B- M. L8 K) N8 I2 a! j
( y" A0 n0 o! Z% @/ x( z
( v7 D9 O) L# Q* W# f& ^- {( c
这里是 Process-3
3 h, M1 z! q4 M; _' m ?' D% C. W1 y/ m) @
- & X/ }. @- ~6 r
5 n0 ^/ l) k: t. P* I, X! |' f模块名称: __mp_main__( |+ M. [6 e- n+ h* {+ z9 y0 P
* h6 @! C( _$ f0 y$ J0 q - 8 N* t6 i0 T& N* g" G) n
8 s6 l9 B& p0 y, n6 Y父进程 id: 880
- ?% m; `2 h& m0 U) b0 `1 s: s+ K0 h# e1 s/ v
- + K0 E# T. H4 Z" S# y( ~
' d" _- u5 D/ k8 S; N l当前子进程 id: 4912; Y+ p+ K7 b. h/ u+ h+ b
& f% {: a* O6 K
! X. e6 }* U, n5 i
% c5 X) e& R" H: E7 K' ^1 W--------------
9 F8 h0 S( v4 Q; v8 G Q6 G2 }- k- b3 j0 p2 _- T' @
- # U1 ~1 `6 p& z. F# n# T% N- {
" ^; J# X- k+ U. I8 p
这里是 Process-4/ Q/ X; Q1 s4 w( F" \/ O/ X% Z
% X; C7 U7 w7 W+ s- x5 d
) z5 k @& c$ @* ]5 \" G* d& \4 X& t7 |- k+ p) _: F
模块名称: __mp_main__- K! {- |9 C. _6 |, D
/ u$ z0 h4 y3 `1 x! ^* R
- ) }+ u0 ~- [1 N" R; @( E
' `- N- I0 n }" g& V* C _
父进程 id: 8809 W \: p0 {: n* _3 P% f; @* O
4 I8 [. e# ?$ \9 Q: F9 f7 } - % G1 L8 E' j. z4 a7 h
8 [) E# f5 F* q0 I7 m' D9 Y% E1 ]
当前子进程 id: 5176, t- H- d# U9 y0 X2 k( l
8 @- K- p4 j- H; ~6 o. Z
- ( v6 S+ W e1 I: \
* E3 N4 X# j/ A+ {+ d
--------------
! y$ N9 \: o* _8 |) K6 h& M
0 z% ~3 ]: b8 a- {" q2 F - + Z/ J. z) ^8 M1 E1 J) e& ]
: C" x& f0 Z5 [% f这里是 Process-11 k2 g# l& O" v( _$ |' J' j% W
! p+ h$ x. A' F - 2 V' }2 f; i# N7 \6 k
/ A* s( Y' L) N) X5 P8 J2 s$ Z模块名称: __mp_main__' R0 Z1 T$ B7 R9 h* {2 m) X: m* t
" t' W: j2 n1 Z. N
' ]% o! }- [1 u2 x" B: }! i- q5 }5 e
父进程 id: 880' n) \: _/ f+ ]* R! w' h
5 }# X6 X/ J( W. s
! ` i% R* N7 B7 k; j# ?$ U# e0 {0 M+ z
当前子进程 id: 5380
7 H2 w) A8 }, B8 e5 Q6 ?, J' v
( q) v9 A& O( O y) T) f
0 D, E! k0 J L9 h- x$ v* ^: I
+ Q; k/ u+ }$ u4 Z--------------
4 K: T- V- ^9 T" Q- m6 [/ [8 O( {- W5 [# U- c# c5 \9 B. T
- 0 X# o9 \2 |. g
R J: c. G8 Y/ X
这里是 Process-5
! o, C1 ~3 N! q% O* P, E
" e5 _% o6 \) k - ) J4 f: y) y! d, Q0 j4 w
: o1 R4 L4 f! n4 o- Z模块名称: __mp_main__; p0 W W P t" N4 U
9 D _/ e& X" z; f# |' s/ r' f - 0 {' R0 t1 w( F7 P3 v
# d% s; t9 h; n2 K% S父进程 id: 880) C, L' ?5 e" K, i( ^$ ~, ?2 o
9 Z! @" u/ x- X3 @+ U. U9 h
9 x2 r& y& m" M' L" w. V/ ]- ^* N- A8 R3 ^6 y" J2 [' c; N
当前子进程 id: 3520. x- ^6 i' a0 g5 [6 k$ s! h6 W5 C
G' o/ T& c# o E7 K
/ W2 |7 P! q' |" Z% k2 [0 m
% d; b1 N* r3 H8 y3 n: [--------------
( ?( [3 y9 n/ y/ c. p" [' R
5 a, M0 {% H% Y: |: l" U, G' [. }
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。
创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。
下面我们尝试用一个全局列表来实现进程间的数据共享:
- 0 G2 }; g5 c/ Y7 \
. | I( y( S: H% J8 Yfrom multiprocessing import Process
/ b: o' D8 \: g0 k3 S4 T& R" J; f0 e0 H& h% ~- B8 _
- * y" J1 X6 W" ?$ F. H
, ]( \9 F: }' u, B# m6 H. A+ h* g* [6 X! C3 j, w3 k* K8 Z
2 a8 m' F; Q# s8 M% e9 h6 J) L% V" a
- + q/ o; C ]0 n; f
; |6 v( _* c4 \# r6 v; M! ^
lis = []
) s, Y$ s2 r1 R% C8 A9 \8 Z* }( K& r: h8 n2 Q" k* p
- 2 u1 V! y Z r1 j
6 H `' m( G2 N, B$ y; v7 X" t2 K1 ?8 i2 N. l! |
/ a+ [$ N; V7 z/ W, ^
* I4 |+ g( |" b2 H* H5 U$ s: |: v5 r
3 C h6 k5 C+ x! U$ r% s' cdef foo(i):
* v( u* a7 R) ]+ Q% `# C1 N* ^7 Z
/ @0 s0 z6 l6 X5 @/ t6 C# U8 Z
( _. D; \" H9 V. V" x) d2 o P# n, T+ L; J/ w
lis.append(i)
8 l4 o7 \$ n& ]' k( ^5 ?# X7 y0 W5 A8 u5 k! @, l! ?
- ; U- e: D# T5 M4 {
# g2 I% H) @* u# N* S& T$ a4 ^ print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))
/ Z+ D7 C0 q$ v% t% q' ]* ~" ?9 p8 h! O; V) m( W9 Q. T# x
- ! X! z j; v, v# O" z) C
0 f- u: x/ h1 V9 @) g
! ]8 R1 N, n) Y& Y5 n: r S+ S
8 P! i5 J( _( g. e! p% ?! ?
' B% D5 c+ S# R5 @: k
3 e X9 C* t3 _3 b+ Tif __name__ == '__main__':
6 F/ j$ l# W% C, [1 E( X3 _
* O% ]% S& W h2 E. T- * i- F- q, Z% W% s
5 o/ s% j% M/ x( n& L5 I for i in range(5):" O4 `' p" P7 Q
( ] J6 r9 @8 \( Q/ M
: v. a6 s: o u+ q$ T3 T
8 N/ [% {( A# {* T8 \9 r5 o p = Process(target=foo, args=(i,))
' {2 R/ M2 ]* }' A6 k4 {
3 `( U8 A6 h$ P3 r8 \- Z3 @: ^- 7 P% x; O) Q, y! u! ?9 S
% o' x8 |1 }- \3 @ p.start()
9 U" Q( U5 M' d6 Y
( a: S+ _% }# U/ o+ ^8 y$ ] - 1 A2 D4 g+ ~" P8 H; s# l
+ P! R! z6 V" M g% e& o9 P print("The end of list_1:", lis)/ G7 h0 ?+ ?# t) e
" J) C- B$ l) |+ c! H8 m
: X" G Z& Z* H5 z
运行结果:
7 {6 q) [4 W a2 c' C" ?7 @7 A+ c# u- f# ]$ k
The end of list_1: []
/ d) C3 Q) d# B+ [1 u) E" B4 P K0 J- ?( E
6 C5 Y- L" b: P3 e% h+ b; n4 t0 }) X+ t* ~' G
This is Process 2 and lis is [2] and lis.address is 403567446 a, e4 F$ o5 k% p4 t; D
- q3 z" w' F- o% x+ L4 g
, O& N2 T+ k9 d; m% D M3 Z5 |+ i$ v/ o
This is Process 1 and lis is [1] and lis.address is 40291208
5 E# B4 ~; I& w( P$ t$ Q
! }3 y O& Y3 R2 s- # E( g* s, L+ m
* N6 `$ x& W' x1 x1 f/ }+ mThis is Process 0 and lis is [0] and lis.address is 40291208) p8 U9 k3 A3 L* z
7 G: M, a" a6 h& V3 [" s' Y
$ t& l5 c6 ]+ J& a' ^+ t% b' Y6 N5 d& P+ ]2 {) o1 r
This is Process 3 and lis is [3] and lis.address is 40225672: u6 U3 ^: K8 W% O- F' z
9 Y5 v$ S( V: L' J! b3 Q
8 b+ Y( @0 h8 i' Y# ? y" c$ G N: x2 ~; G& @7 {; s0 p2 S
This is Process 4 and lis is [4] and lis.address is 40291208
/ L" K* C/ b8 Z5 R! C) ]7 A9 X: Y5 L! e, D8 |8 L- j
/ I4 x( y+ K( i) N+ n( |
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。
想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。
1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
3 I# M m" w; c+ W; t! k7 K) {, R! X1 \* e7 R0 K7 [. Y2 O! ]
'c': ctypes.c_char, 'u': ctypes.c_wchar,
1 i4 r+ C4 s& R! P
2 X, Y" g% R" T3 A4 A( {! u. E) `0 g
' X3 q: A/ g: o9 m7 ] ^
3 y9 X% S$ e3 Z. [3 T2 l'b': ctypes.c_byte, 'B': ctypes.c_ubyte, W% z& t' i, r; L0 W) d
1 ?% w6 q+ e5 g( s
2 t4 I d! X2 R" K r- M- K& Q8 f# _$ q! I ~
'h': ctypes.c_short, 'H': ctypes.c_ushort,
* I4 G' L$ d/ S5 W" X5 j# N) j2 g# t8 F f, P
- 3 L' C. |# u3 U9 [% \0 R& z! ~7 z
+ }+ U7 U& `2 t2 V3 d0 a
'i': ctypes.c_int, 'I': ctypes.c_uint,
. K6 D C+ U3 N4 A1 | s; h7 p1 c# E( F! o2 z6 `! X
- * t) y y0 z% G9 ? c
1 p" X! ?# x% n
'l': ctypes.c_long, 'L': ctypes.c_ulong,
7 N( Q6 m; D! t2 l0 k! N S$ q1 t- k1 a
- , }# p7 ^8 n; q, J& d- Q1 A
`1 u, l; v4 y'f': ctypes.c_float, 'd': ctypes.c_double
! h4 l9 H% w% Z6 ?
1 G* t& |0 P4 u+ T/ t( }; G! V! q+ |+ K# Y
看下面的例子:
! ?- j- v. ]$ b& C- d8 y& A" b* ^% t& i+ f+ ?4 M' Y9 s; K5 Q
from multiprocessing import Process5 I2 k# w% ^$ H( k" N% w
# Q1 \, p7 G: m/ |& X
6 B# u7 v& K6 O4 ^+ [; j$ O" U: c8 [9 `! d- q% Y8 ^
from multiprocessing import Array' I( S: N$ ~* T: l
Z6 Q$ I5 j9 ?; ~
3 H( b* n H1 |6 J
$ C, \6 b4 [/ [% V; ~: j1 p8 U
5 L9 \% D5 Z# h4 o4 C
$ j) W) Z* @+ g3 ]% T6 B. y
( \) t! Q D( @% I
" e3 ~* ~2 i/ b i. ndef func(i,temp):
( F* D* U5 \2 R, p1 L
& r& t+ x( z; D7 p( ^$ E- ) `) o! G- X- `' b+ y
4 o/ @8 a; S9 T$ F5 x0 ]6 @. o5 x temp[0] += 1008 [* o W' [: C; T0 G
; S3 \6 D" W$ l1 b' c5 [" v; a& @
/ _+ v0 Y% |; {8 H( n. p
2 E R/ m+ x' i, ~ print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])6 ^9 \$ }# \! U+ o5 d
e6 c. x% y/ E
- ( W# n! U' u6 ?
- t3 k( |' V* e( S( [
: ?& |- u* M" g+ B* M- E& z
! i% E/ M6 i5 ^$ s& {
$ J* d) j; Q& [$ A- F5 D$ P' x' s' p9 Z6 Q" c* p6 `+ w8 J
if __name__ == '__main__':1 @7 J: ^2 ~, e0 L' l8 q' U5 {) I
6 Q' ]! f. W0 \8 ^8 {. l( {+ @4 I
. F" V6 ], Z- t; X: g7 L, c& \- e+ E' y9 V* ?2 G9 x1 p1 D% U
temp = Array('i', [1, 2, 3, 4])
' Z" Z5 j3 v& L) X7 ?! @
0 ]4 R- _- o+ f. C) R& D9 O
$ P& o: m& g& f# L, b/ G& G$ E- `5 h0 b- F4 E0 l
for i in range(10):
* M) R2 G, u" m
" ~( V+ _/ c: _& O8 l' K9 M6 k
$ H7 u! q$ b& a, T; d# h7 P) g. m- H+ q9 i* }9 h
p = Process(target=func, args=(i, temp))7 z: g2 W; b, e+ L
+ x$ F' t9 n3 S$ v& E' l% M1 ]2 s
% }; o4 V" `/ i \+ S( S8 H6 U8 W* N
p.start()9 U- y1 w4 T8 p: }# \- ]
6 }/ ~0 ~% x; x% b, O* @3 q# z; W. ]# T5 R* {7 B
运行结果:
- ( y2 [" o" U0 |, ?& R5 s3 x( v
6 ? U( G. S N; P# N+ C, |9 E4 k- b进程2 修改数组第一个元素后-----> 101
$ n% T) o$ r$ K& }! x* B/ J v) ^ l+ y
; f& z7 h9 d+ | - 2 e/ V- Y" `5 m2 T6 Q$ \
9 x$ P" Y# v m, D$ e9 l进程4 修改数组第一个元素后-----> 201
3 i# ~& O( ]5 V; c4 [1 i( _, R2 r9 `! `! o! j
- : [! ]& y% k- X- i J; F
4 ^, ^% ]+ _' N
进程5 修改数组第一个元素后-----> 301" J- e5 W2 _6 [% e, f# q) t
2 H! V& w& _% N& _) M, l5 z! B1 Y
w/ K3 S7 Q5 m$ P
# \/ I3 N" F* r进程3 修改数组第一个元素后-----> 401% E. ^$ e0 l8 q9 v3 }/ N$ ]
( w. i* @+ I. X+ G
- , b6 ^9 ^8 W$ d* z& ?' s7 i$ t
4 p! ]0 H3 G- }- j0 k# H
进程1 修改数组第一个元素后-----> 501
5 @4 y0 o; E+ [ M! o$ A/ g" |! P" t: U7 R, v( P) J
- P0 k, W2 J7 m! s
6 i% k% A& E) r) R+ ~1 q
进程6 修改数组第一个元素后-----> 601
. r" r2 n- X8 ^
0 J/ g4 u8 a7 b1 Q ^% H1 k0 I
; m( x( J d3 _1 d, o( S1 F$ P/ g, Z. m5 ]6 M
进程9 修改数组第一个元素后-----> 701
3 u u$ g8 }6 u2 e: h) K2 K
/ p* ], e1 ]) W* b {9 m- h' n) A- , }1 p& K+ v* b0 _
1 M( _8 F' @6 \* P
进程8 修改数组第一个元素后-----> 801* y2 ?, C0 H# w4 V2 C! F
3 I$ `# |7 E& n. W8 ]% r - ) ~ s; f; }. p8 s% l6 ^6 q
: s6 P% o6 K# F3 H1 J6 X8 g( m进程0 修改数组第一个元素后-----> 901
3 X$ }# n: S- h+ W, c9 q% k
( |4 X" K3 Z7 F5 h8 _ - ! w2 @0 }% ~; \3 l
3 f$ X: W, Q! T7 p" ~& P2 m c进程7 修改数组第一个元素后-----> 1001
+ C: N/ _6 P7 Z, g+ h
" n, N ~/ I, i
/ U: T9 h* c, b q
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
- ) O& W' l4 ?2 B a. k! }
/ H6 Z" w% t& z6 a% a4 [1 i
from multiprocessing import Process
3 Y& N0 V( G: x0 @7 R$ {
0 W& H2 s. V8 t' Z( ~4 o
0 W. w6 S, T8 O7 c
3 D$ ]3 [) a6 n4 h0 hfrom multiprocessing import Manager# `% f' ^5 f# N; p
; d0 n# Y9 T- {* N. @: E! B1 P- 0 ?& h, A8 x: t* m
( B; H: G0 g6 i+ A- J, f8 @( s' Q c) G* w$ ~
. O8 j2 ~1 b. L' `) i6 V
- ! j9 l$ W0 f/ @. Y7 m: J6 M0 S
" J2 H- B9 P" t+ `% V8 H2 Z- t
def func(i, dic):
5 `- R) R, F( b/ B) w+ O) ~( ^4 U8 c8 j% _. G
- $ d$ A1 H2 C i* w+ ]2 Q
7 i8 [7 s6 u0 \7 x2 s# H dic["num"] = 100+i
; `) H6 }% m9 S% _0 @! {9 M$ O( v) W1 S
' I2 c, g- J* T" n' Q; Y% H: Y5 \9 t w2 E3 k" s
print(dic.items())
E$ U+ l' F0 n
$ @1 n: f( f: k9 A* @1 {" i* {- 5 K* l) |6 { {3 F) p. ~
. D7 R; M& I( D% f" I z! C
* }6 ]' K. ^. C Y4 a0 Z
% l& r5 H' H8 e' ~2 c7 V
0 g1 G- L5 r+ `0 q3 M
* ~5 I& h; W E9 E! }$ `, y" Z3 Gif __name__ == '__main__':
$ e8 z9 S8 |) _7 A+ b
4 F. x. w% u, v( p- 1 c( H) m$ e# {7 \7 ]* U+ }
$ k o. p2 W& e1 ` dic = Manager().dict()
' L; E8 z6 W, _' ?. i, ]( O/ H0 I" A) ^3 c* _
% l( L" S6 c% @8 g i- Z
! e5 e- W* ~. K for i in range(10):
, a r$ p7 I: c3 N$ c' c& q( [! j: H/ t0 M
6 ]: B5 B# R: T6 G
; B3 f; e3 A8 }5 ]* I% R5 @. F p = Process(target=func, args=(i, dic))& S5 i1 J1 R9 t: Y5 y! C
1 @- G' j$ g$ m X' [
4 A" p t/ B+ P$ w6 t- G% G* R/ Q! k# t+ o! e
p.start()
/ w0 l/ J$ s( L- s; S- k
% G& \& D% U# m5 T
4 r' q5 E; Q( I
' R* i2 {0 U6 l# Y8 T p.join()
6 q5 D( t) I, Z3 y/ H, x8 l* i& b) M- R$ i6 z) K" O
. Z" J; N2 t2 y' ]
运行结果:
e$ k6 V& S7 K( i, p/ `$ w# t9 @2 M9 V$ [% \
[('num', 100)]
9 b2 N6 i8 W3 _! u* s' b* _1 N
f& |) S7 c- u* w/ s
3 {1 o: |. q+ u( l9 A8 {) l
, ~0 H% n/ n3 z! |% X[('num', 101)]- E' H% Y' i6 X; a0 ^
' Y7 {, d$ A( W4 j* \9 c+ `3 q+ h
6 ]. p/ A- S* ~
* q1 k' }% O0 C[('num', 102)]
. J' D) O3 D1 s& p
% h; K) h6 {! m3 Y5 U* _$ |+ `4 {7 w- - W% I8 y: r# r/ _; w
/ M/ L+ i* F# p4 P
[('num', 103)]" B. w. Y& F7 F m0 ~+ A
# b8 N9 k2 g/ o* K) b$ W
- ) P4 B6 c& N9 Y; B' x
, X1 Q: k4 K$ f% \* c
[('num', 104)]
' ~1 y& n( W( W* z
' P! i9 }4 H8 c# N: P; q8 S+ X - / _5 e0 G0 o: p: C
+ N# W% v8 o* _8 f. f: V
[('num', 105)]
& y/ l" Y- P2 m$ q4 J5 {- F. u) W. K2 M# C0 a7 g& r. E% i
% q( ^! m, |/ y: M, ^1 x
9 p" z( Z. S& H% z7 C' X4 S[('num', 106)]" U( P# v) e* M8 v) q
M6 f8 }! L- c. E5 C' q/ `7 J
! Q2 @) r x: x. l# Y
! u$ I+ n% ]( M/ x' h[('num', 107)]
* }0 b9 G$ H3 f: p0 X9 [1 o+ e8 k2 Y3 n- Y
' [: {" [5 ?# }' t5 h( v) P/ W* ^9 G3 O
[('num', 108)]6 p1 N; j4 p( P0 }
/ W9 B* g0 X2 ]" a6 x% g( m2 ~2 O: J
- ( `3 H. E d/ l3 }+ s$ I3 ]
% O3 x r) y( a) E Y1 h2 F$ I
[('num', 109)]& {+ K6 P; M4 U* ^0 d- J
2 L! A6 Z. X# _% w" s
, x# M5 x2 z' y' C# n; V0 W. g
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
- 5 g( t: [; N: I' P
8 e! f7 Q( P2 c' z* I
import multiprocessing1 O) w/ @/ p7 A7 w8 O
; K9 E2 Z) n5 v6 f m. O {
; v6 {4 f. a1 n7 I* f! B2 s
2 K; O% n9 ]$ h5 Pfrom multiprocessing import Process; A# V! @! o5 V6 f3 h
5 {% G. i1 r+ ]9 |4 K# z" E
" }1 K1 p1 Y! l/ G
& B3 A2 V) f7 H3 P, L! tfrom multiprocessing import queues
% W, s% k$ ]+ [ }! ^ ^2 f4 b9 i" s% m9 P5 I
- . c" R( z$ T( h3 ?: i, j
# [# G- A8 o$ s
, g8 {! ]8 N0 I% q9 G& l9 \! G
- G* k; N: t; Z- [
- J( Y8 d6 F4 K0 N; q% j, ]2 z* H' P9 }3 H \; h* G
def func(i, q):9 G4 T5 ~( V1 s8 c6 W ~
$ \" \ \; k u4 K- 7 y0 _6 p# M" q# {8 J
6 @: C$ y+ l& j' d% }. b
ret = q.get()
, U2 C; O2 W- l8 T K; I7 X6 E- F. b/ v$ o5 L }; @5 E$ P
% c; p+ ]0 Q1 _; b, E6 W- Z* d3 o5 s
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i)); k) A% p+ V& ]8 l) c6 K- ^* T
% C. y' j( u: ]2 e2 ~% a1 h
% [; n2 q/ B; [ {' b' Y/ C, n4 q7 Q% L- T! t/ `* \ r
q.put(i): t3 q) {4 g2 {1 Y$ G' ^6 Z8 n0 \
9 }! l) L" d3 l
" u5 v. q1 D% S4 Y4 u$ f8 j D0 Z8 J# t
/ H2 l& P( M- d: W
' g- N) N* `$ t5 k
# {/ m5 ^; j% z$ V5 i+ n4 |- R
if __name__ == "__main__":5 R; n4 Z$ U4 |' P( T. |
( v `5 F3 q. \" V, w) [
, b8 m; O$ X( F5 c$ [( D5 T" b! i3 S: m# O, n, B
lis = queues.Queue(20, ctx=multiprocessing)7 J. v4 l+ b3 I i2 r9 I! W
; {6 h9 o+ t3 z0 B" P* O+ N- 4 C# I5 d. A# Q1 g- v2 }
5 ~$ k: B. V0 X7 D; U
lis.put(0)" V! k) ^3 g1 |. J. M
5 X. C: }* ~& K3 _$ m% Z0 ?
* C7 `0 q& Y( c' ?% {( B3 H7 {0 k0 \/ g1 t6 M3 a
for i in range(10):( `1 P2 \$ ~3 u* E& U6 l
. {# Y7 H1 A F4 @
. U4 G3 }7 {4 y. T+ P0 F; X, ^( j5 K- b+ {
p = Process(target=func, args=(i, lis,))
" O0 V/ H6 b5 V! p- e3 ^: t: g+ u& T( V% X+ A" B
- 3 s6 P* c7 v$ V1 F, }% G1 }4 [
" V, k$ Q+ t6 u/ h+ o1 b" H
p.start()
/ i D2 i3 I' T- c( ^+ |1 o* D
; Z9 l6 W8 u$ H; q$ ?
5 i+ O; |3 l* x F* ^& n# v+ k
运行结果:
- 7 b% u' w) l& f O
+ Y J4 @8 m, `5 w* Z w
进程1从队列里获取了一个0,然后又向队列里放入了一个17 I+ c5 H+ E4 B, Q. R) ]
9 J1 c$ o0 l+ W6 n
( s, @9 m7 x& @& W$ Y8 c9 A
. G+ f3 J, O- r4 J进程4从队列里获取了一个1,然后又向队列里放入了一个4
7 c) o# W- B: k% u d, T4 R" a8 C6 x* b5 E
1 L5 a) t! c. @8 \ U& u$ p% T N2 U! D3 ?4 K/ D
进程2从队列里获取了一个4,然后又向队列里放入了一个2$ g6 R: l3 y Q
2 L9 I" l/ C! Y1 F
) R8 T2 }) K* r B2 ~: Q8 }! J1 {* {6 c2 V) u7 G
进程6从队列里获取了一个2,然后又向队列里放入了一个6& H9 X4 \8 t0 r& ~" S% `/ w- l1 x
: ^4 x! n5 I. \, c* _" n& r) Q- 8 l% k! w7 K6 c/ a3 _- i
5 m9 [& O: [/ D3 S8 d- R/ }
进程0从队列里获取了一个6,然后又向队列里放入了一个0
1 f; t( r' [* i( }& D9 E. Z% W5 [' }, z2 H% H. e
- 2 z9 J7 j' ~1 R( |
, h1 v7 {/ ^. g. V
进程5从队列里获取了一个0,然后又向队列里放入了一个5
7 p6 z; v5 K: J
( C- \* |" F1 j
( b' O5 ^$ u) w) s, s7 n3 l* y( b. S& Y7 ^
进程9从队列里获取了一个5,然后又向队列里放入了一个9
% x. m# C- G9 `& W" y5 v H l3 g8 {- J8 V
- / E. m4 N; L% _
1 ?# \! w/ N* W* C r' H: }进程7从队列里获取了一个9,然后又向队列里放入了一个7
5 m7 {* L) i3 J3 ?$ g* ~' e1 l4 ]5 I( E% T
; d6 J! x' u' N' p9 j. i! k6 ?) \$ |$ }+ |
进程3从队列里获取了一个7,然后又向队列里放入了一个35 [" s9 ]8 Y$ s( {% x0 [" Z6 \9 b
% `: {2 b* U- l" [- " g) I# n/ n! _' ^% J# @
! K( X; E! d8 I7 R c, O1 w
进程8从队列里获取了一个3,然后又向队列里放入了一个8! |8 A% A# y& I) k! S" Z3 V8 T2 @; m' M
; Y9 s @; n& h3 |* }7 S6 {
x* |; c8 v2 H6 A- v
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。
2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
" V% S& d( L2 ? s3 u f3 w; z1 ]- u; h. b
from multiprocessing import Process. T9 a- E& {8 `* C
) D7 i' v: ^/ n% Q! g6 ?
: t$ V$ O( w; l& u+ G, ^: c7 p1 Z5 c! M! b& Z7 R
from multiprocessing import Array
8 [2 v9 F4 N7 K1 U5 d# W( W! h2 Z+ E, B2 B
) _+ g0 x+ w9 G- m& [
# e% k9 e& O: n+ rfrom multiprocessing import RLock, Lock, Event, Condition, Semaphore
+ \7 j0 D/ }% Q f- Z) o( z9 e D8 D4 T5 S: Y
- ( m3 D2 r/ I9 x" R
% [4 O0 H8 {! Q8 [, Qimport time1 y" a: c5 `) H) r
# W4 z% d9 }/ U, b7 N
: e; y' ~1 D) i' h3 Z" m, J7 m. ~. s: m4 R1 D' t/ F3 h# H
& d5 i4 V; m4 }+ ?; ]0 n) ^
' J; x. N$ C% Z8 x) u9 k
/ S3 k1 B8 p4 C. p) O& ?% y0 e8 A3 d! x, @7 {7 z+ T
def func(i,lis,lc):4 i7 B2 R+ {9 ], H3 p% d
, d# o% i5 H0 F
- & t; Z9 O4 M9 z0 ~
/ o9 X% c. T4 E- i9 [; C lc.acquire()4 K9 z- Y5 S0 x4 I0 w# F
* F6 Y4 P3 o' J8 X0 n7 h4 v; I- H- Q
/ j# ^9 F7 y/ {; B9 w' v" o& z. p0 F2 y" k9 |
lis[0] = lis[0] - 1! }9 ?3 j5 N0 x
+ [' `# @7 K8 X6 W% n# t, `: G$ v
- ! S& j0 G+ `( [& R% C7 f! R) R
" h; r0 n3 C& d1 [4 g8 y
time.sleep(1)! W' F5 y6 q, q! j" S! c/ S3 Q
0 C: l% t. b! T4 Y' S/ G - 0 R! m3 ~2 {; b' V
+ P7 Y" y0 @% E. U print('say hi', lis[0])3 Z8 _7 O& ]' [
! s4 u3 b! `6 C6 l3 A" G: T
$ n( Y! {# Q! S( \ S2 a. _& [4 d5 _9 r( q$ C/ ~2 k. W+ y6 v9 | C. V
lc.release()
+ W. k3 E* z7 h0 |6 f! g) ] Q5 I7 C1 D' h h
8 k6 w. O- ~7 w) F7 ~" g% M1 J3 s$ V3 `- `, K
/ V# u* C+ x/ |- d; d) @
, P( e6 }( u3 a/ x1 s. l
- : H. A/ g: ^- Z: ]% k7 ]
, h7 D/ J& l" e) x; b$ zif __name__ == "__main__":9 c" A8 e( K" |/ j H' S( ?
# I+ K- P: |" l3 h) m! t5 v; j. c - 2 F# Y, S7 {) F1 T
# ]1 J" O3 c& T' f% o$ B6 A
array = Array('i', 1)
@# O* d( L4 P- a; V& n& w2 u
7 a' g7 R. e8 I, o- l# Y - ; t. F! s- c( Y0 m
) h1 z- o1 Q& L- q, R
array[0] = 10
$ A* X- d. n0 H8 P) u+ O, f3 }. J# d- R8 v
- # w B! q( P+ I0 K
; U* h- d$ c# p lock = RLock()6 T l' Z' H K; E& e0 ]! I9 | _2 K: R
* z% v9 s: K% X, G
. W# V+ c' K: ^! J5 o7 j( k( e" g
for i in range(10):7 O% a! }+ O; ? }4 N
: P, K5 Q7 j, O6 y) C
+ x A, ?& [; S. O" T
6 y! v% ~( Y0 G p = Process(target=func, args=(i, array, lock))
) o) I( @1 H6 n; e9 F3 M" J3 c
b! T# g5 U3 f
* F$ a- W" i# y( S9 |: j# A% j; U5 d' W& I- r/ }+ m: C* R2 }9 \5 I
p.start()
, H: q# u) s4 E/ f8 T
% ?$ ?5 E1 x! K4 P) r1 f; M* X; k- Y3 d
运行结果:
3 V- x2 U, B3 k0 B1 r/ o* [, g& k# ?: F% C, a8 y& L% _ C
say hi 9
K. o3 Z4 N+ S( F2 p
- b3 P, I. O# U
/ ^5 x5 ~! ]9 n& E
( o s" ]9 G$ w( i' ]& lsay hi 8
. {5 A- K6 n, J- W. i
$ Q' H4 D7 T9 s: v+ J- 3 [% H/ a- {, {: m7 _ y
. q) V" D; b H8 a- z: _3 tsay hi 7- Q1 ]9 k L6 g
U: Y9 L: `* r9 Y* m
" q) _- S2 c6 w* {
9 V3 o/ s s1 w/ m& q M$ ]. Bsay hi 6% Y/ a7 m' E5 W5 g
2 Y5 b5 R3 |; J( G/ q# y
! @$ l) r+ P1 D/ E0 n
. F( U% d/ F, a& Zsay hi 5 |! A* w+ M8 s7 {7 \+ j6 t
5 B- z" o2 b' `
: T% R5 r0 G4 l) B
& D" @3 U* a4 |0 | W/ G! \5 R3 t( usay hi 4
- s$ N8 B$ r6 C3 q- z( l
, u* @5 T6 P) T& d
3 \/ q. L8 }$ p" o6 W* [- R5 V+ A/ T
+ B4 z& w% n& [7 X) `say hi 3' o. V$ v! l+ V9 V+ X5 F
3 x! b* h$ d: Z" O- v6 {" X- ~. y
+ ?$ V9 Q! p) J) ?3 M9 E+ n3 e' t7 `8 o
say hi 2
9 ?" A% w! g6 u H, y) r; h& T [ ~
; Z Z8 K) w: e2 l. ^ `8 w( @# l- $ l4 w7 z. Q; L+ x. N6 L% z
( z+ ?* o9 {' O2 Q9 J& d, ~. r6 |- q
say hi 1
0 u0 S8 A& y9 S: G. @; x
$ P& d1 Y2 v' Q/ V# q& A+ M9 e
+ b- o; T) Q6 w. D+ R" r/ }: _
% r1 G( E0 G) Z/ V, T" ^2 {say hi 0
" h: L* o; c/ \! C% b& c8 q/ ]
3 F& O4 F! F$ v2 n7 Z" ^
: i- |! e. ^( r9 p$ @
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。
比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中常用的方法:
9 M2 r1 T8 g7 e ^* K9 w0 s
' g* o$ b* b$ e$ @' K2 G Efrom multiprocessing import Pool
" J, ^( J3 i6 V: T% t
; s6 }( \6 j B: A6 j, {- ! y: h- J. A4 G# J; B! W
g- J& p/ m; u: q/ b% D5 j
import time" H4 f$ F H. h3 f9 x% ]9 R
4 k$ f+ k/ o2 X9 ~5 _5 w
}; y# f9 m* t3 n \" x
! Q% ?/ i( X; N U& d
2 K4 p- [5 V) Y8 _6 I; E* ?! z A3 T; N& r# t" I+ V" \
- # ~- i' n' ]6 M* g6 v% T! k! ^ h
' i. U- E# L! r! L" U
def func(args):
* R- t' r" a1 G$ |# z, x4 l7 F0 f
, d: k3 o( d- Y$ r2 V1 o4 f
% T* y3 {# l, d3 e- E6 G; O( v! @# W; f B7 R
time.sleep(1)$ U$ s3 G6 q6 C: G
- F8 ^3 j/ h( R, H- J* r5 s) H) T* t
% [2 X- u; D* Y& h( B' D4 |& X T. O; {# }) |
print("正在执行进程 ", args)' a! Y7 Z. h2 u
, K2 ]9 W& e G& p# K% j- ! Z+ b8 y& v3 q# X! J. u" P
* E# n: d. _. s
6 a5 y% {4 t$ x
9 }! w5 \" [$ [$ }$ v6 ^# R
- 4 _; _ E, V+ S F% n( [; }* w) P
5 ?# }5 y; J t3 }0 jif __name__ == '__main__':
, g3 V, q+ s% z* W0 m9 z" f
5 q& d# b6 h/ G% e4 w
: B) Q0 _8 l" w/ I+ X( e1 ` ^
+ Z$ b$ z4 S* [' ]6 x; Q9 h: [; E8 ^, I$ T1 g
* X. {0 @7 h W8 G
x' ]! _. T: U9 U! ]- [) C5 ^) q& n
p = Pool(5) # 创建一个包含5个进程的进程池
' N- K T# H o
2 C! F# v+ u \8 t5 n& C- / o0 s0 V9 k& `7 j- j5 T
9 S0 m) o. X( ?, _& h3 J4 X' ^
5 h' X5 V6 t. H T9 W+ [, r8 T, C
. y: |9 i# n, X
: R# D% r q5 H3 n9 R! l3 J3 [( U3 j/ v' u' }) D8 S+ E' T
for i in range(30):" @8 h( m6 X* ^2 F7 r8 a- E- ^
+ B2 Y* e, B. ~* d; C, d( m
5 V6 u/ V2 v9 k- p' x2 q
' g( Y7 y( p2 W; G% Q9 F; o p.apply_async(func=func, args=(i,))" @+ o" w3 Z% C& ~, F# O; s4 S+ f
" d3 L ]6 P7 e$ h9 G$ c
. ?7 m% e8 a: q! L8 l0 U6 d6 j) a% ?
% n6 L' \6 ~$ p- o( ]4 `0 r/ ^1 f7 U
% O5 f$ ^3 G( V- x' G. z
' |" d7 t& K, j- F
1 a" ]- ~3 M; \& j1 C p.close() # 等子进程执行完毕后关闭进程池
9 l1 e/ q9 x& T! D/ F' J' u% n& m" d9 c
5 j9 C. S3 l2 d! z0 }) S
+ q9 k% r2 A1 S; |) q) U) r6 p% v4 C # time.sleep(2); y6 Q! q) f( ?, l& k# D
0 j3 k( ]6 C! e( Q7 g: U+ e# ~4 D
0 d& N1 X: G- _$ e+ D. [* p. e) f
# p.terminate() # 立刻关闭进程池" Z2 B$ N, G! U
, U. i6 d( a2 y: i, X2 Z
$ P2 Y' O. T' a
' [" X% x$ l% u/ l p.join()
5 S* B, }# `" S( E; Y- V, W5 }8 O) ^9 C( Q' z
5 O, j2 G* z# Z
5 h5 Z B' e, n/ ]$ s5 j _请继续关注我
4 z! A, ~& Y" U7 z$ ?
8 f. ^7 V. Q# f" Q
欢迎光临 数学建模社区-数学中国 (http://www.madio.net/) |
Powered by Discuz! X2.5 |