数学建模社区-数学中国
标题: 爬虫(七十)多进程multiprocess(六十一) [打印本页]
作者: 杨利霞 时间: 2020-5-31 10:35
标题: 爬虫(七十)多进程multiprocess(六十一)
) y7 L* [. N+ U3 a8 c' Q( R: t爬虫(七十)多进程multiprocess(六十一)2 v" p" F7 H, l+ D: v7 S6 \
Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。
另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。
下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。
- $ m( A1 E- l' T
. q2 \( |, T1 u! U. fimport os! p, |3 ^; R' F+ f1 Y& b
: S A+ y& P0 T
( U) l+ Z( @/ u. [
0 M- P1 o6 r% h; {7 Yimport multiprocessing
* G6 A( T- `6 k1 C# f
7 s- _5 M$ }& v1 h" M+ K* f- 2 E# `' W( k7 d
5 T) g9 y' E; a+ p* H$ y+ a2 f/ W5 e" e
* H; [9 u/ s4 ?% H
5 Q: f: S# B$ ^( A- L3 Z. M# e* ?
9 Z) U6 O7 i5 P( ]
$ m6 V, B+ E# f. rdef foo(i):8 r6 i& c9 W$ }/ |' a. |6 @
6 v ^) B, O' Q* V0 O; u# k- 3 W8 Y# b% v1 ?
$ X% b: c6 d# E% u2 g, [! z) |9 n
# 同样的参数传递方法
+ D$ b5 H0 ~) g) x$ J+ B7 g, j! Z
1 D% D. J4 l' b- _: e6 I5 K+ G% q( q' M2 y+ d
print("这里是 ", multiprocessing.current_process().name)
3 S1 C8 c q1 L- O$ J
4 V V+ E( U0 O2 V- 7 A% P9 J7 c4 k5 \
0 u3 X6 i* i( N# W
print('模块名称:', __name__)
8 E# r D* D: O
0 ^+ @( w% |, A: n( O" O2 m1 i
3 b* {) y* Z8 M* p- v! |
; ]6 ~3 e6 {9 j" N; i( a5 \& \+ \ print('父进程 id:', os.getppid()) # 获取父进程id1 H! O* f' X: r! [
# X) C0 f- Z% g. h
9 j. S2 [+ z* o2 |( m& D" W/ B' Q b# k: T& \1 Q! |. X1 p
print('当前子进程 id:', os.getpid()) # 获取自己的进程id% v# V M T; R M
! _4 t2 d& j7 p0 R8 c {8 |- 0 L+ b; A, d5 U( C
1 W. N. d: b5 w ^4 R8 m8 D print('------------------------')
: Z0 e, g2 f* W& ]% y4 p
# J- A8 o4 j) }6 h* ` - # a) i* ~" p5 n I& P) ^; ^3 K
9 w* j _+ d' u# ~ \0 b' e
( B6 c% ^0 s2 l" V
+ L* e- W0 n0 Q0 s! e% H6 Q% G& \ - 8 j: W5 C/ g% R7 r
" q4 N. R5 L8 i4 n4 m; v6 d
if __name__ == '__main__':" d& Q; V' Q; z" j% |3 P7 v
- W$ N+ Y) p @: d - ( Q, E e4 c! j8 C; v; Q+ t
3 W' j, b2 T/ f& c+ q" i
# `5 ]. ~0 P" E6 f1 V5 w" g8 B6 i: P5 O& Y" H
- ; x! g' n7 R: X$ D1 M! x' H
. [& S! H, z# ~
for i in range(5):
2 h5 J3 W( ]. I$ _" r
" v6 {; l6 r2 `9 ]' k( B - 2 j! G4 y/ W/ n$ x! \
3 o9 A9 c$ ]/ H9 ~# n p = multiprocessing.Process(target=foo, args=(i,))
) G# z0 t# I7 Y7 c ~4 L" R
$ e& Q9 B6 k5 J' v' q+ x2 J7 o - . I# F! }% h9 q9 H; e4 g& l
( d d# J- o4 e, g4 A. H" L p.start()
$ ], A6 D4 I7 s4 s$ j& v) `* \8 g0 X0 c0 X9 k9 E* u
* @8 c- T" Q% B9 k0 H
运行结果:
- " ?. @) R: A) o% s( B7 w
! G6 G/ v+ j. H" V这里是 Process-2
* m& r- g. N9 d
% I* G: U2 A" B+ ^0 z' u9 c$ t - & a1 V" A/ ?! i( H# l7 {8 p3 B- U
1 Y+ H+ s3 u3 b模块名称: __mp_main__1 |! e1 X' C, J/ _4 X p% X1 z R
3 e7 j$ n' @1 i
$ J x+ T1 I9 {% W$ h& G: B3 l% o5 P. c9 q% ^6 H' U1 T& ]! H
父进程 id: 880
; _' V" p$ l9 y, A5 L. | N( v( S* M" _9 |
4 ^) Z8 ~9 ^/ u- O- [8 m$ P/ g! }1 L* b# U
当前子进程 id: 5260 t4 l/ o% A1 n. L+ |4 l
7 v2 `6 a" |) \- ! s0 v: }: d/ b. Q
" ~" G% w( q W) q8 ~
--------------" d. K E0 s1 Z% s" P2 g$ t6 L
- y% y! A& y/ ?1 _9 @' l
- z( L. O+ Z* @: y* q* ?( P& D* C- D
这里是 Process-3$ N, y0 }4 N( ]6 B) B0 G
( L% [7 A0 O/ W) x
% A) F7 v5 J3 T9 w) G4 _ T" s" C* E$ O
模块名称: __mp_main__
" o- P# [3 X2 r/ H6 U. n; i/ G
$ |/ c8 t; J) X
8 k. k/ S) P+ y2 @" u
) s# [ W v$ |: e& o/ l3 S# a父进程 id: 880
# R1 _3 M: r0 Z; ^
9 c1 n8 A; Q# g- g$ J7 o- $ e# g# f# x# W5 w0 A9 S; }
( Y# i7 T. x& L% O I! b2 H5 U
当前子进程 id: 4912! m; U: f) n& C" v
& M1 }8 w- ?1 V& x8 { T8 a8 h
/ z+ R6 U$ A( e: G, B: V$ E! T) l- k3 K" h* m
--------------
: r" Q% [- B B; C# P
* f" X/ `' u, S# C4 t6 ^- ) Y2 E6 @. S; r4 Z' g. v
' n7 }! [1 L+ A+ \
这里是 Process-4
9 z+ f) _. c+ ~) x X$ j W$ ~$ G# ]2 ]# K, A9 b
- 3 U2 w* N5 h" N; z, t1 v! `6 p
9 I6 H1 t# V, h5 c4 g! m
模块名称: __mp_main__" E0 ^7 @5 [1 V1 Z; ?2 d- p' p
' i* t6 Y5 G: L/ T
5 l. @/ ?3 e. g( Q
9 i6 R# Z5 r9 F2 Y) h# [父进程 id: 880 C1 K( h. q0 ~" a. A0 t8 z# Z
4 N$ s& e( ~* o9 H# n! w- : j0 K# }0 C% @9 a% W" w
: R" M1 I f+ N当前子进程 id: 51760 F4 X2 W$ {, A& f
/ a3 \7 R5 `* U9 h3 j! l
- . I1 K3 `: f2 X' Q, ~! a ?* k8 R
. r1 q* I* Y g* h. W--------------
; i2 h9 X, G8 T$ q2 [9 `
9 N. V4 a5 n7 u3 V! h; r
/ f) i' D+ E" ^ g" J6 C' a }4 S; N- i2 M
这里是 Process-1' k& P0 |7 ?2 M$ d
* `, Q* ?) e$ |4 x* C' S8 o) w
( B; U# T5 B3 F1 u6 f+ o4 o' U" }' O! u! p/ O* Q' l( A
模块名称: __mp_main__0 h6 U, }: Z" o
) }/ e+ F4 h+ D. p z8 ^
y3 K' s, z; E7 Q" J2 l/ x! V$ i
父进程 id: 880
4 P% D& S8 O7 D; k8 d/ }9 @: _+ a
4 J8 b' r. U! e. ?- " ]6 K' a, O5 d% u9 j
3 j' y" I$ I3 j& f( i% M
当前子进程 id: 53807 C# v0 E1 N4 r/ x" E$ X. A6 \
1 y3 O5 X& l1 m' @
- h$ O! s& I2 m1 `5 r7 {, j0 k- C/ v/ S4 `. p
--------------
! \, g9 s. J. I# a4 P, d! ?
3 Z% z I" M: O& ?$ ?0 [
& S, A! {$ G. N5 _4 @1 \; v) ~) T, }! h$ y5 k! w7 C8 Y f% L0 V
这里是 Process-5
% } L) [! J. e) y" u3 I3 l9 i r! v* b: U" [: l- V2 B" V' o7 K
- - w4 U- F( F# I7 H. G4 d i
$ e9 Z3 F% V& C6 x$ @* D) {
模块名称: __mp_main__
% {, p1 _$ b. I7 @) W
9 E, v0 l" X$ z
6 Q) e7 k2 D6 C1 L: q# ]
+ p( D3 c1 S$ ^% m父进程 id: 880
) Y/ W2 P8 b/ p r* I/ ?% o. `3 M% ]0 r& X. }
* D/ G6 N8 k& y# D: M- g
5 { ]' F6 e n# F7 B1 ~当前子进程 id: 3520
9 |9 }" q+ n6 ^7 d |4 z' i$ \* A8 k, |+ k: r
! n# {" V, Z- d6 N$ [) V8 ?9 ?
- t3 M( ~9 B$ z( d8 j/ E--------------. s+ _( S9 Y. S. @2 i4 O/ n4 I# m
, c7 H- Q, s8 c& T/ F
8 z: E) ?: Q. X* R1 ]0 S! O
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。
创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。
下面我们尝试用一个全局列表来实现进程间的数据共享:
- . S: V; {4 E; q/ x/ \: o, t
, H0 U/ S4 h+ H& Lfrom multiprocessing import Process
; z' Q1 l9 |8 @0 f9 F3 }( L- d9 f/ H
- 1 I6 M0 g0 ~+ h0 ?% L' T% t/ ?4 K
# g; E& {) H3 l
2 `( }4 x; [0 i- q
3 O1 H1 d ?, J2 ~% s - + T. [9 b) ^0 N( p- r7 [0 X- p" @
; a0 Q9 R4 J# o! G$ K
lis = []; W* i/ s1 k: y/ [- q! z6 p" u5 G( Z( Q
4 J3 E i3 M- b" }3 _& |$ W$ E
: z+ X- A c8 v, y! K3 v, b, x2 Z+ ?5 M5 w# A6 D1 i. ?( B
2 U7 O" n- E0 i, T5 P4 X/ b
* A3 U F( E6 m; @1 g- ! s* s7 U$ a/ X4 P' S3 c1 w
4 |; J! v6 k7 |# f3 I- ^8 qdef foo(i):2 M1 U- @& y& C7 p9 e8 l
1 ?5 N4 T7 [$ U: Y4 ^ - ; ?3 I. t5 B! w; Q; u% k3 l T
. [- U7 U) p3 ~. l G$ _9 L( [ lis.append(i)
. I" b- h1 r! G6 j6 U. M0 \5 ?+ }# b$ M! d9 h
- + D6 f* T! h# A( E+ Y0 P7 z0 P& i
/ P& q; s. t2 l print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))
9 p$ {4 T% x" F+ y2 M" I' ^7 {6 q, r( T. S: P0 O
- ( i8 o0 J. R1 `; ^- Y& d8 u
0 k+ s8 U( l0 ?. U2 m
( s* {2 m' U1 H8 a
8 G6 `6 D1 P }: _- F# W6 N - : U0 y9 C$ X- V! ?5 K
a% e6 S+ Z' X4 v+ p8 Mif __name__ == '__main__':
: x0 k4 N# E9 M2 B/ K! P
- D" e" V0 A$ }1 l, O
A j' N0 K G4 j3 o, H/ x% g2 Y3 _5 V4 b. o/ g2 u+ n
for i in range(5):
. Y" G$ Q! c6 y; {. F4 J
8 ~! d$ X+ |* [6 a- 0 j8 K. l9 ^$ w2 ?! a
, S7 c5 O# I/ D
p = Process(target=foo, args=(i,))
4 A; ]6 ^, s) X2 j- l9 v) T' w4 o" v7 a) L$ g- a
- . u6 T6 R# K# i4 W
9 |# t# ]/ w6 L o* v6 E
p.start()4 q5 ?# ^# ^& X' z; X. c r) v
( i9 }, \/ G" I* A6 U0 q9 A4 u
0 v! Q( f- z2 A( A( V9 O2 e
6 I7 V. ?( w a% K' K. A u3 L! l print("The end of list_1:", lis)
! k) }1 @. |1 W' w" Q' e) S$ m
1 k' n: x4 j$ g2 T, e$ q! [0 K+ ^ Y. x# j2 h$ ]
运行结果:
8 v+ C3 X% n& E2 v8 t5 E6 b& {
0 O% W( W# C4 a& K) A7 OThe end of list_1: []0 |" R) J: V. ` a; }
0 V. g3 O( B2 X' p- @" C6 v/ i
- 0 e3 t; ^" h6 s
! _7 s# A/ h% r3 j6 G3 j2 F
This is Process 2 and lis is [2] and lis.address is 40356744
6 R2 Q. j* Y: k5 W$ S& S2 X- A/ Q" l/ q! C" N) P
- 1 T: L3 _5 _; D" }5 R# O: w8 W
# g/ _1 ^" `" a" S8 q, eThis is Process 1 and lis is [1] and lis.address is 40291208
& H1 C5 y+ \ m9 j
3 n: Y. O4 o0 E7 {/ w
" S9 h1 G! [- |8 d8 w! z7 L2 G" }8 a$ e% E) I7 Y' s( ]
This is Process 0 and lis is [0] and lis.address is 40291208& e: }4 k# p; y d7 b
/ K+ y2 i2 r1 S0 ]* T5 F Q9 m$ }2 O
- ' y7 ]+ y$ R) z# l$ O
7 X, k( P, K; b6 g, v
This is Process 3 and lis is [3] and lis.address is 40225672- o* D& c) C" b4 v
! c6 E* h' @) k: F8 z$ f7 c - ' V$ J) U* R+ ~
1 ?, Z$ k( L7 @$ Y0 u
This is Process 4 and lis is [4] and lis.address is 40291208* w6 U+ u. v$ r7 u
2 Z. v: ?) O" ?
0 e9 Z) u- O! P- a! y/ B
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。
想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。
1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
- , R* f9 E- ~' @7 Z* G% @6 l9 `
* @, {/ w" G& L) K' I
'c': ctypes.c_char, 'u': ctypes.c_wchar,
, ?! e$ U/ N) f( p7 g# n0 I2 j- O& i( o! v- d6 i: m( Z5 V& D c
- 9 ^5 Z4 q" w1 D$ }
0 S1 C* L; v, e4 q4 t5 Z, w
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
: P$ \: f9 ^# c: H: r) Z4 Q0 B7 T- X/ f- Q! W, R
- 0 h- p% C9 P+ ^. `
! T8 m, R L/ v8 h [. L'h': ctypes.c_short, 'H': ctypes.c_ushort," j, d1 ^- A4 Y# i7 e2 y
B' Q5 A: i* B
9 P# o5 W( I. }7 M: P9 C4 F! ?) n" a4 O4 ^1 Y( L; m o& B
'i': ctypes.c_int, 'I': ctypes.c_uint,
1 T8 Y; e# N2 A6 R
2 { ]+ E, ?$ K# u$ b+ r3 V- , F$ E9 ]) H2 ?: @! |
4 c: V6 D! f) \; K: N'l': ctypes.c_long, 'L': ctypes.c_ulong,$ X/ m ^ v; @ M3 ^ y8 |
9 K( e. b' _# O
2 J. l8 T. |, [
+ y" k3 O, B7 W" A4 p3 Z'f': ctypes.c_float, 'd': ctypes.c_double
/ e1 G5 U% z4 s# z4 l6 h/ D+ {* o* i# p4 T
; [$ T: p1 H* }" f. Y
看下面的例子:
- 1 B4 D I' B6 b! }, q. F
3 x* w5 R( u5 a0 z; t, A
from multiprocessing import Process( \ O3 L7 V' c0 ]9 |6 s
7 U0 T) Q. G3 m. _
- 0 L9 A9 X7 Z$ e0 F: A( V
+ ~6 Z" {# K9 C) D' ffrom multiprocessing import Array
2 L3 L8 H2 A) [% _
6 y5 w* Z1 d4 a& Q, G
$ h* Q' O; X# q: }' _) l: I+ |$ ~ A/ {; I5 p& b' i5 R7 L
, _' h0 i. v1 M. }
! Q' v3 q4 J1 ~6 m1 e
$ V7 m+ t4 K. m3 j/ W4 @" D# y& C6 l" h
def func(i,temp):
1 N7 G( W* U5 ?9 n" w4 M
2 W- t( S/ y! s0 B" P8 k% x# w
' g1 j6 B0 u* t
5 N5 p7 b# a. G* \$ l) F6 C, ] temp[0] += 100
+ \$ n3 b: O! J! U, i0 Y* E9 f' x; _3 S5 c" X( q6 F
- " Y9 J8 g9 u. b8 s9 K
: q8 K6 _# ~+ h; C* k& |; M
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
! C2 ]7 i5 R4 n
z* g, s$ H( m/ l - ( d/ c3 u/ M7 L, C5 u# s7 G" u7 N
. E4 j: i# @9 m1 I" M) N
; H. w- M; v9 h; u
6 c: ]! S' F* o - 7 D8 a6 [/ g# U8 y8 y# {% G \
3 N6 E5 t3 j# k3 ] z" J+ p
if __name__ == '__main__':
$ O) b: d* K+ y5 Z' W% a# S0 @, V" y; M1 y6 F4 A- Z3 h
- 4 C: R7 l+ r7 q2 Z; E
5 Y# `3 B+ A9 h% e4 e% f$ ~7 F temp = Array('i', [1, 2, 3, 4])
' o; u2 T1 \9 @: q% |0 u0 u$ `$ s7 `* |' v
- / m. j1 b1 ?9 W% O- w+ O
& ]. t! b/ L" q4 ]
for i in range(10):
/ ]+ V/ N# f" Y. s& ^
" ~" I1 l2 }, z. e d) e" r
2 j4 K1 o) \0 u. k
$ i9 z! h' C) l9 h8 w p = Process(target=func, args=(i, temp))8 {/ J$ f8 h0 q9 `2 d) G) u8 G
7 d5 d- V- B- m: k# j
) _0 _7 b) `7 _' V. ^. y! g# p, g$ q* s; q/ u* m P" a
p.start(); j) V6 E3 F% D/ ]( h+ L
5 V4 J( n- n2 {! M/ J" L: D( _1 }
3 X9 R( W' h3 \; t# {8 o
运行结果:
3 x$ ]) ?4 N2 B( D2 j) X: F! g0 \. W" j- b# H
进程2 修改数组第一个元素后-----> 101- }! m, ? @8 }% u- J2 A
+ W6 k& F0 ^/ C9 l; l4 s/ j4 I3 M' O
( L8 D2 ?* S. ?2 A! o1 S5 V2 w( m; q8 f! S: u
进程4 修改数组第一个元素后-----> 201; O m$ D. U( a6 k# }# D
: J( P. B& @; i$ ?
1 \2 a/ B" ?. B0 M3 g9 P, F* l0 U( C- e0 p* L" q9 R
进程5 修改数组第一个元素后-----> 301
7 U2 x$ K7 a" F# q& ~7 m' R0 M! ~; d8 A3 Z( `# r5 |
- r7 Y; M: H; u" v$ e! ?$ l+ N! e2 H) |% b+ q2 d, o
进程3 修改数组第一个元素后-----> 401. z3 [( ~- z5 P& |# {) ]- H
- L$ @/ o- X# ~6 \- \6 |/ g6 J6 ^4 W* `
1 Y k! t/ z0 U* E进程1 修改数组第一个元素后-----> 501 `$ ~1 D) D7 f' B. _' J8 c2 V
6 T# E3 a9 P, I; @ e
- " y j% p) Y' A. x& u+ Y" \* H
- J6 @8 G, G) _5 h进程6 修改数组第一个元素后-----> 601
6 c5 W% N% f8 v6 P' K! [# h9 j3 L
- ( @) U/ [0 T2 d
$ l5 n' _2 o* R& a+ a" K: [8 H1 s进程9 修改数组第一个元素后-----> 701' `. L9 R# `) {7 T; U4 m1 d
. _1 c* z6 P; k* g- V
- 5 L3 { H+ F5 j1 a
`, O/ i X, I4 d- Z# M, [
进程8 修改数组第一个元素后-----> 801
9 z8 H% _8 b" M7 f" I2 { F8 W& j$ O' N9 P6 t
- $ @5 H& g$ G9 q
% m/ T- ]7 D- H, J* Q y) k
进程0 修改数组第一个元素后-----> 9013 l' a; ]2 K/ f* {
3 ?, ] o, ^$ Q7 B; V
; B. _! I7 ?# X; g5 c/ C0 S3 I; B( |7 a, W/ ?# A+ u# T
进程7 修改数组第一个元素后-----> 1001
" c. w; M% {; H! e% P: `# F9 q. g0 P2 r; I2 X
c! e4 y G( v( F! r7 l4 B
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
0 G* b& l0 V6 } P
+ N3 v+ `& W: afrom multiprocessing import Process
4 w6 W9 d9 S# J' x/ O" y( C+ Q5 g) C' ?- ]- B& j
$ f: B% U) H3 y5 U+ F$ {6 h" V% R5 q4 q- t9 b j' G
from multiprocessing import Manager2 f! J1 C9 R( [/ b4 y
0 Z1 M7 C0 g/ Q0 o7 t! l4 r1 r4 N( d/ T
" R, d3 v5 B0 d! ]
5 @" @+ _& n8 L% M9 x8 i" A5 D2 @- a6 ^+ Q% I4 h1 e. ^0 `! \
0 i2 r$ K, U+ I% n# E, r/ y9 j
- 1 |7 C( P2 K: H+ c$ @( ^* L
^# y e" V+ l+ Z3 Rdef func(i, dic):' B# S" h& @# u
" P4 y9 _; W6 z! V
- ) s! _4 _0 X) e& H: \! i( [5 V" Z
( y+ i- R8 q" T' n5 n2 W
dic["num"] = 100+i
2 R9 [7 k, o9 {6 L0 s( I. S4 ?* [( L* \+ k' F
0 O: L& v4 x2 a1 H" g4 ?4 N0 e: I" e) ~3 o9 q7 b1 K: v
print(dic.items()); N$ R' X8 @0 K$ U
$ l+ ?+ K6 V7 X8 M& S( ^
+ a' L9 A/ m+ K8 q$ r
% s$ P% g; Z6 A. k+ P* J% h1 t' f( c3 t. Y2 V
5 ^( T6 R$ l9 X9 H8 l( I- ) A! K& ]1 `9 @! D( N' G
# c" L% P9 o1 x& Kif __name__ == '__main__':
4 O1 f B( w Y4 \+ c0 P1 g" V4 A- n5 t! S- z
# C+ O0 a( O/ W
, _* c& H8 b% N! @ D" z dic = Manager().dict()( M- u6 Y8 K0 M' h# U
. H2 W$ G2 l- |" ^; ^5 P5 \, P4 R
C+ @# D- H+ R& V8 z! I9 Q, V6 y7 O- X& U) C- \. {9 b. k
for i in range(10):
* j2 f8 m; U0 M5 I$ u" \8 g: r- v0 L' y5 R- }
- & t7 n/ ^$ z! l* H4 F2 d1 u1 V
8 {( x9 S* q* l4 |5 U p = Process(target=func, args=(i, dic))
, ~( ?: I5 ^4 e% d' M* {; l2 B# R, F% N+ \( I, c$ K
, I( n" k+ C9 X& n; ^ ~
8 @0 D) b9 S; M p.start()& ^) f3 W) u( f3 Z
j9 @9 W, D9 t R
- 3 d8 |6 ~. j) y1 M4 O
8 e. ^# ^1 r+ M6 [ p.join()% r) G7 o: L$ v q+ T/ D2 i
" U& x5 V6 P: c0 u' ?& X) l, j [6 F J9 T4 N4 m
运行结果:
& Z C2 n+ f; a4 z% S
& C" l4 ~- h P$ X) X[('num', 100)]
7 X7 R/ a% A3 @3 E# X+ [
& F p3 b; z1 I, I; f5 \- ) f5 n, H7 F& u, a. ]4 Y1 ~* r3 r
: z. Y4 Y9 n3 v% H
[('num', 101)]9 r E: x% i$ ^7 s2 _! m6 N
$ b, x8 e" k# N3 M2 O& y
! J9 b! t; _1 B5 r6 T* i0 w9 n
' o1 I0 G, Q* u- v9 U" t[('num', 102)]
' T) Y2 \* K* I2 t. @ z/ I# B, q! I+ b
! O8 g+ e3 J* r) t
6 g- u* R- m- ~/ @7 H, X& n[('num', 103)]' N1 ]# T2 {+ x% a( z: i5 `! Y
4 ~3 O8 Z( p: E- 0 B, ~/ c( S+ ?6 P5 g* }
: G( x0 x0 C' e[('num', 104)]
% h0 G0 O' O+ a& c y D0 q5 r ~$ h3 |: l; V
, U4 W# X; M: m' ]6 s% t6 [$ }9 ]! F/ r9 h: X0 I
[('num', 105)]5 I8 n" ?5 P/ R: @+ T' P
* _- S; |; L- Y$ r0 u
; ?# Y9 a/ \, c9 ]
; F |& u/ M& z+ @# ~[('num', 106)]' u5 d" v5 C5 W9 E
, a; ]3 [1 q7 U, i# j. N
; G, O9 ]# v+ j
+ c% p2 @* R6 ^. R* i! ?[('num', 107)]
2 E1 u* p. [ b- X8 }3 { I) Q- i7 r X* | h3 m: X
- 8 m3 ^! X8 M) A+ r. Y1 o
, X [# z& c2 q& u) W: v. y[('num', 108)]# T2 z1 z5 k5 k; l, D- v7 [' f
+ _* M% G# L) y8 |1 A6 @
! j. Q* R% h( n6 x$ C
- Q2 j' n# I6 v) {. z5 s, p8 a, m[('num', 109)]; I7 }* E! G, {' u
0 P4 f) |1 q/ `% a( s
# J! o8 x1 h& P: {" U
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
- ' q7 y( y U# ^" ]; g1 e
$ I# ]) ~3 r j2 z- ?
import multiprocessing
! M! I j1 o D$ x8 O h0 ]2 Q# {$ l3 ?+ _
/ l! ]* p: `0 t* s& Y" ?3 ?% M- N% o4 ^: J
from multiprocessing import Process% S. m% M } G( i$ h
! m; |! ?4 ]7 N# F# c+ r
- 0 l u* R" t" P T9 X1 y; Z
* u- Q0 j; {& g5 J4 F' _7 H
from multiprocessing import queues
S/ H+ \+ ^$ L2 s" s7 q1 O4 g7 K! L
- # ^# q5 v5 J. q+ L: a& L# Z2 s( w9 `
( V: t" u# U, L+ C. |& h
s* e+ X: R4 @0 D5 G$ l
+ U( _& f& X% j1 g- @4 d1 l' \ - 1 c4 n+ V" m' W
h( g- T) m1 k- R( g/ Ydef func(i, q):/ Q5 i* Z7 H& m6 F2 O
& W" v% E7 Q2 `, L) C+ d - 8 t: P" y6 S) g& V; ^
s: B5 c4 w7 I! E ret = q.get()9 B$ U8 ]8 ]1 N$ u% j# a% e
M6 A* D$ I. U* i: `+ L0 R" j
3 A/ \& Q. o* @) ] e, V
, r5 P. y, U, N, y print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
4 C' T) o% p% W0 C( I2 g5 K2 c3 v, V% w. E
. q" y I# X5 c9 e: G. r5 {2 o# f3 L9 x! x8 O
q.put(i)
/ g) y# F8 H" \# {2 w$ Z8 {
) I* W; F3 ?. w9 f- ) q, }0 m* q, D! v
6 g# p& e5 d) s7 T+ h* D
u" N4 C( K" v5 R; @4 l) ?3 l( \& }1 g. Q2 V4 @! s1 J1 z9 e
; v$ O8 c- q5 Y/ {: O
9 w3 p9 N9 x. I* Q2 d0 A8 r0 Fif __name__ == "__main__":) L( t! b- S8 d$ O
: u8 T" F* c/ y5 G1 R! x* G! e
) l% L8 Q1 U5 n. t; o" S
# d- r# T+ o* V6 q3 Z8 Z4 f1 p. s1 W lis = queues.Queue(20, ctx=multiprocessing)% S, ^! U3 [1 s( K6 v2 w
5 l a6 M+ g0 ^
% E1 a# k7 j* p! M8 Y/ Y! Q* e
, J2 e8 H* Q: x- _, D: p- k5 o lis.put(0)
& a* j( F6 m) s& n* i2 [& Z$ h6 H" D. s# L8 r
- * Q2 h7 C5 n. c2 E! X
X- E3 S% }7 m: x; F- z0 J
for i in range(10):
8 z6 ]: P, e5 K; i7 B8 c* l
9 K: [9 ?; U. E6 }- ]" `' N3 f+ Z
5 B# u" V) Q5 w: ^, o P) a# r2 q7 E3 r0 c( u' f
p = Process(target=func, args=(i, lis,))
Z2 J4 J0 t. G! V% C3 V' `
: |: e; {8 j: q8 Q5 J9 R- 5 [6 L+ E" Y2 m8 v9 a2 R
7 e' Y2 Y3 V. s5 D: v+ k
p.start()
! u) \% {& p* _& h
" l$ Q" [7 q$ D0 n. f& o
" M6 u1 p9 F: e6 s6 a. L3 d& B
运行结果:
& O2 V4 I) S& D+ d+ Z& ?/ |: E2 J. `# q" [' m: P
进程1从队列里获取了一个0,然后又向队列里放入了一个1; p# U2 e$ O: y# I) h# D0 H
9 L" ~+ O& _( w" U- 4 U/ O. @" v% T. w
+ _" l% K& o( `7 w/ n# l8 o; A
进程4从队列里获取了一个1,然后又向队列里放入了一个4- Y) |' [" X# @6 e& S, Y. J& a
( |$ R6 g. V9 c3 g
- $ }( V. U, G7 b6 w w
: v8 [8 o4 B7 p. S/ f进程2从队列里获取了一个4,然后又向队列里放入了一个2
( { J" k" U1 o- U" X- @- t2 k) G% s3 J' R# {
& d9 B6 k/ E; ]# l' c& }" n0 X
- Z, @6 x& u' Q! ?, L/ m% X进程6从队列里获取了一个2,然后又向队列里放入了一个6
, B8 k/ a: i% T0 h. p; S; t, b1 \) s% o; |
- 8 W. e$ H8 i9 _/ Q
' s5 Y6 X& }# i2 }
进程0从队列里获取了一个6,然后又向队列里放入了一个05 }$ m- ` j4 \0 T4 P8 q
. j; M9 \" l( X$ c7 k
- 7 \( `) m$ I. Q" \0 S( s/ C# z
1 k8 o& l4 k) z1 m" f进程5从队列里获取了一个0,然后又向队列里放入了一个5
3 }. F7 P K, d: u) ]* n2 f" m }8 l, K0 a" p/ p& x. O
- 6 p* l8 @; h" h+ Y' ~- D! Q' e& Y
6 ]1 c7 v, U# K6 Y& |0 h
进程9从队列里获取了一个5,然后又向队列里放入了一个9
* X" L* I1 W& O) E( u% }% j6 x3 e4 Q$ w& J( Q
- |3 \" O( Q) l
0 l( G/ d2 g$ V5 l& e# D
进程7从队列里获取了一个9,然后又向队列里放入了一个7
7 R* Q) ]) V) q* B2 F" h# k& j- U
- 1 T" \7 I. `6 Z. J
( Y# K3 h z5 O进程3从队列里获取了一个7,然后又向队列里放入了一个3
! ]. ` ]% B2 B+ u2 q
3 i+ p* j" c% j3 f6 K B; T1 [ - 4 f4 b7 [3 o8 e1 P7 v" P
0 `, e/ W$ ?+ u9 Y$ O! d进程8从队列里获取了一个3,然后又向队列里放入了一个8
% t8 P! c. r0 k4 q
- I C! r& ~" @+ i+ D$ `
5 a* k0 O4 ], x7 k- |4 \6 E
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。
2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
- - E9 @7 J; g1 @+ \8 R
9 c2 O9 ]. A0 z$ x) C+ v( d
from multiprocessing import Process
4 C6 g, X% ]/ I5 Z3 q4 s" b0 r& I0 z5 z' ~8 c/ J( p( a! V
- ^2 G( X( n/ Y7 K) G
5 [1 [( } W" I0 |- V3 ]8 }; R' o
from multiprocessing import Array
9 H# i. o: J h& ]' e5 R4 U a9 X. \2 r9 \
- 8 m d, }# I; y+ e. \/ r
8 F2 I; m g$ G3 i4 F) L' B
from multiprocessing import RLock, Lock, Event, Condition, Semaphore% d5 I' j- o1 G4 o
. g. D! a4 ^, V% j
: S3 G& V2 n ?( `" Z# ~3 y. ~( q& J2 a( F* N& ?
import time9 v! `8 ?8 m1 v- W8 r
) r1 |" N2 V. v) T; V3 [6 ]
5 f# s. b1 X5 b; h" m, I# Z S/ a6 I/ M5 h& ^- r. j
( @- u, |. _, d2 c1 s8 e) Q
+ {: a2 Y) U! M8 U+ b( F! E
% S. g, v& V% ]8 k
U+ m/ i& i+ h y# Mdef func(i,lis,lc):, v$ B# S8 `$ ^# A5 h' V- Y+ H
! j$ B; _, }- ?7 N2 [
/ w; Q9 E- r) t& c6 L, E W2 j X- p3 t
lc.acquire()
* m6 W: H( }4 z* \7 W
; B! d( W! v( q* ^0 e" @- 1 h; K. I+ R, {& |! B, W
1 ~5 |5 \0 d8 E lis[0] = lis[0] - 1
1 E* v2 F# X) _9 P5 s- ^
, C* b% {3 ~0 c# f - & ^- e1 r9 f' d" t& X
7 K2 O$ n' w7 q; i7 L time.sleep(1)% Y$ ^) w5 [+ @: k3 H v& E4 F0 g
( D; t& p8 q: N: C
$ D7 P& i* c: w) t" ]5 A
: R3 d: M# _: G. X! Y- p print('say hi', lis[0])
3 \. q4 p; e3 T: S4 w( C, X
' A" V& j; `" H, q! o
2 u; p$ Y6 a' d1 Z5 _" F- R$ J" B- p9 S
lc.release()3 _* E5 i0 m8 P ?8 {
' T4 W- @6 x- ]1 g) `
* @0 i0 ]/ `8 T! p
" o9 a4 \7 ]& E) |
1 B& }0 R' a% l8 _- T+ r) R" M1 P6 p: ?$ c$ Y; R" f3 W) z
- # H2 U; {; C, D' T8 F& \$ D
6 Z6 V7 W0 v/ g
if __name__ == "__main__":
5 P! X% \2 Y+ P$ @1 d
6 [, y1 F& k: A& }' y/ a; o. p4 C7 Y0 I - % I1 u+ K5 Q2 p Q
% O+ C {, h3 I3 \
array = Array('i', 1)
+ W: g9 i: l( d5 e q; }3 M% m# y3 r% H' [7 ^- V; W
7 W0 t- m7 }2 b2 L
9 X* P% V d3 e array[0] = 10
7 T U6 P1 i. R& x. u' K
) k Y9 v5 d c" s! V# n! ~- / x8 a& W4 F1 g% T6 `
; f+ g2 x9 V6 e& [7 Q lock = RLock()) Q- c7 O1 |. _: W
0 O' P& @- P3 i: I% H& T/ o
: q* K, i, T6 S+ ~5 r3 w& h; y6 d9 a2 k# V
for i in range(10):
- a6 e" _9 p2 |1 H4 ^# F
$ h6 w# i; X9 y/ N2 ?& D, f$ p- % w% @7 \4 H5 s0 C" A$ b
* O! n' T: [; |9 I2 H; v p = Process(target=func, args=(i, array, lock))# q5 E" e$ Y+ u& I& E
2 f! B) u" y* {" X4 ~
- # Z E9 a* ?1 j3 {' |: }" Z
+ ]- w o2 R/ H$ P) d( O6 l
p.start()
7 }$ x8 Z5 u1 c+ k: n# t# v, A" ]' G! o+ G3 D' t
B* s7 D1 @& _, Q2 w6 d2 `. B; E
运行结果:
- 9 J) x2 h$ A& l
/ u# p4 d' j; N% j& usay hi 9
. B$ P) a+ k9 r
3 y' i2 r7 W" l. s7 L% D
" S! }+ S$ O9 T$ R
! u2 X B' @/ p* M" Esay hi 8
" w( a. R- V; L9 w. W! @
* \( A ?: t9 ^7 b5 L- # j: E' T# \- k& @" ~
- j! p+ ]2 ]* R. P6 Bsay hi 7; x A9 l4 K2 Z- z V% e0 ~: Y4 ^
0 A0 M d4 K* w4 M- X& T
+ t6 D, z5 |2 H9 L- v2 `! @1 M% T7 W# I( q. @
say hi 6
{2 V, H1 K2 d8 b! M
) @' Z7 k$ b, a
8 Z/ z- z' Y4 G+ i: T) G0 J6 r) G
say hi 5, q& n1 f3 g) Q6 z
* j0 T; {5 v; W1 G5 n* r! C
$ v' e1 x( b+ s/ x# y$ L
- h( }0 o: a/ ]5 F+ Esay hi 41 [2 b1 Z0 m6 F7 F
" m$ U* f' B& a' O" l9 g6 U
5 W. V/ B. |% `- a. N8 R" o# u8 o8 k/ k! L4 K! g
say hi 37 E- P+ Y7 f9 F9 G/ y
, \4 E- w- U8 A& v
5 X$ c7 ? C! G5 F* f$ [0 M* g) ]/ P' m5 y0 k
say hi 2
! _# X! F/ V0 X; b) `9 Z! P6 {. [4 Y# C3 x
2 y3 i2 {4 ]# a u! l; I: r) n9 o' U G- A5 @ f- R! c6 l
say hi 1
: I! R E; m7 I4 {; ^1 g& g
5 T) K; r( m7 B# n) Z# k- : v6 s! Q8 E' u5 w1 M) n, x
1 P4 E. c$ D5 j( r$ `0 ^) Qsay hi 0( T8 ?& `2 M" i* v+ H0 b% F& Q- v
& B. A5 G( o* Q4 w1 g' [
# q# ]* b7 S1 s% |
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。
比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中常用的方法:
- ! q( c5 n6 z6 F+ w
; y4 E# w/ ^! m1 ^$ Ffrom multiprocessing import Pool
5 j! b" j' n ], t
5 w# M% U2 d. W
. ^2 h* |8 o! e0 J @8 z/ j% A6 ~
3 H' M( _5 o/ q$ Bimport time t/ O* D \5 o7 U9 |
2 Y! t! _& G \7 ] ]+ s4 k$ s
7 @' v S, R$ g' b0 A; P7 W# @- ~( n# z! Y& |# u% L* C
- r8 `4 I( S5 E+ N: j, X7 ^6 @! p, A: W, ^7 w/ X6 L
, L& N$ ?: D- l3 h0 L( }0 G( n: n- g j+ e! Y; J
def func(args):
1 }' P" k- E: b5 N, X9 @% n
# j2 t! \6 x3 r- ( v, |# I$ D- |' D0 _ k3 C9 K9 w
! R- k9 _* z4 b) U- a9 c; w time.sleep(1)
% q2 _8 F$ J' H4 [; m: i9 D' \5 Z4 u; W2 B0 d
; b/ V6 {) S3 Q3 G- l( c1 A& l3 E/ a/ W. Y0 C
print("正在执行进程 ", args)
& T+ {$ q6 D* F/ _0 f% C. w# g) [2 U6 p6 |) K; R/ }# E: _0 N
z% ]! d6 t- h# ?" j3 i8 D4 j/ Q4 e: k( Z
" g9 S6 o( I& v8 l$ V( [, {
1 z% @( v3 v2 j5 L F( y8 Z
4 D" a6 m% w+ x/ y7 e- g* Y" u1 M3 p7 d
if __name__ == '__main__':4 @+ |0 p& A2 ^: e
2 f9 Y, e" O" G0 M: ^* K F- p1 `# z) h7 L; R; `
! K& y1 x! y$ ^5 ^+ q
; f4 f8 f2 g$ q u$ n
- W! `3 X0 D7 D) c' y4 D w - & w( h+ |* i0 T, X) p4 `
) ?; V7 {% x+ [, a& W0 y3 ?
p = Pool(5) # 创建一个包含5个进程的进程池
; j4 _. S- P7 U, b
+ m) v! [2 ^) L# s% X4 L# N - : r5 h% V8 D4 x
% F2 d' G: g; _
( v9 L& z0 @3 F. e0 u; j
) _( y' q$ \7 c3 m: X2 I
- $ d) w9 |) x3 Y7 y+ i! Q
) O! y7 k7 s9 |( T
for i in range(30):
; W# `% ]* P+ f/ ~# H% d$ z4 A
, \+ _+ J. c. O - 1 t' t- Q( K8 X4 A( y9 I
" C( a% h4 ^: H5 U. ]0 h; V& n p.apply_async(func=func, args=(i,))
, z# i: v" h* _$ B. o
5 |9 j4 y3 S& ?& M0 C# U. ] - ( z( M. K! k& a1 t) c, N! {) A
; S1 g4 t" Y+ U: ?/ W. h9 f+ T$ `0 w4 y/ O$ N6 h& X; U% o1 W; f" {
9 t( ~4 P1 ?' T( k$ U, v
3 A4 A$ A- u9 i
4 V7 ^! m% E0 j. t+ z p.close() # 等子进程执行完毕后关闭进程池
" B5 U/ {0 q9 I( C6 e3 S! h0 H" a! l. H' C7 Z9 o
- + R6 F4 P' |+ |1 h
3 V. w. t& ~. E" ^7 {6 W% W # time.sleep(2)
/ M0 {2 [, h# H! T2 E0 V* \ l4 t& Y2 f$ }; |9 \- F, @- v8 O
- " r: e/ e" Q/ W! _ G) j
. |3 N* S6 u+ S2 ~7 W; Q4 A
# p.terminate() # 立刻关闭进程池
+ U P5 f" M, X4 ]9 Z/ Q/ {, C& A, D" i6 M, O- L3 P
5 V% X9 r" A. _9 l P! y& s7 C9 a! ^% L9 [! r
p.join()5 g- u; n; t o' E6 a4 c, S9 ]/ A
* H7 Y8 z( E$ ?. d3 J) l7 g: H& m5 Z+ C( c0 w
0 G6 L* P/ S; I* ~* D请继续关注我
9 ]8 Z0 D* ]9 C9 }- L: ?' }" i. s
. v) B1 p4 T {2 R+ y; P
| 欢迎光临 数学建模社区-数学中国 (http://www.madio.net/) |
Powered by Discuz! X2.5 |