+ D0 V: }8 x: _7 L: }& a
爬虫(七十)多进程multiprocess(六十一)
2 q ^$ |9 } O8 x4 j" T; uPython中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。 - 0 {1 z% o: m1 _" d9 b! X+ N! o
% U; U0 T e9 n( g' o1 |) [
import os$ \. e6 x. h. I$ F" G/ Y- ]* ?
" `, W2 J. I( U$ x2 l
- % R0 U, z1 I# R2 x9 {
~; K0 ^2 y5 g( B# d" X6 Z# v9 Simport multiprocessing
1 r4 h3 I5 n+ u5 K4 O: ^7 {0 T, V$ h. A# `
- 2 n: y9 e3 g, [6 b
, O4 y: J; @7 d z+ Y
1 S: {# W( Q/ J$ y- w
/ p0 u$ S9 K' X
+ B8 p2 X8 I' v5 i& I ?2 j& z$ N3 W; I8 z; R
def foo(i):
% v2 U7 l" F0 p. K) w; j; L7 R( x) B: z. v- ^- A' v
1 z5 y( q5 q- {0 E4 {9 k7 J: @- j6 p# p" m" x5 Y* N
# 同样的参数传递方法" }/ `" e& G. R! e" G+ F' k
: H$ i6 V X3 p4 R A( t+ f- I# g8 [" U- i0 Q, N
6 \' F4 c1 P+ L: p print("这里是 ", multiprocessing.current_process().name)
- K1 l. V5 F: j2 J6 [. g. w4 \/ U% r* g# ~% z4 L. F: i' t
- 9 H6 l4 L$ X% i
$ D% _& Z) s: B
print('模块名称:', __name__) j |% n) s4 m9 d4 W
( h6 Y: Z: a" J4 J: y" S - ; l( I) G4 Q$ H& \, {& y
. U% `6 n5 V# _4 N( A4 f
print('父进程 id:', os.getppid()) # 获取父进程id
+ x% h4 N- H3 v
! d+ [+ G0 U5 f o ~5 I - % S9 i" h6 h' K+ D- r/ O3 [
4 D! ?+ h. ]5 H5 u print('当前子进程 id:', os.getpid()) # 获取自己的进程id! P1 {6 M1 a5 e6 o+ A$ f6 m2 \) u
; ^+ ~7 g/ v# w; {% d3 S' [
2 w( B: n$ }/ q- L
+ b$ ?2 J" A3 V+ a7 } print('------------------------')% d3 Q7 E, O, Q' l
, Z3 Y" B: r; l& `
$ x& j% q3 |0 w) Y) I7 l2 F. | K. L* V5 Z1 C. e$ y
C6 e" q7 V8 i! O; z: G# ~2 m
. T. b7 ^1 t. U8 ?: a6 |$ R8 e
+ T* V% w. h1 s9 N" X) O, x; D: m$ g- i0 L4 ^2 \
if __name__ == '__main__':% Y1 G \1 G1 T# O E
# T# m" x2 m- A. R4 s- 1 i/ P$ I7 W8 _
! \, q: Q' r! O' R) o3 Z7 |! Y; S# M3 \- w( I: j
7 @- x' Q0 _1 D2 [ - ' m; b- s7 d8 \7 \# e
/ S1 b$ e7 V+ K% k3 f+ ?; a
for i in range(5):. E$ Z0 L; c; v8 x. W# H
: ~; f6 I3 \* T/ n - 4 ~9 T% n+ K( n! W) c) Y! B8 t
$ U4 Z7 R: y- J: q. b, J$ N p = multiprocessing.Process(target=foo, args=(i,))
' }/ ]+ X6 |% ~) ^/ p9 F6 h6 m8 f( B5 S$ u8 t$ H
3 t) F. y- K# R
4 U' {' X4 W2 h% s p.start()2 g6 {1 n$ m( _! m" D
6 H) @+ p d" C, c5 _. i0 Y9 N
# G1 A6 B$ q" c9 ?' [
运行结果: - 6 ?8 a+ z. E; d% k, `, S2 ~
# K0 a: m* a7 @# G4 o# U1 C1 t
这里是 Process-26 ]3 k# c# ?- ]. F
- I. X) J# O- y) H% Q, t
- 9 g- H) S) i! k; p0 V
# O. }% w+ E0 Q" `模块名称: __mp_main__6 l* L6 K8 W( Z; Z, @% K
9 {1 j+ R+ ?" F6 Y7 C
# j% @. k- Z" f. S6 Y9 t, U: e5 Q8 d& K, Q7 N- ]# e
父进程 id: 880
8 A+ _% R, [, H0 f \" |5 V [- [( u# B1 w( M9 M- f
- . v- Q2 K+ _# T' ]; Y
. U4 u. Z) b ^, R当前子进程 id: 5260
! s* o2 V" J2 B- V' w
# J8 Q. m+ A2 a1 z+ p7 o - 1 c3 T* x" {7 O1 x; P: G7 A) |
- f4 O/ C: d) Q. }
--------------
* ]# v: \6 _6 I5 j7 U; Y; S- Y' q. o' c0 n. y" a: P
5 j7 c% P# x6 @8 ?, E# m& h, p2 P* m
这里是 Process-3
& N* P1 W( n! F6 q3 z( Q. f
( J, z, Y" X9 Z( M# L5 N: M' H
5 ]1 O; [. l, b
/ r9 g$ q3 {* b& f: P* L0 ~模块名称: __mp_main__
) M0 L1 X( t! z4 e: T
5 {/ E+ O# h+ r8 ^% M- & H1 q5 i* n9 e! z4 Z/ @
M7 R) N7 v/ \
父进程 id: 880
5 a# U$ Z7 D; j5 h1 P+ k' i/ ?* O; U8 G3 K' C6 E
- 6 c7 t) H3 G1 r
* ^) r9 [- A6 B: @, r
当前子进程 id: 4912
: X' M/ f( Q6 p/ S, d* N
" O8 [6 O7 w- ~6 G - 8 ~" e9 H% R* `3 |
) Q0 s! A8 v) X) d. ?--------------7 ]7 R: L( [/ h. X6 ]" t7 B7 e
, s8 L* [2 Z6 }2 S
- 2 {6 ^! I7 e* u" v
$ S e2 x+ H, u; m
这里是 Process-4& e- w/ ]& _# \% d9 z! l4 O
! o& b# @# j* z. c1 |
$ G8 \7 \9 b; _6 J. w9 S. y x) H A- h/ l ~
模块名称: __mp_main__
. {0 B3 M+ e q0 J8 }# q# I7 b
5 z' j: B+ L: C! X5 ]( C Y: W" F$ O/ [$ _
父进程 id: 8802 q( P) \" m6 J- _& Z' B! j
) U0 y( ~+ N4 w# ]: N4 Q
- 7 g# {+ r8 Z8 @, t7 a( p
: R( l6 [& M& {- \# a* \# R: n% m当前子进程 id: 5176% g5 q* A3 H$ [
/ b( m/ x$ v. d* g: U- O! | - G& w* g" J( f) W) R0 W- n$ N
& t n/ s% O: A$ i
--------------) {" b' n) R _# n, h
3 f9 R7 K8 N% g# a) @, I
. |- @) [$ Q$ ^- w) k+ I3 u" F4 n0 f) y7 ]' o# ~: b0 }
这里是 Process-1' t# c7 C4 }- ~) s! J5 d3 w, {
, n$ Y K, A$ C1 \6 a1 x) j
- 8 |7 T p9 o7 q& V' X7 n" [
3 g0 z1 P# l4 z& Z, }& f- q8 K& n
模块名称: __mp_main__
! u" ~, y7 u0 q: d7 c* @1 v; l# H4 D
: O& R, P% K" N. E* H) G
; {- q9 E6 z* w! W+ ~- M. G
; V8 x: q0 @1 h( u2 G+ t; z# G4 j父进程 id: 8800 v' J( s+ X. o7 V9 _$ C
- L( ~( h. {. @% u: x
4 Y- N' S4 u3 v, U" S. P o& [
5 ^( K7 _: A# K& M+ w. ?当前子进程 id: 5380
0 a9 X1 {8 H8 Z R1 C) `! T6 n5 c% r8 z2 _0 i7 N) _: `
8 B+ j/ x1 O* D/ U% G$ [0 ^8 {, I: C: R M# u
--------------
t* L- B) Z% C4 F: w/ I6 Z) l
, r& Y, S$ _8 \& Q9 `
: Y# D5 p: @6 n# c8 I) G
, d9 G$ M" z5 ?5 I这里是 Process-5
5 @$ Y; m; F/ [( P% y" h; R8 Y2 i; j2 f& z L- j2 t2 i) t+ W& W
- ; \; J1 D { E( L7 R# K8 M6 L% @
# {7 T4 [; Z4 K: \模块名称: __mp_main__! p8 y" X3 K {; H% X
0 K* E4 m& b! {# e( w+ R$ J" c- k - 1 N; O0 a3 B/ |3 _+ U" w3 l
) p/ q5 }$ D- ?& \9 U( R. U6 K+ x
父进程 id: 880
3 S$ ?8 o9 o f- d6 K. A, d9 G" _) X# ]' x& e. H" T
. I8 o3 {+ c7 F+ j* \
# I: ^7 D" ]$ `; v. \- v当前子进程 id: 3520
% ?# [' q e7 c) j f/ _6 n7 {% q5 N7 T3 `5 ~5 C
- 0 I+ x( I" o' i9 O @
; L+ L# ~) V' t7 R--------------
% n6 ?# t! @9 _ U _5 N6 t! Y: d! ?6 |4 ^+ y4 P" v$ d% G0 u
2 c' U3 ?# |( k: E* ?. I4 e$ N6 H& b
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享: - 1 W, @( {3 ^' z2 W3 r) r
; E5 B1 g# x' M o. h
from multiprocessing import Process0 w9 t" O( B- H
+ ~8 q! w: d* v6 o' C3 ~2 I
8 {2 w# _% F; N, s+ M I2 ^+ W* s& G& K
2 Q) w9 x9 F* }% U t+ i+ L3 }+ i/ w" z0 n
# Y6 e, n8 H% `
, n0 ?8 j1 t9 p8 q% q( l) ?* ilis = []8 t3 t; b G/ G4 ~ R- u) F$ |. {
; R3 C4 Z# l% c! v
7 M# [/ u8 C. p {. J/ D) F0 E& b
8 x$ a+ v( p, K% o" a
4 @, S% q( s7 U2 F/ J, C/ h: m- 4 h# a6 j% d8 V
+ o! A; L/ Y' o! S+ @2 ~def foo(i):
* n3 b# o( Z$ |* b& g* W- K1 Z" l8 l
- , ^, i$ y! ^! f- j% J! O
$ d( M: X( h" T8 J$ E, c: r+ C2 m
lis.append(i)2 T* o7 y, n9 M5 r6 b5 t4 `
1 M3 A8 m7 I5 m% k5 ]; O" [
- ) }/ E& m- O6 U0 o
3 `$ c8 m) A- L% z- B2 d print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))
& j! |* h# F7 K7 A0 F
( ~$ o4 K; F6 w# Q
- X L8 F4 n' b3 {% X) c5 u
: q) v1 c' o7 O) C' r3 l4 b3 R
8 o" Q* E) Q8 _: Q; R S C
1 ^5 [! E' ~! g' q
" `1 l# g% {2 v5 s, H7 j( I4 T+ {$ g5 J
if __name__ == '__main__':
* u' e8 w0 f9 u: [0 ?; C p! u
5 O5 S+ e7 w8 ?# O- ' g0 ^/ k+ ~* P, I0 j
' Q; Z( }( G X# y7 Q6 @ for i in range(5):" f7 e8 h0 n* A6 g- \! W9 ^
* e/ \! A7 V) Z! @ T$ R9 P
- " K7 l; V8 A$ | g# I* e# b4 D+ M& ~
) j/ x+ _+ y6 F5 \
p = Process(target=foo, args=(i,))# Z# X$ }) i$ k1 [- i9 A, s0 P
* z* r' |% X) ^+ N' X& N
' z* z; E% q& z1 \9 e8 l4 j9 a9 u& t, {) f9 \
p.start()
5 O- E$ U9 q) K
4 S- b5 L. E" j1 h- 8 |! t1 @, K% w3 @6 ~& e2 M6 b; G, A$ j
4 J3 y3 o" \8 I- u# \" K2 `. J print("The end of list_1:", lis)9 a! b' `4 p% s( Z* Z2 @3 S
: G& U3 x1 K0 _$ |$ q
0 K# z2 w6 i* o: m# u0 S: f
运行结果: - . r. C* y, Q. X, ]* b. Q C/ C" m7 v
5 P; f6 R+ Q/ K! x5 R; ~% L0 P+ D5 {The end of list_1: []
4 }( ~" C+ }6 j+ S
6 b `1 q, m* K8 _9 e4 d - 8 x" k; O* `) z t- x. n' c
' b) [* p# n0 U
This is Process 2 and lis is [2] and lis.address is 40356744
9 {- r; p4 @. i+ c0 s# U) @& ^3 }+ l- @9 F1 |8 F0 u. g' j* \5 U( U, v
- 4 c N6 y ]! _
" i6 K# h1 h7 ^
This is Process 1 and lis is [1] and lis.address is 40291208
2 ~5 t* m1 K* b' D& y* b: l+ j k" j% d7 L5 R4 Y% h- f, c+ s# h
- ! }+ p0 M$ R9 X& W# w e1 l
* }/ {& v D" gThis is Process 0 and lis is [0] and lis.address is 40291208! `: H4 W( M2 M8 F3 c" _
& Z0 B9 Q/ A7 l
- ; o/ v( _+ }, _7 y
; ]# o! K( G5 o: W0 NThis is Process 3 and lis is [3] and lis.address is 402256723 t1 i v% S# @8 {
2 w; _4 U0 {+ @; \% N1 A
& V0 \ v$ M% h( H4 M3 J5 E$ K/ B) ?+ R) }6 o4 _
This is Process 4 and lis is [4] and lis.address is 40291208
% t/ u7 K6 V8 N# a! a& t) m8 y# c+ o' L! A0 b
' \' M" o' m- g7 J( S& o
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系: - ( g" l% X# P# ~7 C$ h
[* w' w& }: q8 U" S0 p. |'c': ctypes.c_char, 'u': ctypes.c_wchar,
% t$ i; L" @1 ]7 W* O. {. v# `9 U& }
- % y$ e! ^( I' b/ y% T- N/ k
7 z9 s( j: Z7 f X. \( P
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,: `0 Y* @; D* D& e( Z* K
) Y) w2 w/ p9 d6 z S7 M
1 W: E" q; @' } Y' p
! m. [/ n7 V' f# S0 y* f'h': ctypes.c_short, 'H': ctypes.c_ushort,
+ F$ i, f9 c: e" g* g* q5 ?' y8 F
4 y1 B2 u6 A) ?+ L4 b2 s- i- / c8 D& w4 h. q7 p
& t! L- ^; C8 g3 v3 ]) C'i': ctypes.c_int, 'I': ctypes.c_uint,) r0 t9 u+ d$ |. Q; m G+ N! ]
1 x3 g/ X" O3 e9 X9 |
: |8 k. Q' e, F. q
# Y3 H8 |2 F& j2 H0 J' X'l': ctypes.c_long, 'L': ctypes.c_ulong,
. O; ?' ~9 A# g% p- @( b. K3 u
: I3 d) V5 |" P0 m2 p8 N- Q5 j7 ]- , K2 Z9 u7 A5 m. r* F
; A2 C0 d5 v( Y2 i l% G9 s'f': ctypes.c_float, 'd': ctypes.c_double
/ z- k9 A* W" G E% m
! O0 F+ G. f' b/ u5 W8 u9 k4 L+ n J) k% m9 t9 e) }
看下面的例子:
6 a5 ^% {" o. ^) l8 p3 E! }
: {* C! H d2 y* Q4 @+ _- sfrom multiprocessing import Process
/ a# q4 u8 [! e' {
, X k$ Q1 M) R* A7 x6 |% R
' }) Z( k! q! _) I4 l4 w
$ j0 [6 ^& O2 a: F& Bfrom multiprocessing import Array% c, i8 a5 v8 H' n0 f N8 c
+ o1 X$ `8 }) P4 |" v, H. z4 W
6 W& W3 z# H, g2 i* ^ N
6 H5 P# T# U. x, L
- b4 e L; h5 D! ~5 F: e* F9 c5 s# u: W8 `
- ) E, r6 g) R( J) s9 l- h
5 I, c6 k' S# t$ v6 s+ a
def func(i,temp):
5 L4 ? I2 P5 H. l9 M: O, B1 }
5 p( ^6 F1 W. N+ h - 0 F) \+ f5 t9 y: r& U, `
( f6 p( k7 M$ S6 W- }; F3 S" U temp[0] += 100
8 Q& {6 P* b \, z2 _1 _
% l% F# u9 w! y' P% q
2 U0 Y. x: f# c$ _' i
. Y& A8 ~. N( V print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
4 l X$ s/ J, I/ P5 v$ R/ p9 D
8 X+ M+ X# Y8 P: N |; E1 @. Z- @; m0 z3 e& G" P( E
* \4 t; p* O9 R: t( a$ A( Q- d4 j$ n
+ V+ L0 `, p9 i% O5 V! v" ? - 3 Q! o8 W l+ ] {% J+ B1 C
( O9 G ^. F+ t$ @/ H
if __name__ == '__main__':$ J- Q* r/ A% F% W9 }6 }, j/ `
8 I8 {1 N2 J# v' i' p2 `8 z. E - / t1 e4 b" [/ E* h6 z6 B" w
3 t# n" N% `8 `0 t f7 k* _ temp = Array('i', [1, 2, 3, 4])
, }3 Z X6 b" Y1 o1 V5 D' c4 i" P* _, y6 W
0 A5 T c3 D8 e( d; u) ^4 R7 x- V/ s9 q8 F/ b& u
for i in range(10):
) |# {7 K4 m7 k$ |$ S' \8 w6 k+ O- Z3 |
% |1 ?5 x" _3 q1 {; W8 P1 B0 S$ f o" f. `9 k, M1 N
p = Process(target=func, args=(i, temp))- Y6 i: b! ~: h( |" p/ s
# ?4 C# w, ^7 v0 {6 W3 ^
% s+ i: @% x1 Q# ^9 r
$ _! P2 i) w2 b0 t" q( v) W x- B7 J p.start()* N/ I& R( ~5 [9 }
" i' f( D1 R; v# ] ]; ?
5 v0 c7 u( ]; R9 C/ Y- X/ Q
运行结果: - 0 |. X/ S3 w1 D" U5 {: B
: r2 C; Q! K: f进程2 修改数组第一个元素后-----> 101
) M, N1 u1 R& c" q Q/ y2 A& _9 y- c) G6 C) e) h
u6 v# n3 o( Q, e( q( T* V0 r. y. u% p3 d; Q5 b, }; s- h
进程4 修改数组第一个元素后-----> 2019 C- B# G- W( }" x; I- r! B8 K) `
: A2 j( B6 u, s; x. f
0 f+ u4 _+ _( [0 B
( [: M; }! Q$ x; e9 m" N+ n' {: Y" V进程5 修改数组第一个元素后-----> 3012 @( F, l4 a2 ^! Q3 t9 p% f
, q/ m) J7 d/ }5 t, i
- 7 x& K$ F; a7 A6 J8 u, G9 E2 R
. X, ?" P! |7 A. Z% G9 a进程3 修改数组第一个元素后-----> 4017 G" E$ s. y4 K. Z) s
$ C$ }4 `- z( ?
! u6 l6 B! V+ \, O# B$ M! \( h% C+ j
进程1 修改数组第一个元素后-----> 501
$ n( _4 r. x. D$ y' C6 f
3 L8 x/ s! s* Y7 M, i- * s9 P' K, J- l! }: \; c5 h y
+ h- C4 I- z- w/ E2 h0 _! `$ |; e进程6 修改数组第一个元素后-----> 601' k& N* X* j5 j- _ n+ {
6 p2 K5 i2 E: p; s! @7 [ - 9 r8 ?8 K. i! R3 i- q
O2 b" t/ v6 o( T9 T7 L进程9 修改数组第一个元素后-----> 701
# }2 j1 f" E+ R, G1 I9 u- L2 j4 E3 Q' }
X: K0 m8 H. o- u3 \ u' A# V2 r- ^6 N" @) {6 l
进程8 修改数组第一个元素后-----> 801; u% c( {+ u# T$ L
8 u6 D1 g- y8 @) q7 P3 S1 Q
' C: s/ v! y. I. @
. F3 N( J5 p1 a- W7 v+ u进程0 修改数组第一个元素后-----> 901$ F8 i6 P$ m: ~) G! y- ^- \
7 [- S& V8 B/ E. u: b: H
- 4 c5 m9 z1 ~6 q; T& w
# H/ {- t) a* a0 G
进程7 修改数组第一个元素后-----> 1001
9 g) S1 I* Q2 E9 I( W( r( `! R6 ~8 I4 U0 ~
7 K" r0 h* E& ?
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
3 L1 L$ p1 }! Z5 D, E( y8 u# w; s
! r+ {8 P) I4 u1 w4 _2 a. ?from multiprocessing import Process
- B* A8 [9 a x" @4 O0 ?4 R$ {& s3 V- G, s" ~8 K
- ( V9 k7 b: K0 n
& j" Q7 ~' p+ @# u2 L
from multiprocessing import Manager
/ _' @9 _3 e; q ^0 X$ d6 S% q$ m. [# B
( f6 C' F/ w. a2 S0 F8 z4 E/ E' k$ }9 U: t, _6 t* C
, G( D1 v$ h: o. y6 _
( k- q& C8 z! N+ h/ [- V' w) l- 4 e& H; j! M8 P( k( z" P, l( {+ Q A5 }
( b( D0 P8 n( ~/ h& l8 |. Kdef func(i, dic):5 @2 J& |. W7 B1 Q* g+ E9 E, }% K
/ g9 p! v) `0 G0 x! Y - 3 l r! ?6 o6 M. f2 }) e
( m% @9 J0 s8 p
dic["num"] = 100+i
* ?& C/ s. p! ~% b; g
w6 K2 C0 e4 S2 ^
/ [/ S% }; f: O$ g+ K3 f5 f8 u& N: v& C. |( z) p/ @
print(dic.items())
$ I2 I S! r$ r; T4 G
2 r {. p# l0 J M0 m2 I
& j+ T4 \+ ]# B, k- w8 \" t' }
) R1 c' f! u- W0 \0 @) A X: U0 p# g: O }* j* a
$ Y+ ~; P3 B6 G4 w: A; ~
- 5 m: G+ J, b7 z4 b! F
+ p1 g$ C/ N, P" c
if __name__ == '__main__':- O% a: b9 M9 H6 B6 C, q9 r5 Q. W
0 `+ Y, U6 T& [8 B - , ]7 J, d" s& K' O6 ^1 g( b4 Q+ g
$ N& H. c$ d* R; c
dic = Manager().dict()
6 e# r9 O, h- y K* i8 ]& J; s, i" s8 R" D* T4 a4 g/ R$ Z" x
1 Q9 P2 p$ |9 S. w$ n5 n! F( Z4 z: d8 ~) a5 t5 l
for i in range(10):! z: Z" j5 ~% t" d2 m
8 d& v: n: O8 [) s a$ A
% {: J4 I' g, H0 c) s2 \5 {, @9 w, ^7 I/ M# M! `- a
p = Process(target=func, args=(i, dic))) W2 l6 y* m, m# W
, r* I0 `$ ]( O Z2 ~
- 9 Z( O9 w6 ^" n( Y! K
$ c5 i' H' ~+ @% J# j: d. A p.start()
2 m; o4 M) h3 j0 e- q! a* L
3 {! p# D. k* j/ L9 l9 {
. h$ m' V* |1 e; L9 P3 u2 W+ ?* e' I) s$ P" b7 v% w0 [
p.join()6 y$ a- L+ K; a2 D
8 r* ]9 r6 v! J2 [0 a4 z$ m- r4 `5 Q+ s, v0 ]9 I W' P
运行结果:
' {4 i, N" u8 w7 J, {+ y) B
* q! o2 r2 `+ }* [# Z[('num', 100)]
8 M) K: W4 u& V( @# N2 B6 h! `
% t Q, l( |7 B; K- 8 u% S. Q' ?$ o6 |5 z
' Y' b8 y+ b, |[('num', 101)]/ b+ h; @* l# r
A& b8 F* E( e0 R+ I
9 z9 T. \: P/ h9 I' k
8 Q$ v/ m# N3 E! @% A# w' P[('num', 102)]
$ k* D1 |( |" m7 L" A( _/ v" x- C2 o
- . j! s& c: Q% A8 \+ q3 B: y3 V
9 w" ~1 C+ z C: B. z& @' A[('num', 103)]& r7 M* _( ^, R! k
) v; a, f. V( l; L - ) _# g" C. t+ r4 N2 u# {& h
, t$ l* U% }! o$ Z7 S& _; t8 {, w
[('num', 104)]
) I4 }( h: _ W, N
3 |( N; B. m, U, M
% M; i, K/ C: m2 U6 A1 e8 L9 P6 E" m+ [
[('num', 105)]! [6 e3 A" s: Z, i+ A
5 m) Y9 _3 V! e# Q+ Z# V0 I4 W
]: N6 N2 x2 ]) [* K% E) J, y6 N- q$ s
[('num', 106)]/ K& f' Y9 T. J2 h4 C! _* g/ N
, m) a W: N* v5 Q
- 3 }* V: V$ b0 D$ g' h% v; ?, _4 n o2 }
0 D5 K b7 ~& Z[('num', 107)]3 y0 ^5 a& e5 p$ b: e2 W3 X
: u, B L. P! p- D: v1 z
- p7 S r, ^( H, c/ R& \. \. D
# w' c' r9 z2 A# Q* O) U
[('num', 108)]
$ ~6 L& f; u1 c% P& B: N: w( a6 C& k- U
8 Z1 T3 d! |3 I5 d2 G0 |
4 w; V( Y+ y0 |1 }. c! |[('num', 109)]
+ y+ K) M5 X% c" l5 c
$ Z+ t" D5 n8 H1 l" ~# O6 S4 Z7 U4 P
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示: - 6 H' w& E/ _* h( L$ \
- M+ K6 W5 d# w& p' \" rimport multiprocessing9 Y: H$ x2 F' D7 r) U
8 X6 Z6 |9 N' _; u* A1 ?8 ]; m
% |8 Y& ]4 V- Y4 w9 v! T3 P5 T
! c" D. p! M1 |3 Tfrom multiprocessing import Process! M% C: z4 Y4 f: V- k
* |6 t% d% L$ A
\+ R$ [& ~& T4 z+ n1 D( X& g
3 R. L- v( ^8 X1 A! j9 T9 ~from multiprocessing import queues
& r3 k& i0 t' z: ^ S% L* V$ C8 U6 @4 \0 T r$ }
% c1 k- w9 Y; [( d
W( P' [ J) h; G' q. X
, O9 _* s" J& m* |; ~9 K8 ]5 T$ n3 g! c* g# T" [8 D/ {
- * B: Q5 C' j. G% Y* g/ E( n6 X4 F
. r& Y" \8 b0 W- A, E+ Y/ ]' p
def func(i, q):
2 R3 h+ Y% S4 T0 P' R( d/ O: A" j
. l! e* f' g0 s3 c _" A2 J8 c - " d" Y7 P$ w& i3 W+ j
* j! v# f0 X# R2 x
ret = q.get()
: k2 B4 }" m. m# M3 e3 v# B8 ]" B; r7 V3 r! ]0 g
! B" E! b; j( b! G2 s2 ?' b2 K
8 B# B% K3 e' H" t9 C- Y; J print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
4 q( `% F2 A" P" L, ]* F* v/ H( z6 p5 t# v& P# ~- }0 |
5 M, T" L$ y7 e% U( Z; M7 g1 {. C! A2 u/ t" f2 s% r7 e
q.put(i)
8 @* t: p0 ?2 c/ s( @; |9 t) H0 W
4 V7 x- v, h1 x8 T, V7 v d. D
5 \% `- @' V; Q0 w+ x, ]3 b- X/ A- F6 u& @
2 ^1 L, g- b& Z. Q
1 J* l* o1 h W* o a9 K. v% f* ]1 P
if __name__ == "__main__":4 s7 m( M3 K6 H" p9 K
p' W3 Y0 P/ H+ B( F) {
- ' k$ f4 V8 w5 S3 F% a! Y8 W
R2 v# e" w1 x' U4 r0 E
lis = queues.Queue(20, ctx=multiprocessing)6 O4 g& V0 ^# T2 k
/ D: G; Y1 I4 j# }8 a( R3 E5 P* ? - 8 |, A8 R5 l t. [1 y# o/ v' c' q
4 q; x% v; T, M" ]3 m1 C/ N( M9 T
lis.put(0)9 _1 N" `8 ?) [
' U& ~5 \* C2 s9 A+ _; G6 i2 h - $ ?8 {3 u( H$ p( p- i! {; e
" g2 m* w, w* U- `
for i in range(10):; K G% a" ]/ T" `
0 I! _) a o8 J" H' n5 s - - T5 J* Z1 U) k, A9 B) d
( q$ r& f0 y' ^6 D4 J6 F0 p: w p = Process(target=func, args=(i, lis,))
h% v& G' q5 U+ H! h' ?3 u
7 E/ q; b) |; ^6 K6 x
0 e* w7 \$ u& R o) Z8 Y' w
. g- o+ a- Y" N! y- e p.start()
7 w1 p/ P; c6 s. d1 u9 D7 u( H, q$ c g- |( N& S( k. M9 a9 y
- D1 Y$ |: `. J7 W7 h6 k+ x1 @
运行结果:
, H' b7 p3 B9 \' V
# ?" X9 n/ X" }0 v! ?5 q9 t进程1从队列里获取了一个0,然后又向队列里放入了一个1
/ t8 C6 n0 a9 r0 Z. u/ m# m
6 B0 f$ N; |) X2 N- 9 t1 u% E' w" }' S& R3 I
5 |$ k5 H3 Q( i! [( k+ V% d& N0 ]进程4从队列里获取了一个1,然后又向队列里放入了一个4
$ r u1 l! ^- N( {) n0 u# J7 ` d7 `. u2 |! D# B- g+ Y6 s, y) n U
! x7 p1 }% Z, s1 s7 R6 M0 e: K: h/ i! U* p5 D# C
进程2从队列里获取了一个4,然后又向队列里放入了一个2
* P9 K) }2 n& f, j5 t6 m! A9 ?
: r4 j5 q. w0 k4 p
# A7 V2 \' C9 V$ C( i- e
- R6 t6 V9 n2 B2 g! T( \) d进程6从队列里获取了一个2,然后又向队列里放入了一个6
: v }5 Q0 e) F4 r V x& y6 J
u+ z) r" [( b: ?
& ~/ U4 z, F! h/ u" I" S+ q' F! N% S5 O4 x S, [+ N0 r9 v. S
进程0从队列里获取了一个6,然后又向队列里放入了一个0
7 B' T$ T1 J9 A% P5 }: Z4 ~3 W' K3 Q. G' u; ^ c$ ~- c* e
- 1 N2 o/ k/ d! G
! z8 E: R+ N, u% _8 i进程5从队列里获取了一个0,然后又向队列里放入了一个5 h. g4 ^3 [( j. b! n8 n. Y# d
' n; r9 Z3 C$ h m. |
+ B5 M! |* a9 ^* g) g3 Z8 d' l- }2 A# v) W
进程9从队列里获取了一个5,然后又向队列里放入了一个92 H/ Z" P, m; N
2 y3 \8 W7 n _: w: u" {- # u' g. v6 C& c2 @3 Z
2 K0 x; p7 B. z& j8 F进程7从队列里获取了一个9,然后又向队列里放入了一个7
$ {% m/ q) X5 g7 Z3 b: x# V% Y
( o! J! V& E6 c; p4 X: {) o- y - ; c8 P e' c. `9 q$ V
( m" k, N0 t9 b1 k& W进程3从队列里获取了一个7,然后又向队列里放入了一个3
$ }# M5 G! v/ |. m; Y2 X
% ~' n2 ~9 f" B# q' o2 Z
7 j3 E' v/ i' }, m; I1 f: D6 t; u( S8 z
进程8从队列里获取了一个3,然后又向队列里放入了一个8( d8 I& N0 d( Q; Z4 j" O3 A/ r
9 N5 Y) ?& ?# W5 Z; I! f1 d0 V& W' G8 [9 d
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好! - * ~. N8 L# ~+ X
* R7 n& {$ s: j b8 b
from multiprocessing import Process
( x# h- m- M1 n
( _' X4 ^) _$ C6 y' ~2 M
% A+ R7 Q W8 U' F( \. X9 I, U Z$ D9 m0 o, N
from multiprocessing import Array
9 C e7 ]" u, j3 B4 g5 N1 s5 I
8 E( @& Q4 T3 {! J3 b3 V- 9 i% T) p( C! @3 m. b. @/ w V
4 A- q2 ^; a2 v8 n
from multiprocessing import RLock, Lock, Event, Condition, Semaphore/ ~$ g7 N: ^# j* }' u( [/ \
; T- S: [: l% E1 a7 H
" w* u0 K! y4 b5 d+ _5 w$ A7 v% X, E% W$ Q3 f, y% R8 @8 E% Y
import time
' h. E& a' ~: g/ E. k9 [7 m
4 o. E; X- D6 j- 2 l4 {7 F8 G4 Y$ B; r
# r& v: w, T: }8 m7 U' ^9 v. Z: q% Z+ ~& `6 b1 O. h- F3 _
" P% E& \2 \9 K0 A/ {9 W0 }3 Q8 z( C
$ d% a+ {" M1 c5 k U% S& }0 o8 [# ?6 E
def func(i,lis,lc):
L# L6 G4 Z; G+ q" N m) o' S, \
1 s- C$ C$ p, S& E1 b/ w8 r- / e$ S- m* d2 d4 p
2 B n/ e8 v& p6 N$ J5 W2 G( ~
lc.acquire()7 p5 I1 B! [ N, t# L+ [
! i5 y I: f! J
0 _* T! Z% Y3 h: a R
5 r3 H0 {: q, k; o# W; M1 H lis[0] = lis[0] - 1
1 D' X. v6 V, |2 b: M4 P; k2 m& A
% Q' i* u4 L, ]
' v, U! L, @6 m& \9 Y' W' d; b, C; k h6 p5 L# f/ y o
time.sleep(1)
U9 m9 a: t2 f
+ A+ \1 i& K; \/ Z3 p- H2 G9 H0 ^- ( `: a; R+ P* d
9 ]+ F# u# T; ?% a
print('say hi', lis[0])
9 Q# p# S1 d/ S( b3 w& D- ^# A D- A0 D8 B' ]* ]4 m
- 7 Q' E9 F* `1 |. x
7 M' F) c6 L2 B$ j( a/ h% U lc.release()' k3 e) N' d* d; W
2 n0 G. w3 k' J% y - ' g/ ]+ C* Y0 y3 L
9 D+ g# F4 A2 @8 z: Y6 \
; v' }: S7 f1 b: G# m
6 e8 y% _4 z$ ]9 `: F! E - ! f. p' n _2 K8 @) I
3 c, b S. u5 {if __name__ == "__main__":1 x. w6 Q2 t) V, N
' a* g0 r* o+ R) s f t
& f" @$ X9 v! i* n5 R& s: |& \0 e5 B& O2 A3 ^
array = Array('i', 1)
" @7 P8 ]$ l6 q s: y- y x3 C T% y% a* }1 a) p
7 J% x. {: ?) X' b) p, P/ q) y5 V0 P. a* l4 M Q0 Z
array[0] = 10
$ I( h# C2 p) H: v! D
- @, G, a2 ^, E- - V" i7 W" {1 L( L' K
* m9 D4 V- ]' g0 p/ f R- W+ c lock = RLock()
6 A. |$ J. V- {, R( F0 V
2 H( s0 I0 R j% k
( b) W2 N; }4 ] t6 j. T2 D" f" W5 n; A1 g1 ]8 ?
for i in range(10):
7 A% T6 L. o4 t2 \' y& ^+ E6 k7 U+ ~" T
- ' j; o, l6 F& u$ a, P3 J
4 _; C* l: O1 ] p = Process(target=func, args=(i, array, lock))
4 w5 R% h7 \! Q' G+ Z* I5 T8 {* q t- Y
2 K4 I2 ?5 A" g A% b4 G0 k. h9 D3 G& _% H0 b! v! E
p.start()
1 z1 p' O5 j5 H8 h2 i5 l4 i9 S" X/ A2 |8 G
( i1 c1 E! d0 ]$ S5 t5 B
运行结果: - / p8 K4 N: I$ g- t! g- N) v
6 ~5 l6 R/ I$ X* u" [say hi 9
! i" m8 l. y+ T" Q) V
Q. U, @& X4 \8 |! e. I# H4 @ - & ^: t0 S4 e' l1 j
4 f( `3 H2 I9 D6 I* Ssay hi 8
+ I$ h2 I- A1 p' a/ M% n& X) F' r8 U. u4 w ~5 D
- " R3 V5 f* n2 W7 x% {
4 L! D4 n% T2 m* v
say hi 7
8 E; @! j8 d7 g, w0 `0 f# ~, j8 Q! ?
- / |: C5 ^( _$ P+ X# k+ f
8 c1 c8 E9 @# ~1 W
say hi 6+ I4 J; N X ~4 R" r
3 @8 A. M$ S# e5 W8 q - , Z0 J3 R- w7 ^8 |" o% T; N
( ^3 d. e. k+ E4 M* R+ ~say hi 5+ x3 L" x& t% T8 K
8 ]9 b1 F* L. g ?4 r
- ) @' z- N4 R9 S
1 w' @/ |4 Q7 w
say hi 49 k* w' ?9 C9 p' d
7 X0 }3 v5 w. U4 r8 X
" M, W" t! Z$ K0 O' I, A/ W( U
% V% Q% Q6 `$ }+ y Y1 Hsay hi 3$ O/ P( [' y/ j) r/ R/ J8 j
$ [$ {3 L8 x/ G
P L& w& R* u0 V+ K8 D9 C7 F9 c
9 ]2 T D% E1 x4 hsay hi 26 d# o2 X4 X9 Q( y" ]3 S
( ?. G2 d. O A0 w7 b; m0 o: d
2 F0 ~: F% Z, k' r5 K0 L7 b& A7 I& W
say hi 1- ^; D4 {% O5 q% o+ Z
8 {* A4 { u( \5 [8 m- [( C- , S9 I) \, d% j9 K: g( I y1 \5 `
" s3 f4 e$ P& Vsay hi 0
! g* Y/ j; R8 ]. [( M8 b G1 u- z
) E4 ` T: _0 \ K% s2 i0 J
9 D$ J* X( i$ o0 [6 T 3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法:
% m6 t1 P- t& ?1 S# d# ]
5 P/ c# | j- T7 Vfrom multiprocessing import Pool# m. m; w. Z$ M" T
6 }, a& w" ?2 W: w8 V& U/ B
9 M; ]3 f* ^- n" {% L6 Q5 |1 o
+ J" k; ]5 s+ b) z( J; oimport time
* X; N3 u5 }+ {; c' |* |) ~( V; a( O
- 3 g( P: L# E: R c* @
6 V2 z: `7 L1 k) v2 [" z6 [
2 x1 G# a+ ], b
. L; {+ ], Z, ?! m* o+ h
+ z& F+ u6 N; Q7 f0 e" e& |% _9 ~% h7 s0 t) F
def func(args):3 e- k3 q% R5 A2 R% B
) W9 m* o+ B0 Z8 r1 z \- 3 w% O4 p& o' K! L# {
) _$ t6 @8 O6 s
time.sleep(1)
8 m; v' A4 o0 I8 V. N" E
( m- P) E2 D0 M
* X) @3 r t1 N# U5 o
+ p6 q) p9 X" ^& f& P. g print("正在执行进程 ", args)
" l3 D, J& s+ c
. E" s# Z: B8 C( w
% Q4 v- m/ s5 r9 Y1 u# n2 f0 J1 a U& n2 v- U1 i, w# c Q8 q1 }, u! P; q
" ^, x$ D& x" j# U
9 z# t7 h4 u% ^
5 o; ]( |8 Y4 s/ j! ?5 {
7 ?$ d/ \3 o3 k) |2 b, @9 Vif __name__ == '__main__':6 t) J3 |2 L, b; U. }7 m
* h0 b( k# f+ `8 B, s# n6 G
- 3 g5 @' L3 b9 i- a1 ~" R2 t
0 d9 q5 {1 i) a9 B" \ u
8 B! I5 n. y9 C `4 Q$ j/ K6 r' {
9 c6 A! e; `7 ?6 m% i( K
/ ^6 I0 o" f1 ^2 p" q& D4 k0 \) |, D
p = Pool(5) # 创建一个包含5个进程的进程池
% T0 |# w% _8 x
! d' J; y0 \% i. B
- d3 n% R0 H1 S6 E$ T7 @2 c& f
2 g% |: w8 j$ f }3 k; N" z, ^/ |. s
" Q* A0 m7 D9 Y$ G" B9 U
% b. i0 E+ I& i/ u" @% [! h
% M3 k! `7 |2 f! W) J
5 Y5 v4 q, O# r2 N' M for i in range(30):
1 X9 b2 ^6 j8 x* o6 L1 ^* X7 N4 q1 ^2 i" e6 U
- : G) P( m; e# W3 w( \' c
3 j) R: [" b( `* @8 e- U
p.apply_async(func=func, args=(i,))
- X+ n6 e/ Q: T: Q
6 p1 x0 M4 k5 ~8 A4 v) L" A - : N* y$ I# k( I( H* ?3 p
' x' j. M! ~/ D, x2 _3 _! [
8 Y! \9 g8 u6 I- N9 s
& }0 D7 j) m8 W' k) n - $ G* O" g( a7 ?' W0 @
7 v% t/ J; w8 h4 }' P5 N9 Q7 `2 v" ` p.close() # 等子进程执行完毕后关闭进程池
3 _* I8 ~# @7 X6 \# N0 z) q- Z2 [/ S7 z3 J) p+ {$ x* Q
- 2 G2 Z8 \8 p$ s
" {+ i/ k5 g7 b! Y) O; Y
# time.sleep(2)- |$ v- U# }: }( [
6 V4 I" W( v) r
$ b2 l$ z* o+ K$ C6 l& D3 i0 f( t" A# ~% l
# p.terminate() # 立刻关闭进程池
' y; S+ U: D( s$ b& Q2 f
A( L. Q+ _; ~1 O- [7 W" e- 3 m# X# w/ ^# j3 j2 K* D
/ k# a$ V) Z4 K2 a1 L
p.join()
$ I3 B/ s0 t/ F$ h% K/ i; l* k4 ]: S; s/ S+ N: c5 l& t3 \
8 f/ g+ m+ }+ }. I) y
% R; ~ A# m m# L. o! {! i请继续关注我 6 J7 _) A# b p
$ t6 _& O. o" O* U |