% o) T& e& y5 N$ [
爬虫(七十)多进程multiprocess(六十一)
' x8 K- P' |* K0 GPython中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。
% z8 R# {/ N: X" B* r/ t* W5 E4 P( K0 T( \
import os
& T2 v' s6 ], P( d2 J, F2 S- U5 T3 q" y3 j7 y% b3 k
- " N7 V. K8 d$ A7 w
0 Y1 d- B( n/ J& d. O7 s
import multiprocessing( T% L# c7 h2 d' n6 K" o
( H5 {$ O$ i$ ]3 M
- 2 y- k+ J/ t% Z, s- {0 t- ^6 O ]" A
4 b7 u! O+ T% L
& c. n; p5 H0 B4 p. {! I, m. L6 k1 o s* g+ i
- 6 J# ?8 i1 H! B& b- G/ [& T
6 Y, T; O" S8 L
def foo(i):) A. X6 C% l' H j( k) G: L/ f
8 c/ i- R* r, [% \ W' \7 b ~$ l& p
- 8 e4 }5 Z4 q# v0 @" U7 O/ |& ^) }% U
/ `3 d7 z" O0 K
# 同样的参数传递方法9 ^9 w: M1 V. r* h
& i& } ~! I9 N2 q0 N
4 X8 l$ N9 @- f {1 R3 T2 b, Y/ a- o9 R H. A
print("这里是 ", multiprocessing.current_process().name)' i+ P P$ |! R2 @1 M0 q
, L" H- V w; |9 N; c- i9 I: d S; o
- ; k1 F% J/ L/ u6 L1 z5 m7 X( I
8 k5 I; p' D: M; \& ^ ?0 X
print('模块名称:', __name__)
5 O$ E4 S4 E! U; `! R
e5 s+ Q0 b7 Q% N/ O: P
: w& G* D2 _+ T; ~5 _/ c% H* f' ^$ J* }- p# }. a: p9 b
print('父进程 id:', os.getppid()) # 获取父进程id: W/ F* g: N' M% N6 [
4 L3 h1 \! w% I- ; U! H4 v V" m, G- ~
9 V, Z! a# S) r: Y
print('当前子进程 id:', os.getpid()) # 获取自己的进程id
1 d6 l6 e- A0 U% R
* t9 g$ {5 D$ y3 \" {, L1 ?
' o6 [! x' ]/ P' O; r+ X# k$ w5 f% i3 V+ D( I. g) h
print('------------------------')
( D7 x# |2 n, P/ o' R3 I0 f0 X
- u5 ]0 N$ q d6 a6 R- 2 T% f q5 t4 y z
8 g: m0 Z3 ?, \$ r) R# E5 T% U e/ M. u9 ~5 i7 Q4 b3 N9 u
/ g6 M- |: t6 a$ N) G - - h# g7 q( B0 z: o$ Y
( c6 }+ T1 i6 J7 f8 K
if __name__ == '__main__':# q/ w$ H0 C7 ]/ Q) f3 P
! H3 U- N( I" O! d - + A8 P7 i0 E* R- G- F
; J# J ~' S5 G( }1 I
e; p" v4 q& y$ b/ Q- J) d/ X: O2 h6 \$ U" H1 r4 c
4 \) a2 ?! u2 P5 j+ m0 P; ]! w
% @* J; I4 D- h, x& ?; w7 o& j for i in range(5):
6 a# k1 }$ A0 ]- |0 x" ^0 F
, n7 Z1 d- {$ M$ i S
4 J, U2 E2 H: L2 m! {
7 I- d% U+ I6 X p = multiprocessing.Process(target=foo, args=(i,))
+ L& K6 X! v5 {! n% f7 m& F% n
* ]3 j" ~; `, { m
- B! W+ j& @4 i( x% k r' H7 u- V. k
p.start()1 Q+ E6 M' q5 J0 j/ z: c
2 w0 ~! W& h. t3 w9 B& ] x. g: P1 m7 ]3 K3 a- W2 S
运行结果: - % m; E8 q2 s: b/ k1 [) p1 [" d
' _, f1 G; Y/ l4 f这里是 Process-26 J3 E0 c' r6 ?
" G _! s, A) A( y3 `1 ]( g5 D
# j" K1 I6 r" ~+ t) b, Q1 [: D4 W! n# j/ e
模块名称: __mp_main__
9 @" ^' [- i9 z1 n! `6 t, n" a6 D% P$ g: T3 N: |# f- _
" Z M* c0 T) a0 p4 ]- O1 A+ h- W3 B4 @( l2 i& y4 Q
父进程 id: 880* J* \1 N' P+ Q0 s8 O
3 k$ C( i: M! w. j" g O7 l; y
; J* s' m4 ^4 I5 B5 N1 I- f
2 H: U$ A" _% V& s" W当前子进程 id: 5260: f+ B5 Z9 ]+ E9 B! G$ J* n. V- F
I4 F7 d I: ]
- ( \' v5 O* i8 i. t$ r4 k
- r+ [" k2 }% A8 p; _: z: k. r
--------------
" w) U/ I' D) S
( S" K4 Y; `9 x( p& @ - : G) K# k9 Y6 q. y( M6 S1 x
; K" r0 F3 Z) ^9 y
这里是 Process-3
' d. w6 i/ c, N, i; u
5 L) X# A/ }1 C( G0 H
* H; M2 @8 t$ y: _9 k5 X6 i9 ~' R6 Z& g* d8 B1 `
模块名称: __mp_main__' i& F( c/ ]: J2 S. x
$ t2 v, @' K" p: d
; ^ c* g6 [. Z5 Z3 N3 N! M: \ k/ P% _& g: d. I2 \6 {/ Q
父进程 id: 880
& d2 H8 ^% [) a( N3 V5 k/ e5 U+ q6 |. h s) t
9 K1 d2 M; [ C, W9 y$ o# s0 C9 k: t
当前子进程 id: 49122 }; l9 b) A; p8 h. C
3 \! E6 m7 U; M, f6 a3 S& A) _
% ^& z: ?% C2 ]" w1 c: o
S( z1 U0 N2 n1 ]8 i. w% t--------------5 k, p& r: j& v( U7 O- R0 a/ e
5 d- b$ O) q! t6 j
- . d5 T2 ~" N& l/ O
) w) Q) P6 s% q& J6 h: i9 Z
这里是 Process-4
5 S" r5 ]! y2 @) I( g6 f4 M
. L+ i+ \ ?3 x7 P8 G9 F
) ^$ _/ k% y" T$ y4 F5 D5 `& `8 h: H6 }3 B' F, j
模块名称: __mp_main__
$ n+ P- W9 y' c" R/ P1 S. I
) ]0 J! O0 V" W" ~* C4 y( \4 O, p- ; x; _% m& x) A
1 ^* A d |' w# m- ^ y3 Z父进程 id: 880
3 _& N0 o( w5 C/ m1 V2 W+ w' [8 U: ?5 u! G | Q
% C2 {0 o6 ^/ s7 ]/ @7 `) E& C: z8 a8 L6 M
当前子进程 id: 51768 {6 x3 ^/ E6 J# D. m- U7 {0 y
, ^! z+ R# r0 G
/ l' a+ z* l) i, Q* L9 f5 y4 w
1 R* h1 M% y8 }--------------' b8 A0 ~$ o8 `2 `4 L
% X- {' c4 Y, s, k" ?' k7 ]) o$ ]; b
- + y' ]& B" m5 M* r7 k2 u1 _
8 J: G* F) F. V! [' n这里是 Process-1
! _7 R# ?9 u0 |7 I6 a2 t, O# g3 E6 `- B1 l% E3 ]$ J! R# r
- ( }6 m/ h# E- X: w1 x7 W
3 |- t- M: f- `
模块名称: __mp_main__
7 F. I+ k! t7 c& T/ t: v$ s- I. i" X& E1 Z$ c: [: l
- % K" i9 F8 b( \0 F! q5 U
4 S/ Z. Z2 B6 E. h* N
父进程 id: 880
1 f; P: w* K- P6 E* Y9 G2 l6 v' |' K. c+ B
6 {" ~: O- o/ s$ o5 z0 a& ~
+ }, a, `, V% M( L0 s( l" k当前子进程 id: 5380. g/ @1 r+ J6 b! q
1 ]9 [1 O8 N/ ]" E' {- 8 R" P9 X9 Z5 C( N( i1 I& ]: i
: ~/ X u; D+ Z% G--------------( y/ v! @- \4 ] T0 ?
# \! m( s5 I' H2 E0 m a6 y
2 e3 z- F+ h% J" ]- O
, U9 F W; q9 {% E2 f这里是 Process-5
9 x* z' e' i6 {) \* F" A K, n3 j2 M1 a0 j* {# F4 T E
1 l: e" L7 z9 e% i& n3 {! \2 \* v. \, o8 w2 }" A
模块名称: __mp_main__
$ Z; w. c% w3 ]0 x+ `1 J
& I7 t' S. @; g2 q- j E9 O1 F0 {. b% Q- , `: Y8 |& @! A8 n0 Q
# Z) p$ E. C1 ^6 D
父进程 id: 880
! c6 C4 R& C0 a) {2 d& r( N
* P+ o4 N1 |5 `' N7 ] @
/ W! m9 f! g2 p8 z: K( O3 Y! v- R( I
当前子进程 id: 3520
: F# @, r& p+ ~- U, _0 ?
' M0 e: h2 p7 o! u
; [- H( }: h/ f% p* V# W/ w% d& {3 d9 t
--------------; Y( v: ?& j4 n
W: z9 o, k% e q- n' d- q! D: X! S* b K
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享: - " U: F/ J" t/ U
" S0 T8 Z" z% S6 C, G" f
from multiprocessing import Process
P6 N' {) `3 j1 [- u- z% g+ b. K: v4 |& k9 O
- / t% R' b, Q& e3 _( U2 z
2 @# x' T( g: V9 |
9 r3 ~0 b7 J3 y8 q
$ A& N/ a" G/ M8 ?' ~
* g0 h2 N0 l2 ~: A t7 I6 H
H& [/ Y0 m8 R& B7 |lis = []
w) _; K% _) O2 v) o7 C, Y. e3 J1 v
" p" |; _* i( x# Z6 U
( ?1 d$ K) l6 N0 z7 r$ U7 s" H& L! D2 s+ f
' O; T6 K7 `& v
- , m' u! ]" f [1 _5 H
8 R' ~, v: A9 d. u' r
def foo(i):
. } E D6 P7 Q7 K, J7 K
2 [4 Z" U) `5 G& J1 ^6 Q6 w/ x& H - J! j m# F) O f$ ^6 ]
+ X$ U) T+ X1 w* d# B4 x% ` lis.append(i)
W2 j9 W1 e) E% q* K) E/ A& o% L4 X+ }4 V
2 \) ~& Y' d L, N8 E# @3 n# O6 U8 Y2 L" ? F! Z% J
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis)): U$ T; F. I" O# Z$ d3 M; b
% a) I) I3 A8 x+ {* k- A6 b2 m
' h% p# \ ~# ]+ ?" B, h! F, X& a4 a4 D$ j/ l3 B8 n6 J
6 X9 u; } [% |( y( m* a( \
2 X% y1 c E0 q7 `
$ c! G9 R/ h- ^7 V) [1 c3 g9 I' N; q$ x, F/ I! T* e2 ^
if __name__ == '__main__':* D" L6 y; m( f( l$ q! u) I
8 Q0 p' O- P$ H
- `9 ]+ J4 h" L, _) R' X& D% Z8 j* u( r- T+ R5 D- Z( h5 k
for i in range(5):, G9 e0 X8 {) @# p- v4 ?
5 C3 c+ w7 c, Q2 n0 _
; [. m W q# Y8 T1 D9 P/ ~
; E& o" O& L, K/ L8 A p = Process(target=foo, args=(i,)): q. O, _% @1 k9 E4 B- w( @
9 l1 v% C( |& d8 I/ g) I
- # L# O" }/ l9 B# ^( j
5 O. @0 Z. i# _: Z2 M& z, d p.start()3 _1 o5 p5 M; D( ?" `
& Z: b& L8 E( T# F4 L# v5 E - / q) g6 Q. o2 z0 e% ]# G
: z: c4 ^# v( K0 f$ A5 x( X5 J2 z
print("The end of list_1:", lis)0 h/ e. H: d0 A w
! H/ w- G# `7 X) b* ?- l: T# W& M" `0 n9 `
运行结果: - : o6 {, c4 d1 ~: l3 V! t z
, _' T. _ @9 f# {: I
The end of list_1: []. l0 u5 ] t1 K& C+ M9 |4 |6 J
/ h. C9 H" V2 b3 O
* u# @% G% W2 o
2 y3 f2 b) G0 I% lThis is Process 2 and lis is [2] and lis.address is 40356744# ]9 A# m6 w% L8 R% D5 _& E+ v( {+ c
6 w& z3 i- e- O1 L1 q
- 4 z) N* b/ t: N" B/ g0 Y, e
) H7 _* V5 U7 p
This is Process 1 and lis is [1] and lis.address is 40291208
& \+ G+ q" S" g7 _- |8 [' P# ^& X# @$ i( `, p0 B
' x6 f2 z" M" }6 F, V
6 Z3 f3 g" N& J: xThis is Process 0 and lis is [0] and lis.address is 402912085 S+ K2 S6 |7 Y+ Z
8 S/ L0 j2 ~% ]- _; H- 7 \# T6 z; t; E& U
% Z% p- G6 W1 Z$ Z* U* @5 SThis is Process 3 and lis is [3] and lis.address is 40225672# ]* w' z, v& u4 S
5 k, _7 n0 x0 G ?3 N
0 T( P. F1 h3 d/ @1 w6 V
3 ^) T8 m7 ?5 Y( L$ LThis is Process 4 and lis is [4] and lis.address is 40291208+ Y8 o, I+ u9 a! k' q
# M. g- v( {9 n+ M
# @0 h' w# R* a; U; H
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系: - 5 |# b3 T# h) z: V' z
6 O* t/ l k2 m2 n. G'c': ctypes.c_char, 'u': ctypes.c_wchar,# Z" P, t# p0 q( J
) e: v. H8 j, N: [+ p
- ! [, f [* [7 h
, C" b# i6 q( \# ]- S5 i'b': ctypes.c_byte, 'B': ctypes.c_ubyte,9 L3 a3 i9 R" J# s" z
+ h9 H3 P# D; ]) E; D0 l
4 o0 ]3 e0 i7 v$ r5 `+ L
- Y0 r" Z/ B p1 Y4 ?'h': ctypes.c_short, 'H': ctypes.c_ushort,5 Q% y7 o8 {5 w! n" y+ @. |. G
. ^1 |1 _% {3 N) {* i" R# x: b
+ j+ P5 A9 P# d- Y4 u: O5 _# s& B5 y' t- C, j
'i': ctypes.c_int, 'I': ctypes.c_uint,& Y5 P. r' `# m, Y% S. `
^: c1 ]& R* c a9 X- - w" Y# `: d/ ^4 C) X/ o
. w/ w' k3 ~: t' G, o2 D
'l': ctypes.c_long, 'L': ctypes.c_ulong,7 z# s; n" {% d( D: v6 q' ^3 E; G
3 N2 J/ ^, P$ N8 a' R6 T/ h! Y
# X ^% X& y( y$ T" F& l6 u
; z, b \1 ~& Y'f': ctypes.c_float, 'd': ctypes.c_double- O7 {& e. I7 W
' b8 m! u$ g: g% L/ f0 ?2 ]( e/ a" r3 Q1 Z" f% V& y6 [+ Z
看下面的例子:
3 R$ Y% W+ Z/ P% V
* @3 ` E" j$ R/ }* ~# gfrom multiprocessing import Process, f3 ]- r: p# H7 P' Y: s: k0 o
% J' i7 n7 g6 b8 M& J P' Z9 g2 d
# E7 ?3 ?9 {9 l- v1 u4 @; o
) C2 ]; e( k* C i7 M. s% |from multiprocessing import Array+ u0 g. ^- t) F
6 E" a8 ? F1 |. [( R$ s1 O
: F3 E) [4 P; g( T3 Z$ Z$ n, Z; ^/ K
9 j: R5 l" ^% C, q! U0 X
+ w3 O! E9 k8 @+ C* Q- K
- $ p6 e# i6 X' q( K+ H
& d' [7 b' x" E: p n% w! \& _def func(i,temp): s0 y" @+ s7 Y) O! E. F" `+ R1 F$ d
7 e" N q5 {5 g P/ d! p
( y, ` h0 Q7 s! j5 p; _) N- L2 N. }* E: c* x+ n
temp[0] += 100
/ H4 C/ {* M. e N5 s- }& t# q
; G7 A3 ]: j6 p# j6 |4 {' \2 h. B
1 `7 F# o1 E$ q! @5 p" v) l
, p$ {" ] E/ G& V* \ print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
% ]! W+ p0 W0 l5 s" y* U1 B3 m0 w5 u+ {6 u' v2 f, p: @
- 2 g( g9 r( t, ]2 o% N
; Z4 n1 m: P' d6 L( C! M% @
7 O. f" ^, O2 n, s- N( O. x7 r6 C: {' l; ?' z, A
/ U( A+ \; }- l" v, x T
: [1 N, O0 T1 Q7 Gif __name__ == '__main__':
! |2 ^' M' \' i# W/ a
7 h3 B" j/ G; V$ E8 N
6 T+ K! E- l, b4 x) J8 Q3 I+ s3 c/ k1 G0 \, O
temp = Array('i', [1, 2, 3, 4])
* ` m9 G# h$ X9 i X+ R5 X% n' M1 w; B+ h0 D
) Q) r' r' G4 C& o! Z* i2 R# ^& I; y* p& A; Q
for i in range(10):
& s, E5 E8 n! [
( q3 ?! H) D% \8 D( t4 O- , i7 w, V) e9 h: k
7 M: E6 m1 p3 n4 B* X, q/ p p = Process(target=func, args=(i, temp))
$ E8 d. r! c5 w5 ]- m" h; ~! L) r! f, V; p2 N0 j
- 4 j" n% Z' s5 o! U" p# Q
( V! I8 W/ u- d' G: t' h# T p.start()/ n0 @& C/ Y( B5 o
- d7 v1 {0 q q/ N4 A% V+ e$ v5 b
运行结果: - 1 ] \+ u* Z; s: L Y3 V( Z' R
- ^% B1 @/ x! G1 N) m" N7 ?8 J& N
进程2 修改数组第一个元素后-----> 101
& A6 q- u" t$ c- v& o
" ]9 [* p1 j0 ]/ M5 J, C - ; V! r: K3 e8 g: x0 E
7 K7 \* m) V' h* j+ w$ Z进程4 修改数组第一个元素后-----> 201
. }# k) V6 \# T% W8 _
, q8 Z4 W* B! w( X# o
& |) l% Q* [# s' L8 R5 H6 t7 e
7 g- r! m4 ~- F5 T) v6 t进程5 修改数组第一个元素后-----> 3010 g u/ F- C5 c" v0 \4 X4 ^ R
$ D& k$ c& P6 y3 O; V6 z
3 V* {5 Y! z a+ \: @3 u, w& f% m u: Q9 f+ N
进程3 修改数组第一个元素后-----> 401
* }' u5 f( O4 o% i1 e0 R; u+ r/ D+ z* R
7 z5 Q4 C P. c0 T7 E# H3 E! N: ^$ m$ v# W- J# ]2 Y$ s- X
进程1 修改数组第一个元素后-----> 5014 P1 w* x% Z) e8 u8 ]! X4 s
" o7 v2 v3 P$ ~# O- G& x5 v* W" W; s% E- ; ^ M7 Z+ O7 y3 ?
% s& _! g# C8 o# `
进程6 修改数组第一个元素后-----> 601
+ T8 h% R0 C9 H" y5 d% D* H. h4 _+ | [! q1 V) `1 `
- & U9 l$ [' i7 L' L* E; g
& z; p0 i8 f: y4 D, d: [ P进程9 修改数组第一个元素后-----> 701
# N' O& k, [5 P1 [! l" H! z4 s% g0 P& E- e' s6 n6 {2 j0 }
- O+ a+ M3 S7 x2 J" j+ G# I
3 Q' u! ?4 U0 @' w进程8 修改数组第一个元素后-----> 801 R$ E- K8 f' ^
0 o. _. j- d3 m# D$ `' `( i
- , ^! o; N# A" s$ t3 T# R f; F
* c! [8 P3 ]% w" Y4 q9 Y
进程0 修改数组第一个元素后-----> 901
* l, y$ Z! x9 y& K4 T; F$ P) o3 k& C, U
- % v3 a. u( i- E, m( i n
4 |1 n, C$ s( |1 [$ \& V
进程7 修改数组第一个元素后-----> 1001
3 k. b. b/ ?% ~! Z+ _3 Q7 A
' h/ d, j3 V+ t% R6 P) N5 X" k0 s3 |' Q; X5 D: P9 N
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。 - - O5 T) p8 ^% i( d( K/ ^; D
. O @' Y. `6 f% Lfrom multiprocessing import Process6 h8 E' @0 `% p" L0 [% W) ]7 H* q
2 a2 b! t; U' T t
- W1 O4 ^$ H: L q! H) |, l$ R; E: ? @
from multiprocessing import Manager0 X4 s" O- {9 _: v( M
8 r! v3 ~2 g* w# q4 J
! f- W" [) p" D1 v: i
: O7 N" S7 I) I v) w% G/ j& M1 H3 {; }" {. L
7 t/ P; _# r" e, n# f6 W
% `& `" o0 O/ m( R: d( D
* v3 |6 l8 U6 l# N# m5 x' xdef func(i, dic):! v% h! ^ M& d8 g6 C+ j4 z
/ E9 H* K4 b" K) S5 S) [) B
- : d/ n$ {. C; q
. N9 i6 \% S. E9 z K dic["num"] = 100+i6 e; e8 ?1 A) ~9 Y
, f0 |# B7 ~9 r/ j" ]$ {
- 8 v6 O( t% F" J
5 R/ ^1 V* y2 J% n1 T, h# l4 b3 e; Y Z6 C1 i
print(dic.items())5 \7 {1 G) I( e( C
) g5 A) Q! l Q7 B% B% R - ) J2 ^( C5 g4 t' Q: O; A
( e6 m5 b9 q1 n: M' r
$ I8 b! x$ t5 d+ A6 e: k/ t1 @4 v, r- }/ E7 L8 C
4 U/ `: X5 W1 C+ B8 N" D* b: t$ B# d0 U% J
if __name__ == '__main__':
5 J/ r$ C6 l7 d. V6 q9 t$ d# x+ b+ T! E! c2 B. V* t) v' @; J
- l' F8 Q! b) Q/ U0 W* |* H: L; b, {/ X7 A
dic = Manager().dict(). U* l* @! t% [# p
8 x# ]# F# t% x& r
- + M$ `- Y/ _& ^2 b7 x1 l* K
; C' _* |: \% n
for i in range(10):
- d) L! G' d3 N" F e7 y K6 s$ S
7 ?, V) |8 ^/ b) c1 |' A) l' O) r% h, m7 o4 a- D
p = Process(target=func, args=(i, dic))
1 n( j; f4 P3 s2 _% L& [" E7 n( G: ?' i k& q
1 K( \& U8 Y; F( R7 Z7 C8 P8 C8 B6 Q$ u. ?; u
p.start()2 h. }: O( r; Y* k. Y. J
' D) V0 W8 e$ e O- 2 `1 ]& ~& p5 c2 t
7 q& U3 ?1 |/ C& k0 o
p.join()
' X+ f7 @9 E/ M/ }0 A" ~4 I9 S+ x
3 {/ J) E S0 t2 }& @
& o2 y8 X: ~3 ~0 o8 ?
运行结果:
% |, E( `9 ~1 {4 c' }
4 S+ t. e' f E. t2 U6 z[('num', 100)]. m/ E# ^. m: M8 G, D
6 b; u0 p5 o- X! m6 n
3 c2 j" u, I5 `. V' n& C
7 i2 O( o# \1 o ]- b+ ?[('num', 101)]" m+ m$ j/ V' r) d/ T( }' m" W
. a" V# W. ^# M3 T1 Y3 o
. r" x' P2 I7 x2 l) a2 d( M6 v/ _1 B/ {( _5 c8 V
[('num', 102)]
Z2 T- {2 U+ T3 n- h
- h% {# P7 X: D4 X- + g4 V/ K) ^+ l% q
& f ^7 `( \9 {/ c7 G6 @% r[('num', 103)]
. X9 s" ]6 x; d) P: B) O7 t+ h* s! ` o" x6 ~; O
$ K: Y4 `; L A7 K4 L( t; {- T. V: p3 _) j' g
[('num', 104)]9 t3 X/ Q9 w) Z
$ w- w+ F4 O8 a1 T/ A8 g) M+ z- " q `! ]3 M8 q; j) o
" ~ T7 S1 ?7 b: |' ?
[('num', 105)]
/ N# g: ]. W) c M5 @. o" ^
% p0 h V# [* ?& q! M - ) ]: `, x5 U- ^2 Y
7 o% @/ F2 m J# d4 Y$ z0 J3 T[('num', 106)]6 E0 S2 g& t: m4 W- x
2 m x& u0 u$ e9 K3 M - - C( g; U0 M3 B* s' Q
: S' @6 U6 K0 l" g! z8 ^' l3 j7 }[('num', 107)]+ |/ U' a0 Y1 s/ B* @6 C
1 S6 c6 S: v' G% o+ [% }
- ! W% o9 j5 e8 T/ o' g+ U3 H' F" D+ h
8 N( L! a1 ]& q7 ^& o: l[('num', 108)]
. g8 o) h9 \- i% Y* B/ k
2 V, q* j$ n( q7 f
5 q% K3 F3 Y6 d! }: ^& @
+ l; Z9 T; r! N3 x5 l& m; ~[('num', 109)]8 z* Y& b! }% p- D0 t; E7 n/ k
$ i5 `4 N3 q# e! k5 u
# a+ T" k# J- f- r" N6 P 1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示: - ' s/ o% T6 W5 K
3 ?: Q0 }3 }- s# i$ T" Zimport multiprocessing
, B6 a5 i. i. f: G6 g3 g6 f x( s$ |% `1 b6 e7 V
; v& h4 q+ E7 u; Y; q/ B
# O0 E' L' G+ H) a! }from multiprocessing import Process% x6 D$ a" r7 [) s J
7 k3 Q( a; U5 ^: m5 q' _- * V: `. ^+ R! ^5 f9 l: n# |# }
" q. {+ u8 h* m2 z
from multiprocessing import queues
F7 ?; g- y1 U; D4 t9 b+ ?
* ^! _; V; }$ \9 F, ]) M' b5 v
: Z; b& O. Y# h/ x8 Y' ?2 e
6 B6 t4 K& b* {5 S$ o' X
4 @; u4 }5 F; m$ M1 D
2 M) V* L: Y2 t5 F+ M4 z- / f3 [# X H% z3 L; |) M, C
) T M+ J- `% P& ydef func(i, q):
: r9 {6 E/ _! g2 n) \! ?7 Z2 U: G5 k9 T2 F! w+ T- |
0 s; j" Q( W0 K2 t6 ~ e1 `- l
4 a8 e7 O0 ], S- a, L! o2 g3 W ret = q.get()% z+ T! s* t' p! l& ^$ J$ E2 h
1 R$ H8 D8 p' W+ {, G t2 N$ w- 3 j5 G$ V0 m! ~. d# c$ x0 S. J0 a7 g/ B
2 Q3 H3 N6 C, Z s% Z; ^! f, l
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))+ s& }" E* F2 ^: g+ ?
0 k: ?# x7 b; p {# p - 5 E6 V) x$ y% V$ w( S
, P4 D% X- P; a9 ~1 k$ | q.put(i)
7 ?! z/ `. _4 m/ G! a0 O1 N
4 L* W4 ~( \# S* m& l' ]& H - % w# k# N3 Y( x4 K0 r# y: H
3 }7 o) q; v, _; |/ V+ `( i) n% {7 O& v( V! h# F
3 f l3 o- @' H2 O/ }
9 Q! v8 o0 `0 e0 ^7 k" d( ~' F6 J, \; D8 c; i: d
if __name__ == "__main__":
2 f+ {, s5 K3 L* c( \4 l( R" \$ X- g/ o: L# V# u$ Z R- s7 N
) D" _9 O, z6 n% c+ s3 e4 D8 W: T4 |. O3 K6 |' V* x
lis = queues.Queue(20, ctx=multiprocessing)! B& p2 L) C, ?, v% k% F" m; C( J
$ @0 {5 L3 K& Z( p* Q5 M
* s+ @0 m/ _( a- a: n- O ]; _! C' \
" S6 U" A( h5 D! x8 G3 a+ U# [ lis.put(0). C6 b) w4 w$ N7 P! Q6 d
2 @' D0 a2 V n+ X3 d( U
- , P# n2 ~" |- O# | r
0 ^; j a! H; r; G" B i. B for i in range(10):( X5 m; G1 y: [( I9 p
- K$ n8 w5 @& C6 M: A
% A6 ]6 D- |: ^0 V0 j7 V
1 }! n: G# V0 L1 h6 e, c7 `9 t; t p = Process(target=func, args=(i, lis,))
& @) r. [* M2 L# }! j W
6 \; w( [8 e+ G! k- Q- 0 m5 Y/ y% R! u
- W' Z& @1 s! W: z
p.start()
2 b; [5 y# G$ y$ n) A8 I% ^0 I
& |* `; E7 t" k) S, h' G
. N( d8 N6 \' S% Z" P5 |) b$ a
运行结果: - # G1 D% r+ }" g, C$ d3 d
8 t' a3 O6 V7 C$ f6 D# C" M, f进程1从队列里获取了一个0,然后又向队列里放入了一个1 s! `8 s5 s" y5 O3 t7 W4 i
5 F7 r, o& _; T1 i+ r% P, Q - ' } P e8 Z% h [( U+ `. }( m
( `/ f! H$ Z5 c7 w& u- Y, a- [4 S进程4从队列里获取了一个1,然后又向队列里放入了一个4
z9 t$ L* X, b2 K
3 K1 n ]1 _& H+ x, G" \- U5 m" O& G
! l2 b, i, v! w9 w
" \/ N# U+ }2 ?3 f7 J进程2从队列里获取了一个4,然后又向队列里放入了一个27 Z% r% D/ [' n( M
# X- J, v& B$ o4 K/ \5 W1 Z; [- 2 i. [8 N' X* B( w; f. |* Q: V2 I/ U
/ O, w* b1 G& Q7 U7 u
进程6从队列里获取了一个2,然后又向队列里放入了一个6
1 w2 V/ F$ [/ g- b* _6 O
( e& W- F7 N' Q - ) c A1 o S6 I
! q4 x. L& V. H u! l) j% i
进程0从队列里获取了一个6,然后又向队列里放入了一个01 Y- |1 w( [: _& @) e% G
- K4 l% c$ }5 l7 Q5 P; J
4 C ^0 [7 J+ `9 N2 K y7 R
2 E' P' A# h- W+ \; e0 U% Y进程5从队列里获取了一个0,然后又向队列里放入了一个5
I" B( \5 t9 Z+ w& Y1 h3 C: W# z
+ E# M$ p' s8 P$ C5 Q3 v8 v
1 o: `7 G. o5 P3 Z" z/ m
; q) b8 d" `* x; \进程9从队列里获取了一个5,然后又向队列里放入了一个9& m9 u: l; E0 t" I& @
5 f3 b+ t# M* s
" L" y3 M# g6 P; M# r3 T* n- @; m- l! C, n. B+ w: f8 H
进程7从队列里获取了一个9,然后又向队列里放入了一个7
: s* C, r5 B- o8 [1 Q! B: D' p8 [+ K* k2 [
- 8 A0 u& R E- W- j0 B7 p& \1 v
' u. j, |/ m9 G/ j8 M* U进程3从队列里获取了一个7,然后又向队列里放入了一个3
8 n$ i' s' K4 Z% g- Z' ?: d
* h3 v4 A9 a: K( Y8 |
, o+ j R2 W$ w9 m
5 k* w: z0 ^7 |) L0 k! s进程8从队列里获取了一个3,然后又向队列里放入了一个85 Z7 @2 \9 B- c' d0 f6 M
- x% u( n& j* u5 t
; o/ Y! C! w% E3 @
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
' k3 s! {: @! q0 M% \ z$ T A
- p! P, I: b1 P, g, n! X, Jfrom multiprocessing import Process) J1 v4 |- w/ @& ?( ?' [9 x
( _0 |( Q& ]/ l' m) ?2 ~
3 a7 T; \! U6 n d" k! T6 h. k3 D) |7 a! `" ?/ J7 ^
from multiprocessing import Array5 w2 V/ z' b( _% j) L( k
% i7 {" L8 b7 r& {$ k
' K* M& |- f$ h: H7 T% o$ D
5 _3 u3 R( i; l4 A' V8 ] afrom multiprocessing import RLock, Lock, Event, Condition, Semaphore: G0 [1 o/ z% H
2 J4 W( }3 f( c M8 b2 m* s
- + A+ ^9 D- S2 ~* A" p
' q G- U: y$ C0 r& A( {6 Ximport time5 S1 F( ^) N8 y2 C
* _4 ^# l2 e# Z6 d3 y - . j. {: b! f9 p& ~
; T$ b; M5 e9 x/ q2 s( x2 W6 q, C4 Q+ b. Z' h
: `( C5 b8 ]( ? t - 5 f# B3 J) j) a' y# }" ?" U
" s8 ~6 w z! L; _9 }, r1 f
def func(i,lis,lc):& K# |8 q5 l# h4 Y
7 s0 e: O9 B" {# h+ V* D - 3 L1 Z5 O6 {: } u/ e8 p& N
2 G1 I. B ?: m" \
lc.acquire()
- O4 {; i* x3 N3 e; j4 Z8 D, f5 E: Y2 x3 Y9 u O- ^2 I. j
- # N& ]% H- D0 p, f0 r( ]
" k( H# x: b5 ~- v: T
lis[0] = lis[0] - 14 V& U1 ^1 l6 e# e6 e/ Q$ k# n
( K4 z2 b' O# m' O! ]
4 z0 r4 P2 ]* V* V5 ~ o# Q0 B; }9 \9 m
time.sleep(1)* Z9 e+ Q9 D% Y$ I; r" C4 m
) p/ O4 M+ l; n4 C
, L' `0 p/ Q3 e% q: D4 N# `5 T1 P* ]: u4 h1 } s( p C
print('say hi', lis[0])& @# I9 K% } d" `
' E0 k2 w* ?/ {. Q. T
6 ]% u! s' G Y
1 W0 |3 C% N$ z lc.release() \2 e8 u1 V$ K
0 `9 c& S2 o$ G v4 |' C
- % ~2 q( p4 I9 F: x
( m3 P. y* D; K: W
8 @5 f& M- n3 V. V
0 I' o6 r8 T/ b: K2 b* ^9 R
7 `/ E& E5 j2 @' O4 { d- a& [4 j! k5 c
if __name__ == "__main__":
& x7 Y7 w# u* x6 z5 }+ R/ ?" `* W8 U- x, J4 r. T6 z. O
- ^( y' ^6 O- x' Y" H: U
) e9 e9 N, `. E
array = Array('i', 1)0 a: r0 ]# \ j# {+ f
0 D5 O4 |+ e! b% ?8 P
/ c7 F" p: y3 g/ K) q
4 N' w8 ^0 b7 O array[0] = 10
9 A: Z5 s8 d+ c9 |& \$ @ u" c* X o* x$ j) q6 N9 V( b2 w
- ! |8 N( T0 k' c, X2 [9 g( G. U7 C2 O
4 N4 t7 ?2 t. N& u9 F
lock = RLock()" { e) d0 ]2 v
5 x" u L6 D$ j* y I' Q8 K' T
; H0 w! r) E) ]
' Z& [5 N5 U0 Z7 T% d for i in range(10):
* ^& |6 C8 c" c6 S% G
8 C3 S% l- F' Q/ Q$ h7 O9 i D
* q( Z1 [) @, Z
& \9 j, X- O$ X7 |# n! h$ X# q, x: r! R4 P p = Process(target=func, args=(i, array, lock))
# ~( Y3 a2 Q6 K* ~4 d
6 C! n$ `- H2 J- 1 W1 I, x4 R# P: `- P* }% n/ U6 Z3 k
) E/ r0 v; H; Q
p.start()
. _) @6 A4 k) f& a: ~. F0 W! [5 h- P4 g* C
6 I+ f* v0 V% F" b: Y
运行结果: - 4 w) y) X; Q% R* R* t
5 n9 P: `7 n ?2 \ y8 p
say hi 9
# ?0 Z8 B; q$ p
, D' u5 |" \! o7 K6 a
, u- z* p0 [: P7 b1 |3 {) {2 x+ N' t# p+ n% j5 Z& E) c
say hi 84 m R8 o% o! {7 y$ F
+ _: ~" C9 V# A
( L6 L( P0 m; j# Y1 h m; b# k( ^& }& i
say hi 78 {( s' V% K9 i
1 F& j% R4 q# T: n' D! Q- ]- # |# {' S9 q: y0 I* ?% R
$ M0 l4 n; Y9 Csay hi 6
9 u1 c4 s4 n. @$ [; |9 j8 \% [0 ` g4 A* \" v
- + e9 M2 V1 A# e6 x! z8 Q( V, i$ ]
* U& F* b( ?: p# q, D1 V5 C8 Q- Jsay hi 5
$ x* Y0 [* W4 w7 z- J" {4 W* L4 [/ E+ J% w, u3 S4 f7 J2 U
$ G+ i4 T; z! y) }7 G' m1 n+ i0 Y- a, C A& @0 g& E+ i' ~
say hi 49 M2 r, c1 X; l7 n8 o8 K9 C
. E( f. i- m8 o( Y4 m E- ) O% K9 V4 E+ h6 q9 W
8 Z; |9 v1 T- y3 ]7 W) A. N0 Hsay hi 3
0 T9 G0 t1 `8 N# l5 m. d9 S% E3 m$ C' h. k2 V: |. ~+ b* @
& N$ q4 v' x5 l1 l) S t- u( }4 T
7 S8 N m# C# B1 X; |' [5 i1 p) }say hi 2
, W# a# f% e" Z# M6 U
, E# e7 r7 _" p& n
9 `" P# j6 I* `# p/ H( @
) T) ~5 ~; ^0 e. o" }, Usay hi 1( `2 z% {2 Y$ ^$ N2 O. r6 F
: t, q9 r% n6 }6 |, a) r/ U' D, f
' J3 O! B7 v' B3 I% B- V
6 r9 |# C3 D& x; dsay hi 08 t+ ^) J5 d/ i% A# k+ |
4 G9 c; e1 p" a' D; ` T3 H! n: q+ o# m. ?6 S' b, H: X3 d
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法: - ( l+ E4 x- {: M% z2 r
" c0 X8 l, x" f3 Sfrom multiprocessing import Pool
. L/ E, y9 n9 }' G' J/ q/ y# B
$ ~7 S9 a; p2 w( D* f7 Y% [/ j - 1 K. w5 H1 c* G) \4 @
4 l( W& k, T' @+ K1 ^9 x
import time* V' E5 }/ }: P
% h8 L8 \. H& a7 k8 ^# d
- 7 j# M8 S" _& b- _6 A8 ]
, G' ]4 g" {) [, ^" \* v, u7 U
" p7 r) `% C9 c' x+ K/ ~' g1 S
$ q% r* P7 H* V8 F8 e+ M
8 q: A" t3 R+ ~' v0 F6 y: i% v: E: o" J2 g! G
def func(args):
( Q) a; s( y( T) I
{0 h+ u0 u& U4 s" V* m& _. t7 ?- 8 {) I6 ?5 @# p d/ Q
6 C" T M1 y% K
time.sleep(1)
! K D) } R+ w( i5 K" J5 ?3 v+ f0 w
* F/ t$ z' r F* v2 n4 S) E. q+ n4 w% c& F' P. r& \8 `
print("正在执行进程 ", args)+ g. O# f4 N6 y4 g! Y
- r0 e' F3 t8 e& z
8 N2 `) d2 B9 q
. [8 ~, G+ l9 R
+ V6 @* w. b5 O2 P1 i' n3 M8 F$ W* I7 l- N& F5 O. \0 U
7 z; D6 p4 ?+ B( g: S
, O7 S. c1 x4 Eif __name__ == '__main__':
; b4 b, N+ D! E
$ J: f2 C8 [4 r3 U- ' Q! S- v' ], {2 L9 a! |
/ v( A' G+ e1 V+ J0 E9 o: V+ n! ?- t& i/ H M7 a* k3 a Q
4 \8 t7 C; n% `% }/ T# u1 X
6 A$ [, H" P% {: S9 S
2 \6 L+ a G, V' f+ \ p = Pool(5) # 创建一个包含5个进程的进程池, F# t5 k \, `
4 H$ W3 F3 K8 M% w
- 2 k, d7 N8 U* G) m# k9 I; X
/ S* k) @1 h* G' b: r3 f; x5 U' T5 N/ G. F D. k
% A0 s( u$ t: ?7 ^% j+ ?
* o* R- [3 @1 L$ F6 H2 y3 c) \2 \
0 ~% U$ V% Z) H0 _8 o) _ for i in range(30):
% e0 p% R, V- J, Z s {7 n; c1 f9 x1 p$ ]6 z
- N$ x% Q9 E( k' m+ ^( Y q& S: D; C* |8 W H% M/ r4 m8 Z* V7 q/ }8 m
p.apply_async(func=func, args=(i,))) T" h% y1 _+ r" ?8 q4 q1 j
" T$ t: }- _- R! Y7 A1 d- ) U8 ^9 b4 n) ~0 s
2 t7 s: y( c% b) ^5 |
0 }% l* O9 T% I+ r, a. @
5 n; N$ r. z! h8 H
6 x$ ^: S4 E. Y) W6 Z
) o K- @( M: u, v p.close() # 等子进程执行完毕后关闭进程池+ P+ D7 @) d1 r2 Q' B
( K/ R, u4 G% I9 l- - B1 \, `( r) a
- R2 m& f0 K2 n8 _% @, Y* A2 l+ c
# time.sleep(2); F+ f0 r+ h7 I z! y0 K! T
# P; C" K3 X# O6 Y5 i - 1 ^: u. J& Z" ~8 ]( _
9 a% [6 A( X/ u- [7 ~0 M
# p.terminate() # 立刻关闭进程池
* L4 @( h" f) i. q# u9 Y W( c2 g6 c
- - y% R& X! x( g% R/ I4 I! \
: g! v g, p$ K' h4 f% y D0 Q0 u" \
p.join()
6 W& y# `+ ?6 L- L+ k7 _- N9 Z1 j1 ?, F& Z% J' e
: h0 l x% I3 r$ w( J+ C3 }
5 W, F0 `3 J. q3 n; M请继续关注我
; ~& [5 L4 y" Y1 C8 ?
- j; i7 i0 S$ S9 w& M |