数学建模社区-数学中国
标题: 爬虫(七十)多进程multiprocess(六十一) [打印本页]
作者: 杨利霞 时间: 2020-5-31 10:35
标题: 爬虫(七十)多进程multiprocess(六十一)
3 U9 F- o( M: x% `- p ~爬虫(七十)多进程multiprocess(六十一)
2 b3 @, G; p: J; ?2 w( ^Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。
另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。
下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。
- * j' J$ w7 i" T/ \
: L) j% Z L9 G3 \* Qimport os3 `: B, O5 Y% M8 s
. r7 { E( V2 Z: I
- M, \$ X0 n' M
+ `3 L$ h; e, T/ C2 Bimport multiprocessing8 t3 d2 F0 {$ w' o) a
$ V, B6 q3 X6 R- H/ |- O
# _: @* I& i2 L6 s6 @& n
/ w3 u, f5 O5 _. q4 z8 I0 s' E/ Y \+ e \6 U9 q" X. e! x
- i5 A0 I8 }3 M9 C8 S; S
- I0 S/ k& }/ M9 b6 \! l5 e) F' d7 j7 u q
def foo(i):, D2 ]" o( ^7 `1 @$ ~
2 v& S. u7 r1 F! O2 c* G
+ f6 f7 x5 l& K. J4 r7 f% m. S9 w
# 同样的参数传递方法
0 B+ G" D4 `- `, J: R/ V3 z) B D$ M0 m, h8 U+ _% N, E
- % Z0 t$ w& G7 x5 f0 S
. p1 A" c4 z; I- e E, M print("这里是 ", multiprocessing.current_process().name)0 y. w; L# m* A/ {# d$ W* v" Z
& q% |; U7 g X2 _, N+ `& j - , I0 X/ B' H; d+ ^) `. w
2 k# w( T9 e* Y7 }. ^: u+ i
print('模块名称:', __name__)
3 K* r% q* c/ F4 p( w! L
6 S; E( O1 w) \9 f% L8 E - 5 e2 S% y4 P9 G9 e1 J0 V) z
* `% L1 o; u- I9 T& E print('父进程 id:', os.getppid()) # 获取父进程id t8 P3 j6 h. S1 w G4 L
i) O1 A% r' z
; y* o: q# S! ~* m- V
- o5 T! K( |* o, B: l4 t" u print('当前子进程 id:', os.getpid()) # 获取自己的进程id/ P3 i/ {0 \) l0 \4 h5 B
% } U$ c8 a3 b0 g- j$ Z
- + K3 T. }% j: f1 `! u. \" F
3 p/ b' j$ A5 f3 t. A7 B, Z print('------------------------')
. B9 e- ^! b0 F/ O, K- o% K
/ ]+ ~( Q/ n/ g - * A1 G% i+ o: W/ |
/ _: e& D& Y, P. h% f
. e) ^. P0 _* ]1 \
" O7 M" c# i/ g7 d& _; a9 @7 ~1 M - * b" M/ e7 N8 j# k* Z
. a' c) K) b' k, U! Pif __name__ == '__main__':
6 b' Q* Q, E i
( q, D4 j5 d' {4 i: S& w& B" d
6 C2 t$ h5 |# Z) A$ H; N2 p# I5 k4 ^' K; H( l. B
4 v/ b, Z4 r5 j/ w- l7 l
2 a6 j; L0 i8 T7 `! q" Y: Q- 1 i& l% M1 [3 J$ X0 _6 l L ]
/ z( z6 b" Z2 P$ w5 V, a8 ~
for i in range(5):, Z6 @! B* S, k A2 B5 q( n% D
1 h% z( ~+ q" t4 C
- 2 q5 v c& @. O/ _# s
: a$ a" h4 `2 Y5 x6 r p = multiprocessing.Process(target=foo, args=(i,))* O, b% Y( ?4 e) J5 K1 c
0 I8 i) n$ [- p - ) C( ^) k" ^; C! \9 Z f
6 G0 S w/ P$ E+ d
p.start()
! x' J' M: c' ^; ]) x- n7 L
7 `' w1 D! I E& z/ W! Q
% G' |5 O0 z' {/ p+ V" X
运行结果:
" l1 ^ w$ |- q7 j( B* h- V) J" w2 F+ _
这里是 Process-2
8 L3 h# P. @2 f& q% w5 m6 p3 ]& ^
- # u6 q0 ?0 Z/ y y6 T, }
@3 A3 u' }0 l9 T. a: T) `# h
模块名称: __mp_main__
. t& K& e8 C" E. f- _8 X6 Y: V' |3 N$ m; J/ O O u1 S9 @
. f; `1 i4 s- O5 F: J( }" n+ z0 o6 A
父进程 id: 880( F3 p6 p* U& t. e
# x+ U% ^+ t$ B6 b* t$ y- - D4 }5 a# q8 G; l* Z! F
+ I. \2 {( j- o& F+ {2 Q
当前子进程 id: 5260
. h" g7 }% A/ |* w
2 O2 r/ a; H+ X1 V+ C
- {2 v: w7 T- ?. ~7 O5 B; [. v* l" ~7 F% N
--------------
& c7 ~0 E- ^, h! S
3 N8 Q. ?# K+ b# ]% b; n! q- V* n/ V6 W7 r" Z& W
/ s7 g4 k4 `( k* R
这里是 Process-3
9 t6 @& _, Q1 w& n+ s) m7 H
' `+ z' L- _$ w4 W) r3 f
& @5 K5 C0 n# i1 ?8 g/ ^0 v# b0 X
模块名称: __mp_main__
; d( }# c! d) ~+ _- k! {8 }0 z3 N) d' I: Q! b4 a
, D9 @2 l' ` _+ Q$ D
/ q. X# d( A9 s! o9 B父进程 id: 8800 A( w3 a8 f( f/ G0 b2 A' k4 L) O/ a3 \
3 L- h) v: z( c3 Z( A& D0 K& G
5 ~2 z" O* A" n7 Y( S, o
5 g0 W: d$ F' o6 O6 {, E当前子进程 id: 4912' c1 S" {# z/ y" c2 X8 O' @# K
6 q g" Q# A- X& k$ n2 h
@ {+ F2 w3 Z, b( r5 i% V6 X0 X
0 j* y J& i& r7 l6 @0 W' C--------------
' W C! \8 i/ r
; w: m7 v- N/ B. l- . J/ ]: }+ Q5 j$ m5 B$ K! @# h5 D4 V
, v$ ~. M4 W$ @7 g x( Q& p; k# C
这里是 Process-42 x- Y% M) T0 [6 [3 k. _/ t% o: ~
e9 M' [; e" X3 D" @ - / s' H2 H- R: J7 o) t( u
& C2 ~# l% k9 D
模块名称: __mp_main__
$ L% s* ?4 Q3 ]; O- r5 O8 @' h" x: ^4 T/ v
- : {" ^" U7 R l
3 n* r1 g# O1 c! I5 s父进程 id: 8808 U7 `; z1 G2 P4 w4 L7 \
C1 U! S9 E( b( O: h0 F$ m4 h' x8 c
. x0 j7 z9 r9 t. R) v
& H7 G' `; O4 M5 [2 T* t. ~! E( u当前子进程 id: 5176* \7 J1 V0 p) z+ U& f! l
- x, u9 a7 {3 ]0 ]7 P3 x6 Z
4 `+ \9 a4 [5 l" H7 x% V+ y/ g3 q. L) Z4 S8 t
--------------
4 Z- _0 e& x3 R. ^, u6 r
) `: r8 A/ C; x: C: Z- 8 K7 l6 \$ u: L- k
; y5 l- j5 J; w8 a, W4 m% C: y
这里是 Process-1
) |6 ]+ }2 ]4 j% d( |& }9 q9 x2 k2 c9 w& A [
* D- \3 _/ n" f. O# j- U+ R9 s7 _( ~- o9 t& b! F: i
模块名称: __mp_main__
5 r2 G+ `' F; R* l" k% X7 W) o* ]! X+ C3 H" P1 {$ o1 H0 m
* H7 t) Y$ s1 h$ ]. t- @6 q" [* ]* F' B. p# i
父进程 id: 880
% b, M0 i: E% K% [
, i" h9 O2 X8 E) P, }7 z0 _
0 b! i- E$ M+ H. E" Z9 @
: E) I6 ^! l1 H' B9 e1 z" e当前子进程 id: 5380
% Z, d. H f- z& H& z2 r, S7 |8 b- I0 }# r9 q/ v9 i( x2 V1 ^
& R% I1 ?2 D6 J4 A6 C1 x, v
) i/ S% s: l8 Q1 l8 C: I" m% ]--------------
/ o0 O- x3 s9 o( j" `1 z2 O5 b3 y6 A; J. u7 A. d z# A
- 4 l' @" ]0 r7 g) h* m0 \5 H* q) u: q: @
; |, F: Q; K+ j! ~: A
这里是 Process-5
5 s' F }" v/ l" ]" P8 @1 N: ]
7 w. {6 h( i% n( V) \, B! P
6 L: g, z* H5 R3 M- C3 F$ `2 z% F$ x; @2 a" {
模块名称: __mp_main__
, `! k/ w$ Y7 X7 Z9 _, h* ~
e/ F, y( I9 d1 _& F3 b
% i5 m6 u3 D0 O' M, A& W) F+ X: N9 k4 I% H
父进程 id: 8802 E$ K% ?" ]7 A: M0 L7 y- q
- E$ x' o& a6 v; w6 _6 a
- M9 S+ W1 q( T2 y* A1 `
) E8 g' R* Y: m4 l! w, s( \ I当前子进程 id: 3520
# B2 {6 B: x: i3 M
5 g+ m+ A! ?* j& N+ d7 D
3 J$ o) l7 y0 P$ A: @9 [6 U0 I! _) M( E$ N& F
--------------3 V0 Z) p# x- A, k! g; G# W
1 G( c7 n. t9 p- w& D
$ B0 W3 U# w) f0 v6 n2 X9 n
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。
创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。
下面我们尝试用一个全局列表来实现进程间的数据共享:
$ |* {* K& L9 E! r g: ^( \: G! ~% k# X8 g( z! y+ D$ \# ]; g
from multiprocessing import Process1 q |- G+ F! Q2 n$ s- m
' Z5 V. U* o: p/ x- # A4 K, R$ y7 \- \0 z
0 y& p0 Q3 U4 r' F! p& e2 h* ]
- Y$ K6 ^0 a- D( s' C/ @" a; D; ^3 x4 K- T5 O N2 j
! |) W7 v7 V1 W7 Z6 f& g$ Y2 r' B! N3 j& A2 Z
lis = []
( P& E+ B- B$ y
, L9 e, f6 C- w- . S+ e1 }* q3 l* {
6 r! q% |6 D- E7 e" y; I
|' g" P/ I# r1 n5 s" [, Z% _+ p b1 I$ D4 C* Q+ t
- # f# `# S5 i! J5 Q/ b: T+ L, J
+ C- ^; M0 R J; v) i8 h
def foo(i):
. ^0 W3 Q D/ S8 J. v" G1 n' Y0 j3 Z8 [: b* y+ u! T
- / o! G) y- ?# P. H3 w: a
- G$ _( W/ I& U+ s9 e" U6 l lis.append(i)3 e- b" l2 q8 H& H
* m5 n) E7 b' O4 f+ K, n& X" o& n& p
+ O. ]4 W9 r* \. C; I3 u7 [3 H8 ?* s: O" a! U
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))% S' }7 |1 S* P
8 O2 ~ F$ q5 C9 o
- $ m* E0 a: A' S& y3 `9 P. z
& d$ {7 x0 H0 u( I) b
( K, B( z. Q/ m/ e; ]
$ |. E( ]+ ]! E; L* {* j5 b; v5 c
- $ Y6 }+ P& w) }8 K! ^
0 Q$ e0 @0 Y g: U/ P* F p' J6 {
if __name__ == '__main__':6 J$ e$ R- T# F0 Q
# s2 z$ t F4 {8 ^
- 1 z$ m0 _" q: S7 R/ f
, W# Y$ r: c5 I* q; n for i in range(5):+ T( A: z2 ? F$ w7 p
3 z& }& ?9 \# M6 k. Y3 g - Q; w/ d' s4 n1 h- s/ Y
! a/ t. ?, U& [5 E+ X p = Process(target=foo, args=(i,)) v( X' ~' Y3 k+ {8 _) ~! W3 q
7 F6 [/ G' m7 W7 }0 t
- 8 D8 B2 Z3 r* W2 [
+ m2 p2 N k- r! e0 \# t
p.start()- r0 l) K% p( c, O5 T
5 H% M3 i- n8 K
- 0 S* s _6 {% E3 m; R5 A) D- y
7 S k& j6 J( T: b5 P3 N print("The end of list_1:", lis)
9 T Q2 |; t7 ?: [- X! T! h K% w3 `4 C" p& G
0 b g7 R, F; G7 ~! o: f8 {
运行结果:
* e O+ t$ n6 Q# k5 q7 o2 y4 @1 \; d) m
The end of list_1: []" `5 p- u0 T3 e I' F3 k1 i
! E" ?$ O- F5 @7 K, t! t" [6 E; R- 3 ~6 `/ n5 g& J* M/ M& y- X
1 V% b# w* g* |9 ? {7 _) L/ }$ Y1 l
This is Process 2 and lis is [2] and lis.address is 40356744
" i9 ^) ]: K$ `7 t: P8 j3 Z4 Q0 ]+ `) ?6 U) _( D! M! r' B4 `
- 0 i, C$ `$ Q; a9 K+ X" i% @, i
# E% G5 `6 L6 |' `; G3 R, Q
This is Process 1 and lis is [1] and lis.address is 402912088 G* Q2 l! I2 S, [/ P
- g7 D: P0 y. C& }
. O3 J0 x/ w5 a8 Z8 U
6 f- R- }- N/ {) l1 Y1 jThis is Process 0 and lis is [0] and lis.address is 40291208& Y# X* ^! k( N2 Q# v' }
/ m( U1 y$ ?6 M. q
# R% s% }/ z! R/ l- }3 m" `+ A) a0 s f
This is Process 3 and lis is [3] and lis.address is 40225672& X# ^, ~6 J+ J0 t; y) x1 j
$ i) a1 h6 D8 h3 l1 m+ ~4 V+ g7 \
- 4 b; I9 D! P( z, T: u2 o" Y
" u) l. W3 L0 d" u+ OThis is Process 4 and lis is [4] and lis.address is 40291208
; k0 L3 Y/ ]: u
+ m1 G5 U' s5 j: z
& w7 b* V/ V. B. S& f" U
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。
想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。
1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
6 A: A$ @" Y1 ]% ~: c' k
! b* a& g3 J; @& y6 J3 \7 D7 D1 y'c': ctypes.c_char, 'u': ctypes.c_wchar,
( f# |$ S1 a C" X1 R0 H
/ x" N- e O3 {. E P( y# P1 m- + g! B8 {( ^+ y
2 o% M' |" m: z @& q! k q3 n
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
9 ^8 F5 z6 U1 Z/ |/ ]# q7 F+ L* N+ e! c5 v8 e+ D, Z* W0 q
. ]) @2 T( x5 c7 s& Y' J! Q# D6 i T% Z9 y
'h': ctypes.c_short, 'H': ctypes.c_ushort,
3 f2 b. u7 C2 A4 u1 {( `8 g: r; Q/ R: x/ H9 S
- ; J- A: B7 N2 {* F1 c
" ~. V# k, O/ w7 q'i': ctypes.c_int, 'I': ctypes.c_uint,
+ Y. X, h* B) d' a) A5 t
/ i- K/ f" K/ i - * Z) d* N2 Q3 e; l% S) L
! N# f" I" q8 j6 \' r$ e'l': ctypes.c_long, 'L': ctypes.c_ulong,) J4 j* W" ~) F: a; ?4 C9 b
8 g1 O2 a1 i( v( K
- ; v3 J3 l ?* d/ e
9 C; W% S! ~7 Q3 r& p3 g& B3 D8 p'f': ctypes.c_float, 'd': ctypes.c_double1 K' n' ]' R* p
) U; Z$ o- {6 G h
5 D. w; l1 l+ e8 T# |! w L' k
看下面的例子:
- 5 ?+ P$ D( r% U. _8 |
! e7 @! I' j# h% E
from multiprocessing import Process( ~3 h$ f) [: Z1 d; M4 A- ], q
4 ^& ~; A' c, i) w M
/ q8 y) h! w/ l2 v8 `
6 l" \4 T+ k7 _4 @0 V7 Efrom multiprocessing import Array, k3 Q7 o7 K& G
s3 k5 p# ?. @1 a2 @6 g: F- . Z9 _/ I0 n( w' z0 i$ K5 W
: E( y- m2 @6 u. k, @$ T
, g/ J. W2 A' o* _- j- g; r s7 D$ S; {3 d/ \
- ' [# J) A) n+ K# Y$ ~( K3 y
; O8 ]1 `; d7 t7 I, z. L- f
def func(i,temp):$ f r) D* F5 ~/ Z3 Z' ]
+ d- g: x" r% L
- - t ?+ n: K) o5 V, E0 y& S! p
) B: P0 j$ Z6 s4 w. W- t temp[0] += 100
, E- R. s7 K0 w$ ?/ H+ I6 C" o/ ]
% H' ]* n; N5 H/ d1 m
1 t4 H- z1 e: ?; u" E& g; `/ Z( C6 W3 P2 s! W, s9 h- M
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0]); a8 V0 U* U u: n% H
0 H3 C( i+ S% Y2 |- + I, O' |0 q q, |
: R' T: Y$ Z7 J1 W, M) M. U+ s2 E
& `4 y2 g. d& I. g# ^
4 F" |/ m2 Q6 Z: Y - # }! Y; `( c' @$ u. w; g
* j: r$ Z; ]; z( n1 w' Zif __name__ == '__main__':
) ?5 O0 ?" R( w
9 `8 M# ^! Z. J; o$ \' ~0 j, M5 b - ) I; S6 U2 b( B3 J' o) t! U
|7 f. C: |+ {) ?1 S
temp = Array('i', [1, 2, 3, 4])
) o$ [" u D& Y4 Z+ |4 _2 G( `: r. i. q2 H8 e$ p5 T
- ! ?1 L! `( h2 F8 G- A6 _
: ? P6 E* ?1 } e, | for i in range(10):- X( D5 v/ B+ P! p$ e# J8 `5 s1 L6 a
0 C' x, s1 j* W% S+ x2 R/ z - $ J/ I& F) _! f: x# ^! y
% c1 _0 Q: W) q6 K# b
p = Process(target=func, args=(i, temp))) d: ^( H# g. P; G) h8 t2 f, f2 D6 G
, P4 _: W, t' m! I- f - . Q* D+ k, ?! m9 ^2 _: P% N- z" T
) N' J2 V9 ?( U% g/ k% W! T
p.start()+ |. d! l- T* y0 b+ Z- D
9 Z. q5 G Y1 |
/ i' n2 ~$ S+ e( { x( W' r" U
运行结果:
$ a5 e% [; C) s$ ?% {* l' ~; Q6 M$ x7 X2 E( T: Y3 i' c5 Y
进程2 修改数组第一个元素后-----> 101
# A6 y) J& x( z5 K2 e7 v! `8 ]2 d/ y6 }+ H* ~3 g2 j! J
7 `$ I I: O3 ?0 W- v" ]# C: y: g- t
进程4 修改数组第一个元素后-----> 201; c- D4 y9 D2 Y- c
4 Z2 J) B" n' S" e2 ?
" \& G9 p( F, d& w2 m4 O2 g4 g: N$ {2 _$ D6 T# E2 x5 U
进程5 修改数组第一个元素后-----> 301
. j/ x% d" W3 P' g7 S; Q
% W$ P, h( h3 o. E
( ]4 u% v% m8 }4 S& o p
, ]) ]! c9 T3 S- u* z) K R进程3 修改数组第一个元素后-----> 4018 Z" v! W. `: i! U- h
+ Q* R5 W m+ |, e& D
- % ~' |1 U$ k5 d3 P" F& M* N1 _$ ]4 ^0 J
& O6 }* z9 S, ^' }0 g
进程1 修改数组第一个元素后-----> 501! l" _8 U9 g2 p9 x1 N
* H' @- m0 W- T
+ _% F* l4 e9 f9 _3 k. P" ]$ Y' U% I+ Y
进程6 修改数组第一个元素后-----> 601, |- Z: \ L# H! [1 j" n" p3 |2 |
# W' \9 ?# M: i3 ^- , l% u" g5 W2 y9 p# h+ e1 u
* Z8 e. m) G" ~) P
进程9 修改数组第一个元素后-----> 701, h; i5 D5 x. Y
7 n: [4 |9 L5 x" h
4 |, D4 M0 Y' k8 P
8 y* X0 |. W d% [进程8 修改数组第一个元素后-----> 801- H3 ?$ ], A4 N( H4 i* z0 `
5 d/ k9 V6 v: K: N$ M, d$ f) f
: }& a" y: `+ F2 W+ ~: b' r$ b# `$ W8 ~+ \
进程0 修改数组第一个元素后-----> 901* Y m" h2 {3 Z+ f6 |+ [. @
4 U/ X1 Y* m: I
- 5 A/ j. {" d& o4 `
, Q# G5 l0 b, Q/ y1 m7 g8 B
进程7 修改数组第一个元素后-----> 1001
; Z1 s; I9 |6 a
% F: R6 j1 \' D6 ~4 G/ _
$ Z3 Y I8 [6 T& a! r- A0 e
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
# p5 i; k& ~% ?
% C0 [' \8 D% C% y, hfrom multiprocessing import Process
( K/ H9 V7 \' l: }
0 d* t! U+ T2 |$ f( L" h9 K- . ^# i( h$ i) V) P! m5 l9 ^
# F" h" R6 P6 g& V* A
from multiprocessing import Manager
7 C' c+ p1 F+ w J l
& l9 ^9 [! f; C
3 n8 e, g! k2 w8 o- E X, ?4 X
; W, W( j& y: D+ E$ X! V0 I4 N1 b- F- C* T# U
$ d6 G- Q, }+ ~, r) j0 o
P5 E$ T. @( Z5 y& W" O6 d% {" \% ~% b- V9 g& S
def func(i, dic):
' Q$ K8 h" C% F/ O& o# J
! F; h2 j. w( m3 \: V
+ Z6 P1 s( O. R- K( c6 s4 B; O' c1 s) G7 \ O
dic["num"] = 100+i
) Q3 V% y6 u5 N. E4 {" N: Z9 R0 Q' q4 M8 T( p4 x
- ; D( K. j* x2 l2 v( U" c- O+ U- a
8 z! ?' T6 l1 W8 Y2 R/ a! K7 f
print(dic.items())
8 S3 w g2 c; w9 h4 ~/ |. Q6 c. P- D: M. B2 n2 W
' J$ q2 f9 e% O+ b1 E) E/ x" n, p* r! e+ B2 p
" W8 t; c( U# U& I& Q, ^. x/ ^) k
% f8 t6 e% P; ?6 L* u6 b- " \3 @1 |! L4 n/ \. {1 i9 K
9 `! X: p& h# j. @/ ^% T4 Qif __name__ == '__main__': X8 I$ b, f2 c+ u5 g! x
f! \4 K4 ?: i - 0 g$ e) x/ P% f- a0 P" L
X: O1 r/ w1 y dic = Manager().dict()$ |3 W! \# V5 E! p
9 g7 Z' g2 Q% S2 ]5 J. w' z - $ D+ D+ C& o* u$ N* P& x
4 P! v& I9 E/ _7 a) v7 _5 l
for i in range(10):
. C2 r$ d) d, O6 R L
9 ^7 Y ?8 S" x7 L& _* F
& m# k. I5 |7 {( k8 n2 p8 M9 f/ S* |; M+ k8 @% \
p = Process(target=func, args=(i, dic))
2 \3 S+ z1 \; f, {1 j
$ Z( u3 G# y% q# x
l4 V" k5 G0 K) f: @7 h
4 ]6 R4 ~ c# v% o p.start()
" M& M% j9 {. w) a! Z6 ?4 V4 m; n$ M" k9 x% \, T4 L6 x* ?8 M* \5 i
- - F8 z1 ^9 `1 \! L+ \% ~
( p4 N2 Q L+ V' w
p.join()
: _- W9 S% B0 a4 V; H0 q, k+ T2 i+ T8 J1 ]3 N% `2 i
5 F/ \ _0 W8 ?$ o1 P
运行结果:
( ^& M) D$ k6 S9 t, \% \2 D
+ T, x+ \# T) g( C3 B0 J C" U[('num', 100)]6 R6 g3 ~# n0 u& i0 I" l$ M& a
& O& x, [0 R; C1 h- # n: y, Z. @) V! q8 l( j
% P8 e/ N# y- G; H
[('num', 101)]
4 L9 [2 g u$ R, L% G$ b+ ]9 @* A# u* _0 {5 O* f
1 P% }5 K( k5 z t4 A& M+ Y0 j1 r# ~
[('num', 102)]" Z( T. n( \( }* y% C ^& G) ]9 N
) ^. H2 B/ d( B" @" V, P! i; d* m5 _
; \; z3 P6 M, n
! c6 p. G! F$ b. |; U[('num', 103)]
/ C7 L$ w1 l+ q* v u6 |+ e! Z5 r7 K4 f, K
4 a1 _2 ~% `9 y0 m5 ]8 m
1 F: B: j3 j: |$ s' {[('num', 104)]
: K, x$ q7 ~/ U* W9 A6 [7 Z
3 Z* Z, L0 y* u7 P* u$ V
' D" y5 {) V$ x% M/ {* R# R$ a
2 Z- s: ?9 ^( i% {[('num', 105)]4 k4 ^- l( T r3 k, _; N- L
2 T+ }' m) P& y8 d
$ w* u0 W9 c# V( k: k& M4 K( B, L, ?9 Y$ \# B
[('num', 106)]
p& G6 _( s- F8 l4 L* Y
8 T4 t9 [3 u3 f& a& E* L- ; y. X5 f2 h2 q7 h! u0 V7 V
1 Q+ e8 |& h2 o1 H ?, i; d[('num', 107)]
7 m( C8 F" i& L. ` K! ]- [+ P
- M; F) w+ w9 [4 h% o6 w/ }, d2 M
# y! m* H2 S" W! x! a& B9 m, [7 _[('num', 108)]
; x. J/ Q& X: p# }4 Q; @7 `" a8 C2 B
- . y0 t+ C6 b& W6 `
) p0 ?7 u) u) K& i( ?+ p" I3 j[('num', 109)]
9 v H$ _( C' B% @, O& A; L+ t2 T: j/ z3 `4 m
) k; Q2 ~- e- _5 f$ L# |1 X p8 w
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
& D) G- K* ?& i8 J0 g
, B0 @# v. d: g4 timport multiprocessing
% o+ J4 }2 f5 q& ^
$ n; P+ T0 \2 n9 `) ?8 F: q- ; D' q6 X' q' a0 r, A: z
( F: P; S$ v7 @4 N0 _9 jfrom multiprocessing import Process
. {3 z! h$ d: a+ O/ W" T1 u, X# N. N+ {4 w' X
- + Q9 t$ L# |- O) V. g6 s
; p/ m$ q, B' d% Q" B! z( d, Ffrom multiprocessing import queues1 I; \2 N) A, m) ]& r
) v) o9 ~5 b! k9 t H
B0 z5 I2 A0 d9 t) E: K; |# }& z4 \' ?% s( A/ d2 n
9 ?; `8 V; t: ]4 |4 w8 d1 c3 M/ K1 P
# G- S/ v0 k2 y) l6 F/ j8 x% a* M! |" T
def func(i, q):4 t( l2 g+ o; H, L
$ s- b& _) i4 e& u! c1 x0 C4 r. B
- - ^5 O' R( M# S! a& g) _
! h: s2 ~# m8 K7 i# u$ E% ? ret = q.get(); o0 W4 l! v6 I( k7 l. O* \" I6 b& }
5 l- ~* r+ T2 Y% Q& |- G
5 M# x6 b- p2 v0 V& p/ f7 C. H5 N4 p3 L2 l [6 P, C7 I4 x i" ~
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))6 A! ` r7 x6 E; j9 v* D
, o1 a- e0 [9 @
- " d# b! I9 R, X7 w) }
: T5 E4 ^' \7 `0 c# [6 e
q.put(i)
' j* T/ U2 y, Z/ W) G9 b* Y7 N z/ ], l' q; z3 D
1 j" [, {! m+ t& _0 E0 W+ ~4 l; {# b' I4 X
7 V' c* m" o# o+ j" ?( z7 K: Z4 }
0 D! s! r# X0 W% O
6 K* ~* P# y' C- J. J9 R3 i$ d1 I# y) C3 F, Q3 s
if __name__ == "__main__":/ i: e7 n4 E3 J( F- K8 C
9 A$ Q7 C B4 A! Y2 L7 v- $ x, g5 T8 z! f9 S# @
* h: D+ Y/ W4 x( [$ G S- G' ? D: O
lis = queues.Queue(20, ctx=multiprocessing)
2 `) q2 V) H$ Y* S$ s0 R, l! O; V( K
- $ S8 z7 S8 M8 S$ H! w) M. ?5 o
1 W1 G+ M9 j) l0 S
lis.put(0)
6 r7 P8 ?6 t% B* w( A; F& J+ K! V# o: {1 L/ P) F
, h3 f! u% t6 `" d) x' f1 }* J# j X- C3 P" y
for i in range(10):
: R2 m1 V1 A. J6 e' P7 e* Q/ W9 G \1 g& V
- 0 x) c2 Z& C7 [
; p3 M5 W3 g, F {6 t1 \6 Y! ? p = Process(target=func, args=(i, lis,))1 v3 s+ c5 z/ q- M/ [. V H- ]& p
5 u2 t5 }- n+ D, P' B
- r. f0 L' C+ @- [2 d3 D' c
4 n' A( V& s& [" s p.start()
3 D7 M. C) q7 D! {) ~5 b" n- D. s- f8 X K- r& h0 D1 w; \
' E! S/ {5 m+ x0 _
运行结果:
- 3 @/ H6 R, c! ]+ v
6 ?0 s: e( X/ [* M( b: ?* a
进程1从队列里获取了一个0,然后又向队列里放入了一个1
: J5 \$ e$ F% d
+ I* _2 r0 m- F! d S7 ~5 n- s - . h- V$ J) p' z0 C) K4 ]% A* B6 U' e
) ]0 O7 j/ I) t进程4从队列里获取了一个1,然后又向队列里放入了一个4
! J5 e6 N" R/ ?6 L; f6 B, k) Z/ X% m. X
- # H: P4 N3 s5 j# N& k7 l
" N8 X$ F$ c* Z# V
进程2从队列里获取了一个4,然后又向队列里放入了一个24 \: `1 J, ?% ~, K v# l% \5 k
+ @, s: t+ j3 W6 w; _
7 |5 |* M% e; y0 E# ]; l# m
! u: x+ g7 Y) P" q7 _" u1 e7 h进程6从队列里获取了一个2,然后又向队列里放入了一个6% A3 v4 j! f3 T. }$ C
( ^2 y% A/ f) `8 C( d7 r! `- & R/ c4 P1 K+ j, s t2 C. `
" ]* { o4 A& V" ?
进程0从队列里获取了一个6,然后又向队列里放入了一个0% q. `1 S) Q( _) e
& ~& Q t' b9 ]7 W6 W" Y7 r9 y% g
- * Q2 g7 W+ ~1 X! `' Q- H5 z5 g
8 m- j% w. A" y& R# z
进程5从队列里获取了一个0,然后又向队列里放入了一个50 ~9 z. ^7 }" x6 H* V6 [4 B
. `3 n/ v7 m- U8 r2 t
- 3 R8 b4 p" e. J2 t( D% i4 p! M
8 W) x, U) O8 G" S4 f! y* D
进程9从队列里获取了一个5,然后又向队列里放入了一个93 n6 \3 R5 s2 N- j8 g, d9 t
! Y+ L2 y3 a& i% y
- ; x% T- C. }5 e! Z# g2 d
. y: A3 h1 f# L" w) q1 H
进程7从队列里获取了一个9,然后又向队列里放入了一个7
3 r3 I9 M5 q; a2 q1 T- B0 k* r( [, l1 d5 |$ v: Q
- * H2 ?6 O O7 b$ r& D
& k7 D8 N$ G5 N$ B/ `进程3从队列里获取了一个7,然后又向队列里放入了一个3
/ M2 m0 _& j8 H+ h3 ]- g. g% z& \0 Y8 y4 V
. S3 Y7 }) t+ D' H' l( C, q' v4 G d+ B2 O- j3 V; P# i% {
进程8从队列里获取了一个3,然后又向队列里放入了一个8
4 ^$ A! g7 @ T' o% d4 W: g; o8 X- w) e% Z' c
8 Q' s6 a3 R6 u0 m% l$ v$ H' H: V8 C
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。
2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
: ~% |; [3 g: a! }8 H: |5 o2 p. N5 @$ X: f8 P) X* j
from multiprocessing import Process
# R, ~# N ?9 Z* _/ J4 H: V8 z \2 j# S k1 ?* R+ G
- : j: B; L$ {% M, _/ ?
! Z0 ^1 l8 h& a+ Y( K+ @from multiprocessing import Array( t7 h5 w4 G/ X9 x, W
$ R9 W! \8 h N& t5 ]2 f - 6 z9 v! K5 c1 B5 ]
; A# s" z: D- S" O" }from multiprocessing import RLock, Lock, Event, Condition, Semaphore0 m' t" ]% Y8 l& O' r" m
$ s9 }2 }! K8 ]* ^$ h1 B
8 F9 Q/ E! h" o8 ^: T
- P1 f) Z- U6 B; [ |9 Y: v' ^3 r' bimport time# P% `. d- w. c. u
$ T) Z8 \* V# [
- ) S# g [+ j# {& C& d& U
+ c2 a% \! |8 i, D7 Q T" X* `; C
- {8 c+ ~, S( {! d+ d4 e* f7 S" ]
7 ]' t: }3 O y
6 g$ P! W# j# j0 v3 p* {" v; w, _
' d2 W% ?& e- f5 |, wdef func(i,lis,lc):
& \' W' C6 L" ]% B' h5 Z8 s! f) Y& a, k; m6 M. K6 S* o
- * ~% c% V! o% U5 [
3 a0 T8 l" K4 r3 _, {
lc.acquire()
2 R, ]7 d$ F2 G' t/ ~, ~2 R/ w6 Z- [7 d. k! o+ M
1 H: M6 J e5 d0 l! }) }
" s" M3 N# A6 Y lis[0] = lis[0] - 1
, Q1 |0 K; @- D+ L
- m+ u1 q0 w& O0 d! H
( \8 [' [* |" r2 R" Q" Q' @+ Y4 H% j: P! C
time.sleep(1)
' e2 j6 N w5 {" W @. W
$ u( j* h9 Q0 g7 _) G) |& A$ j- 8 L0 ~6 D0 v2 B0 `
& n0 ]0 H( ~. @ print('say hi', lis[0])
1 j, K2 M7 `( i) G, s, @
# d9 i5 g, c- L- N3 D; s8 v - ( W0 m* b; D* P6 K
( b) N7 A& ]* m& _8 N lc.release()' K6 ?+ M" G/ u* w' d* p7 s
& G \; I# [! a
- 8 h% D* v6 v. f- i* I! i0 t5 t
! F7 s" F) H* S7 ?% q
* {; \! |: H+ H5 M! {1 I6 D. J$ `& }
- V$ H6 |) p) T3 { q6 H4 \' P7 ^& a8 O
0 \. Z* |: B$ y# E/ Lif __name__ == "__main__":
0 {# G V& N, E/ W
4 Y; o% n/ \" B" n! G2 z, }
; Y! U; u, z) D$ d& M0 e [/ J. i5 G1 H- v
array = Array('i', 1)
R) v# e# T/ c9 h9 L' c+ e# `, B# y4 w& l2 M: n2 r- X2 c! C9 {8 m( q+ ?
0 B& M" K: F4 `( t# o* ?
2 v" t1 J. ^: e! w5 v array[0] = 10
- U: N; Y ?+ s, [- { N' D3 t' D4 [# M3 }
- # ^, k7 D) p; V8 _
$ [2 U6 i) `$ P+ U: p0 d9 ]2 | lock = RLock(). z9 y; ^. m9 Q7 W9 t, M$ o( F
0 V7 k/ B; h3 v, A) Y y6 Q& `# K
- - s/ a3 t4 H% Q3 ] d( a7 h
& W! G1 F1 J/ t9 T& K for i in range(10):
3 _) e. p i! X$ t: a6 I/ e4 l4 g# ]
8 U c$ {" @5 M! Q* U& j$ \) F* s5 E) Y1 ~& J
p = Process(target=func, args=(i, array, lock))
, e3 D6 [ n; b. q
B2 f: \ o$ J
, a3 D! n Q* R5 G6 F9 K' ~8 t& R
p.start()% j; H+ \( x- x V8 ]
3 u2 ?; x; l/ t7 Q5 J3 B
# m# O1 m. B# c, U( H
运行结果:
- . C5 @( j( G" U
& x/ D' ^/ B! d6 X
say hi 9! ], r& v& v' W3 [% `) c5 D H
5 N8 Z. H( l# e
. N8 u' U! f; z- d; }0 @( e. j3 o
say hi 8 v5 x" {* Z, G; L9 Q
& E2 f* \1 d) p9 z4 s
- ! h) j( V/ A$ ^" W+ B
8 ^9 s2 @5 K9 D0 R8 m* @
say hi 7( O/ l: }1 M9 K* T
% G n0 ~2 k# }9 D* w
/ m0 r! L+ `: t# e- {& p: v f: m6 |: t% _
say hi 6$ h" z/ H y9 s7 k
- C+ `& u( N4 r& w$ s! k
. |/ h$ ^0 J9 T6 `' T: R1 b1 A! I
say hi 5
+ F, ~4 l; G; e% W0 h' t; H) a. j
+ a6 X5 i8 F: |# f% |1 d ^
. r/ B. d7 j" `+ _8 `+ y
: @! Y" v- }, c2 b4 Y9 {8 Gsay hi 40 U: H& E$ u5 m3 q/ O
* q- X& k: Z; v- y! U- % a1 z, q- O5 M/ F( H. y' _
( e6 k/ s" K% n% t# @3 O
say hi 3
1 f2 R7 k) h8 ]! `
% B: T! L4 G; Z' R( _" D# A - 0 U& x5 }6 z2 {* N3 e' E
, Q) _4 z. F% s0 b9 J5 T2 dsay hi 25 r' D; M9 Z6 i |% {& Q! {, j
4 l+ U6 W: Y. [9 L
6 O# a2 u$ j) R4 Q+ ~. Z* m5 ~; `
/ i% o+ Q7 x$ hsay hi 1
! I0 T) i' a4 x8 h. M
- O' f+ D5 j, m2 f, ~- 8 w; @6 u" D. ` v
: H' z: R3 S5 @* s
say hi 0
# @$ v" d5 O! V) U
( i& ?; R3 f6 j+ q( l: ^- E4 U9 S2 p' F) c# W, I) Y2 _1 N
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。
比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中常用的方法:
+ O; M" x. z( ~+ Y/ t' U+ o( x4 a% Z. D9 g; P" e
from multiprocessing import Pool6 A1 K! _! ]3 R& A5 s! D; ?
3 r6 z$ D" S# U! b) o4 D- ! {8 ?1 q0 `0 ^5 a. ]
" g( g/ @+ ?. G
import time
# Y# R3 i& p. I$ J' Y
% s) D3 f( ?3 Y8 V5 F
. r: d- R+ R" J; I' ]; x# q! |# ?9 ^$ z& K3 j) P
3 t' D6 P" _5 k5 l& @2 \
X, J7 {3 y* R l7 d: k n2 v) f
0 l# t; h+ ], D- L) d7 ~, P+ Q! B9 H _4 A0 o. v
def func(args):2 [* R) V( u. h# I, _
" d% C9 B8 C0 O5 w l& H
* n5 Z" w$ x- B# n9 @ U0 c9 Y5 n4 z6 U2 V1 w
time.sleep(1)
" K9 ^, h) }- K' I; J# E8 K" Q% t
" B( T3 Q5 h* E1 o; G6 f; {
, y. D& D3 K7 ] A
7 d8 Y" ]! f) h; e. K6 e }4 } print("正在执行进程 ", args)
, o& o0 M; V! A5 Z2 \7 c- y% f! ^% |: q' d& ?; e" _( ^9 N
- 0 C. a# O8 R3 H: i4 s1 x, W, |( ?
+ @1 A0 w$ j2 C _
- B5 E6 B& Y1 b! f
6 A- W( ?& p8 n4 {/ M - ! d: }% w, j8 z) c" s* A6 y
3 T. @1 T& N# a/ q% Y1 c
if __name__ == '__main__':
& Z/ Z( O9 S4 i* x- T& E' k, M( S" }0 @5 l3 i G
. g s% z6 B5 O& {- q/ F, y" s: F( {+ G4 n
% b6 W+ ^0 v8 J& k
! N0 N: d/ Z9 |, M& x! D) G
- 9 O E$ k" h4 V% U1 D* T
5 J+ j. H* f# s3 @0 b p = Pool(5) # 创建一个包含5个进程的进程池0 i3 {& `9 F9 ~
0 G5 i6 q2 E: H* c. a j
- ( x3 K, |% I: R" V2 F
- d# t$ a7 A" ?& @. f$ L
2 K" {$ P) } j" v" h; V3 Q
7 `6 G& |! u9 H1 {$ V& t+ W7 i) Q
7 B% l" n" l& B6 E3 Y* D5 e) W) h% p$ p3 I/ J( d: m
for i in range(30):
/ e! d& Z* g: u- o) t
: E# `' J& Z6 ^+ N$ y" _- % h) n8 V0 Q/ H* d$ \% L
( V; }6 j: Q# {1 m- I* H p.apply_async(func=func, args=(i,))& ~4 o$ @+ e/ c3 } X, L
# d' O( c; S! q2 ~6 g& h. |# s
0 H7 Q( L/ I1 F3 b) D
( }6 J/ x: ?3 G ]3 ?
w- G8 b: b) J3 e! V# `0 Q
7 R9 q2 }+ J2 g) V+ r2 P
# x# F: X) ]0 ]2 w) g5 O2 s# R' {. C
p.close() # 等子进程执行完毕后关闭进程池( {6 l2 g9 D! R6 i- ~( j
: e" p7 j' ^: F) i- 4 G3 m9 E4 p, s; I1 v% j
$ [. e0 @8 z' j0 j. U
# time.sleep(2)
1 m) w! Y/ q P1 g) }2 b c6 K
+ v9 ~2 b6 v8 N5 B/ P
9 n: y T* n1 `/ _4 N3 X8 G* z3 r% O
( k% _+ {7 V8 b z # p.terminate() # 立刻关闭进程池4 V; k" j k( X. c9 g2 S3 d
" F) j' D" y+ V: d+ N3 }8 C' q
1 F8 R( _) E; k/ I: a3 v+ s/ ^! Q' ]; b3 j1 D: r6 l8 u
p.join()
/ H/ N! P9 p+ C& P9 l7 G' S/ T3 Y# d, z0 l- E: H3 ^0 {
& U7 u4 j" s) s; x6 D% W3 D
) r% x2 r; @4 h6 w1 a) x请继续关注我
2 y7 B+ Z$ L& P2 ]
: C# e x. j2 i
| 欢迎光临 数学建模社区-数学中国 (http://www.madio.net/) |
Powered by Discuz! X2.5 |