9 R' }4 T2 b4 a" J
爬虫(七十)多进程multiprocess(六十一)
. x1 z% F4 W0 j7 L/ hPython中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。 另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。 下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。 - 6 w& |* X4 V$ a
1 \' ^7 z0 w2 H! N' E( z5 E- k" o
import os
" H5 h5 Z$ {; Q- P9 f4 l5 O* u; I5 x- A% q
& w* h! T5 W8 O. H+ W3 {) D& r- u6 }1 h% r+ `+ r, ]
import multiprocessing; o9 y6 }. E2 }$ L3 x+ q
+ y0 w" l) W" L4 V, s* W
- g4 ]9 x/ S, g/ [9 k
6 j# d j9 _* _0 V8 L3 J5 ^
5 J: b6 O3 p8 m! z
4 f) e5 t2 k, I3 r
- 7 V/ |/ _" n" r+ X2 x
2 H4 P B- e' d+ r" Cdef foo(i): I& q5 T \4 i
# ]1 x& m/ O9 Z9 q7 X
- ; S \! T( @( X8 L
0 G6 b2 C4 N' W" G$ V2 l # 同样的参数传递方法
9 j! ]8 ?( `: U; s* l* x2 Z% i+ v o, i
8 ~* K% s3 F! L2 t# l8 T# [
- \/ M8 r, p& w% r" l print("这里是 ", multiprocessing.current_process().name). B) |2 e2 _" Q2 D/ X2 [
% ~2 P: W Q- K2 Z. w4 x+ e: d! n/ Y5 }7 E
' i5 V' O$ ~$ J6 f N; D; l9 i7 J8 M+ K8 E) j
print('模块名称:', __name__)
' r( ^! e, d& f+ ]8 C, ~# Z
* r, ]& u1 P& B
# R' N6 d. o l) v) W: r% P
- J, ?# L2 a& q" g, T+ X- Z print('父进程 id:', os.getppid()) # 获取父进程id b2 j: A! j2 S* \8 V( k& u' |1 L
! `8 ~0 @* d( |# e& C
- 2 t# a8 U" _# b. j
( T3 p' H% Q5 N
print('当前子进程 id:', os.getpid()) # 获取自己的进程id
6 E+ l0 e4 Q p! G
7 z ~ K. `' v' \ - : M' n9 n9 w9 Z! z. z" Y
4 V7 O/ D7 E' Q+ L% e print('------------------------')
, D. \6 `: Q1 t5 w" q9 R
1 Q+ O/ N+ N" q$ }" D
5 n( A7 E( _6 m& M1 q$ Q8 _- L0 f
& ]4 j- e; \* ~7 a6 F. Z6 N3 H2 m
% M) F% u1 W3 Q2 {
- r* \+ T6 z1 X& ?
+ @' O8 k6 u9 M, t8 T& M7 U3 @$ x" K+ G1 u
if __name__ == '__main__':$ w$ ~& F: q# E; I( Y4 P& {
D* |: ?/ a6 e% [- {- * }! r! q! r) p) F2 @3 C, M+ B/ n9 l
# _! G' N+ s' g0 @2 S( n
8 V3 H* z4 u' \6 h! _/ s& f* X8 }
4 }3 N i% {1 v: f; l4 U/ F( a
- ) A/ o5 x6 I1 ~6 q
( O& y5 `9 d; G4 h! E
for i in range(5):
$ S# R4 Q1 w6 \+ S- {. ?2 h" v6 R0 D
3 c/ Z+ M r! L* p' B: J- c, W% h5 M
p = multiprocessing.Process(target=foo, args=(i,))
; J: C* X' I2 }4 M; M% h# _: B
, ` @1 Q. s/ e1 w# w! {- $ b8 p8 r- E+ q7 d; }
2 x% U3 z5 V0 E' m6 U0 T% i# T# S p.start()9 P& s' v% ~& W V5 V; B
7 @5 J @9 B4 ?3 h* D, c4 z" T5 `7 ~" B
运行结果: - 2 u* `; E% A+ {3 E3 Q4 Z0 z
, e* J }! T; {+ k* [这里是 Process-2
; t' K& [3 ~0 d* ^& M
! i% e. A3 |* z" G5 `. q& B - 2 z5 F* O4 F U& e! H0 p
5 [4 i F- `/ R, c模块名称: __mp_main__6 |: ^" @' @5 d2 h4 i0 D0 B+ E
. m: k6 ~, }; d) q7 A
2 R( j2 @5 q3 o# \5 y+ K7 ~
) y+ ~& h: ?9 i父进程 id: 880
1 H' K8 V# H i+ R1 F! L! p% Z- R
- ; y" ~# x$ A5 L' M" W6 z2 N! N# E. h
, G) ?# x2 Q0 _3 i6 c4 O5 a- G) P2 H* J当前子进程 id: 5260 u: Z; z- i& _$ y$ g7 |
8 V, Q( X5 {* ]( k
& z) B% |% \' Q: r2 C) s) W8 y; k- p# N' x
--------------" q: b1 w" o! x- L0 t5 F2 g
& N b0 I; N( y) u6 A* R7 d& j3 h# @. u
- # H0 B$ n7 Z& h9 Q- o+ `5 ~
7 ?+ [1 R& s0 |) N f
这里是 Process-3
8 }) P) u7 C- q- l, T) f. k
; a/ S# `6 j7 \2 V - " Q6 V0 i: N3 i: v! i1 F
3 y: g/ K& }3 H1 h3 r- P
模块名称: __mp_main__
4 E( Z& P$ X% j) t6 b2 @: d; @; I6 W6 E! I; ^+ S3 Z5 o- T
- o; V- c& q* |7 l# l$ L0 A" r5 O2 W8 z0 G) C; B
父进程 id: 880
8 e" l, Y5 }( [* m0 p
9 L: q8 d- o4 D0 c
1 ]) P, E& M' o, V" [$ k6 q* p2 D6 u- B5 ^0 Q# a7 _! z
当前子进程 id: 4912
5 T6 r$ J9 x- [! {; j- B' Z! K0 l; j; K
- 3 c- j0 k0 T% n8 A7 d# Y' q B' B
$ ]5 q& f/ r7 S: D" B' w--------------
0 ?6 {; u0 m) c; P- I/ @% Y+ s. T3 v! c
- & }0 D! c* F* P9 y
; {) r4 e; J3 Y) n! k- x
这里是 Process-4 I/ m0 j' r ]* c
7 ?- V! u# S; K( }, d- ~* I - 5 ?$ e3 r- Z8 ^ m# r( S
3 b: f& v; e- f3 Q# s& z
模块名称: __mp_main__
+ m* ]! e5 K, ~( K! w3 {6 o+ o
5 `6 ], N( P3 w" ~" `' E - - j/ s$ s4 t( O' g% m7 }
4 V( F0 y0 S7 g0 W) @; G2 o& G父进程 id: 880
. ^, r$ S9 ~) A) }( K' M" O+ }, q! R1 p: Y: C% C; |2 h1 }4 Z) w8 G
2 _( P9 Q8 T9 t9 r
# e( G; F7 e4 F# M/ R& a当前子进程 id: 5176
4 o- ~( E, v- E3 ^' Q
. s1 Z/ Q4 ]( X# E- * J, Q# E0 V4 v# {4 X
& I. a9 K' G7 I W6 U/ X# U Y--------------
9 c# w- q# S6 F1 W* C+ J4 u- e
- 8 B9 Y# C2 V! M8 L/ W; L9 ]
: o& ]. x8 r8 v8 [+ }
这里是 Process-10 C3 c) v% _: U4 N
( f B9 I# }5 _0 O3 u
- ; y/ F- w" \2 w- [ M$ {( A
+ B8 t6 B# Z: a* I模块名称: __mp_main__
' K, Y3 D! s/ o5 v" d8 i7 c/ t- j& m9 E, g. r: n0 ~' k
- . }, B/ K( {/ a5 X2 N
4 W- q2 B+ }% n2 C父进程 id: 880
' k* @2 J& v: a7 S5 L# O' L% B K' w5 F- w# i& `. L7 u2 ^3 B
- " F) F8 N3 k# R- f1 I7 o3 I1 d! F
* w, x6 V1 f+ {+ _6 E0 \6 v
当前子进程 id: 53804 s0 ~ S2 S/ I b: Q. T
! ~# m! [, Q1 t- B4 Q7 S$ M+ Z; y
- 2 U$ f% I7 @% a, a" B& T
4 a% L1 Y% d | U/ _1 ~--------------: E$ w2 R+ }% G. g* t) D- s
R O; f9 h: @
- . D0 N) b' [" f& ^/ \
2 [1 ]- h3 B$ A# z" w
这里是 Process-5
& `* ]9 G0 r7 s' j! n; C$ ]# B! N6 j3 \1 Y7 S. Y$ A' @) {
- 8 W" \: n. v4 F) x0 s
% q9 h+ D+ K8 ^- }( U
模块名称: __mp_main__. {2 ~# B) r9 i! `% x4 s# I
% S0 ^' J" {+ A# f. q; X8 a+ R
- ) Z, v- R! E. u. A; u n4 A5 A
! T' C, h0 _# H4 v2 p# k父进程 id: 880* [" `, D. K- [. m& Q
5 m( e3 E! j l9 b; ]) s# B6 _
8 r+ T, K6 T& [/ [. L q4 j; H' T
7 p4 j1 Y4 G5 c+ n L. _! A9 j当前子进程 id: 3520% n+ d' H l6 w9 X
( |; d) l4 g0 E4 }$ }- , o8 u; q, |$ ?' m# r
) a) R) I6 A2 q! y+ R& h8 }& A
--------------
7 G1 W8 v$ g' s- W$ i6 \# x- |5 H( o4 T
+ {7 a8 @5 R' D% a2 @6 F. y6 [ 1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。 创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。 下面我们尝试用一个全局列表来实现进程间的数据共享: - 0 T8 J& s A% w; s
' [9 B3 b( D" L$ c- U0 d1 gfrom multiprocessing import Process
2 [" F; u ]+ ? E/ A$ p& S6 P4 A0 r* p# @- ]& A
- . o! T3 C6 S* M) v$ I! n
/ l$ Q- `2 j. Q4 M0 L) Y0 t/ |3 ^8 Z9 @5 b5 g8 W, @. g9 Z
! b% b. T* O1 ?+ Y! @, W - $ F) g% N8 B, a! d. k( d2 o
( A, P, l8 M$ \1 s1 Elis = []- [ l j7 @: P S0 A! i1 Y
x* C! i9 V: B5 T! C
- , n5 h) j4 b. k9 Y% b5 g
F( d8 h& N5 |, F9 X
7 [/ {. x7 n8 y1 q% Z, g' b# W; b) ?3 Z" u/ q6 R: G6 h; H
- }8 p, u) }4 Z- d
5 Y4 q4 \+ A. Y) ~def foo(i):2 M2 w4 f/ Z0 B' P' @% H
( x+ o/ }/ h# s' `! x- h* n
- 4 {! n7 `& R a
2 v A* D- H3 P4 T$ \
lis.append(i)
4 S% U5 b# { r2 v- f7 r' H3 N5 h) Q
0 m0 \% q9 `& t" w - ]4 l! D7 |4 _ s. q: C- N7 A- F
4 W; V4 p6 X" q1 s3 T! }' W print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))
. Y% m6 n/ B/ W9 J# u( c; `2 @* V1 T( M
+ A5 c$ }1 X" f
% d. z! b3 i7 K0 }! U2 D* \1 E) M
7 t6 X8 e* s2 E% G$ k3 p
" n e0 g7 t$ B! a1 E# d% b/ D) E
# @ ?1 `) |2 k4 ? E
0 h) j$ e3 d9 U/ ?if __name__ == '__main__':- s, m9 J$ H& n4 Q8 f
5 M+ f- E- p, }8 _- / H2 Z6 O5 v1 v3 b5 u B
3 \- j1 l+ g0 Z$ O" f$ l for i in range(5):( [$ K" i) @$ y5 B% ]: x* |2 ~
! Z: m) {, H5 F D
- 0 F* F5 `% q% \
6 D6 C2 n& n$ a4 n! g% t p = Process(target=foo, args=(i,))' v! K3 Z* P5 v2 }( {
& s7 M) S0 x9 Q/ ^$ { e
7 c- ?( T. w) |* B: ]/ }2 H- }/ J. P. K, j" y
p.start()
# |: q* q3 q4 w1 g, X/ \8 _7 T, J7 c1 {1 t1 h
- 7 p9 u( D" H2 R) _6 P1 h
S/ R. X: [2 X' h7 @ print("The end of list_1:", lis) O) _) h) {. i
- j+ b B W9 g% W2 {) F+ m; o7 v) D* _! @! ]# h& E/ e
运行结果: - % H+ s0 |+ z$ l% C6 [2 m5 D
P/ ~8 p+ a3 T6 G* f4 J8 z: T: J
The end of list_1: []
& a- @. a: `4 E* M& k% n0 r9 c5 t; Y) T6 Q* I
% c; W1 _& c# \' m& r
& G K& X# B- A# uThis is Process 2 and lis is [2] and lis.address is 40356744
$ C: h. Y- \) N1 ~ S8 q
7 t) w; B% k" h- ' l6 U: r: ^, q- H, f2 o# |
$ @: ~% m8 d1 ]4 O% Q
This is Process 1 and lis is [1] and lis.address is 40291208
/ J& t9 ^4 u( l9 w8 V, W- p, Y$ B% E5 o; k/ M. k
I0 i. C! y/ @: k* G* ]4 D5 U. L
# E, Z$ Y: b( o. d- m1 pThis is Process 0 and lis is [0] and lis.address is 40291208 Z0 f) J! i$ V0 }
3 z k2 P3 L0 e
6 K' `0 j* |. n+ v3 ?& I, t' H3 _5 n& H4 h. [
This is Process 3 and lis is [3] and lis.address is 402256721 R' g2 D* G0 [/ r# S5 {4 b
* E$ m- ]+ @2 P. ?+ }$ f! ~! w- " v& c. |6 ?5 D
# D8 K% w' ?; t* E ^7 CThis is Process 4 and lis is [4] and lis.address is 40291208* j& ~4 K' V/ ^1 e* E7 c
, P8 v7 e; |! L6 o8 j' i" X9 W! e
( v+ W; M* e# b) t- J' P$ T0 R
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。 想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。 1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
3 t1 n; O% o7 B! b7 R
* z: i- E* u( Q" _'c': ctypes.c_char, 'u': ctypes.c_wchar,; I z3 U( a+ F6 `& B. @- S
% S! a* c5 y' t" V: m3 o7 Y
6 r) J6 I/ p" b4 a, b
- h8 W' _) |2 V7 a% Y'b': ctypes.c_byte, 'B': ctypes.c_ubyte,: I4 Q8 G3 f3 R) I. C
( o& A, |2 n$ K8 [8 ~+ U9 S
- 4 M# \! a0 P; t- c* O4 a
) Z3 [2 o. e; y'h': ctypes.c_short, 'H': ctypes.c_ushort,
1 M/ a/ N! ?: q I( k1 i) X) n+ M* `* S b
- ' r4 e5 ^8 T2 G6 ~. [, k& j
0 D, L! f6 L+ [5 X ]2 ?3 N'i': ctypes.c_int, 'I': ctypes.c_uint,. V- Y! o9 s v4 D( l( g9 F
$ I% ~; @) ~, L% @$ v, O
) j; k. c) H2 H, H! {) U4 R6 T3 h9 ~/ T9 v. _: p
'l': ctypes.c_long, 'L': ctypes.c_ulong,- l$ w" s% i9 ?5 n+ t
' X2 G" J7 U' k r
- 1 |5 w. l2 H4 {: `; J1 \
3 X O" Z! I5 ~, `; P'f': ctypes.c_float, 'd': ctypes.c_double7 \1 \5 s" ^# k3 A! [: Z- ^9 M8 S
+ y& L- K, _( Q
/ J, V8 L, R# v, x2 [0 ]' K
看下面的例子:
( x. B6 R. s, m/ f$ S9 n3 b- H2 }: {% ~; I( I% W) Q) P* ]0 |9 [
from multiprocessing import Process
2 ^7 u) L/ H5 e: o! E
( o7 h0 X* z- m$ O' k3 N! N7 A. S- 6 {6 n- b. o# H- P! ]( T: d6 ^
# o0 a4 F- F) C5 b/ m, ~. _from multiprocessing import Array
1 e4 A# a# _# y- t K! Z! _9 |: Q7 q9 F! E( z, Z: F0 `
- * U- ]( S' C* Q9 }4 \ g/ k
) n+ a* n) k( f
; k0 \; g3 K% k( w
4 P8 I2 {* Q$ P( B# s. f3 w Q+ W8 e
0 Y$ e( K0 |6 I* [( q4 N: \* Z) P& k" B3 M% V
def func(i,temp):
8 @8 v1 c& f9 @. i$ x* @/ ]. G* `2 _2 Z8 q2 \: R% z- E( X
- 6 R, g+ J5 V; h Y" w! v
- z2 N; @2 t1 z* E1 v
temp[0] += 100
P' o9 X/ A( V: ]- O8 Y) [/ Q! f- u2 Q# Y4 @( l- X: ]# ]- l% H
- j3 S2 G8 @2 C5 B( |9 W) ~7 t" f
4 B4 o0 F- h/ d print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
" w" Y& X/ r& O' ^4 _" H( j4 v
5 C$ K6 w' S# h0 J6 M7 W, S
: }' M% f" U% x# s4 d# ^+ C2 h: G$ Q/ s4 C7 q
% t [+ G- q- G3 s! {0 y# u
% ]' Y& {: F( y6 N) W- 9 B4 r# @3 n: V6 O- V
" {; `8 z0 s5 k3 _if __name__ == '__main__':, {3 W: U2 }0 ?7 j6 j8 C: |& q
/ t _. c4 i/ E
! \$ C" f O' o0 @ D9 F3 h- ]) @- D g
temp = Array('i', [1, 2, 3, 4])
' F; N& l# K2 ^5 T5 N* L" a4 B" s% b) L) A
- " G" Q" ^0 c9 J$ w3 S8 n
& b; a9 n6 j7 c for i in range(10):
, b8 d) X( q: D( c( a8 H% ^' E$ ]/ | i2 F# E# P
- 8 y( F, g* q# t- h( @2 {* o; U
0 r$ z2 G6 h7 v/ V3 @: u9 d. ^2 X4 o p = Process(target=func, args=(i, temp))2 E! u4 N" E; Q& O$ E+ R: X
' |- j0 E" B* y! q4 K! w" G: n' d
- : l6 D- Q" H+ Z, L; `
/ F; X( H, D/ s1 g* X
p.start()2 I) I6 G! j o- T Q T6 R
& ~0 p- g5 R8 [5 ~" V" ?
( o+ c& g5 _6 g2 ~/ j7 f
运行结果:
% D* i/ k9 t0 {; ^
/ c" ?1 X7 q/ R& X) Z6 c- `5 o) l. ~6 c进程2 修改数组第一个元素后-----> 101! f1 L6 D) p" _- N' ]0 \
6 y% ]# P0 x3 y: y
3 B6 R) b6 W+ C' r { m# q) g) l
进程4 修改数组第一个元素后-----> 201" g, ?- _' z8 A8 U2 U. U* x1 D
4 I( N' _# c1 B7 j* k# u
! P& l) V5 a1 `+ g# X+ w$ C2 c9 s W0 L8 S! V( D3 b
进程5 修改数组第一个元素后-----> 301. N5 m9 ]$ I# {% m @
* ?6 B# N W, l$ c$ P! H+ Y. m' v
- Z# ^. i( e6 |/ ]' [( Y9 y# P& U j* \. T6 r% b: z' v" i
进程3 修改数组第一个元素后-----> 401; L2 d- j6 O& H2 m0 l. e, m+ e
0 ?8 Q+ `+ v+ ^# ~, s
: [: w4 c, \; t, Q- u: t/ ]
5 Y/ Q0 o0 X v8 T$ C3 m- S进程1 修改数组第一个元素后-----> 501
; z6 }4 }" g0 p# ]9 P+ X0 l$ o9 T$ D( e1 ~4 [& X7 ^% V0 |7 u# C
- + y+ X7 Q7 g2 c, z/ O! }- {
& j" `; t4 E% S% G' F" s* p \: P
进程6 修改数组第一个元素后-----> 601
( \# s1 a5 @# p/ C6 s) P1 \
8 R) l% J. \# k' L! {4 _ - ! b) R1 L* l: ~- S* g( T% l
7 h% f% ]" {9 }9 H
进程9 修改数组第一个元素后-----> 701
/ F8 f0 V5 Q4 z
* p. P6 z5 q6 J - , o% \+ b0 q: F: ^* K
" f5 _5 ]1 e! |+ |9 d
进程8 修改数组第一个元素后-----> 801
2 c& C9 V& Y. b; U7 V8 G0 L8 r5 F4 w0 _* B7 u1 z
- " Y/ D& l$ }- i: M# q( v
* o0 A/ @9 z8 H' l% Q J
进程0 修改数组第一个元素后-----> 901
1 ]/ @! G1 l9 F# L* L K" t1 @1 ]" S9 G/ b2 k
- . w3 O+ A, D( ^ S
% ~* r$ Y9 N1 v. z7 S( Q; S& `% B
进程7 修改数组第一个元素后-----> 1001 D0 x7 n; O8 Y" y) d% B7 x
0 k) z& N- \. T% M7 `$ D! n5 M
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
8 u H9 ~! ^' Y3 a% j+ o6 r! W4 v# a I( c
from multiprocessing import Process
) m( h( G! [! E1 ]! P
) j& R X, F# c6 m' z1 X8 Z- & J( H+ {+ r" O% G3 Y& Q/ k
Y( F; g) ?& ?! Yfrom multiprocessing import Manager5 r7 I9 H! a' `
) u* t2 ^. w' x3 U& }0 @' U - . m- V R$ E1 i1 Q
- p* ~9 V4 v3 R$ J) O R6 q; z+ ?
4 g8 H- w- O, |+ g2 h' r. m
# m, Y& V h5 Y1 Z( {: r% O
" N* `" X4 ?" c$ F5 G9 S! l1 X. m) j" ^7 Q; A/ A% I$ t
def func(i, dic):& ?9 B% m! A" u
7 F7 p5 o' p# H( ?5 I
, V; h. Z$ s" H( u/ z7 R8 P4 j* ?0 P* K4 x$ K
dic["num"] = 100+i
' J L* O; Z _3 g/ H6 j
( T- U. p8 z# o0 M7 M
R0 j$ ]9 V0 P
" ]3 S. i0 s7 X% F: g9 K. t3 } print(dic.items())0 f$ `+ z9 q, Z8 t4 j2 Q
: [/ D3 M; |9 U% x1 V) R4 ^; n- # k9 k" Q) D* {- p
0 w/ v# T) ?0 @7 \0 O4 X2 L/ U3 l8 w* y" m
Y$ ]- t6 L3 C0 K/ C - " ~/ p% L% ^7 m& q- s. W" Z. q) ~
" s6 ]7 w% r& {8 l& r
if __name__ == '__main__':; ~6 |( j; E( z* _9 `7 p
: T8 w4 d" }! F, Q4 f - ( A! ~- b+ _" O, m5 u
! o/ K8 I! \* {( g8 h1 W1 S# I: a: k
dic = Manager().dict()
4 k, Q" Y" g0 @3 o; N" q
# C5 x, Y$ e+ R1 V/ Y1 b/ s! m - / O$ {* v' T9 _" i
6 {" v2 n. S& l4 p- \! J) K$ i for i in range(10):3 q P3 n8 p7 ]+ M/ r, t6 l0 ~( w: C# v
2 M" Y8 Y7 H2 A, ~ - 4 X3 u% t6 ]7 A& e. i
' f$ Q4 ~* a* }: f s
p = Process(target=func, args=(i, dic))& o x: x% u9 u7 ?( ?
4 i2 F5 ]% V) ]/ p' N- Z# w
- / F3 ?4 R" S1 k7 k! |) `
7 G) b S/ j% b, N p.start()/ |5 G8 S1 h2 l. I" @- W7 K
' }1 H9 M! W P9 [: g
$ X C- P* x. R6 k
2 k. k3 u0 ]+ a( }7 j9 S p.join()$ H! g' K5 T) K" @7 E' m
% S* L7 w9 I% B$ |; ?
* K/ {1 ~. {8 R) L8 S
运行结果: - ! a5 R+ \# n0 X5 K
2 f7 T* A2 g; g[('num', 100)]% @) e2 e/ k# `* H& S* g
: Y. C1 }1 a, C/ X% g8 y$ P - ! V( m- q U8 X4 K& O8 j0 t5 M
! Z* `( G" z# D7 c7 \+ E" a3 c[('num', 101)]
& h5 f9 ]) q) b
3 }6 E( g+ }5 i1 h! G3 M - % @: ?6 X2 k7 `% H, k" A% z; N7 i
9 C/ B. }- R$ _
[('num', 102)]% p" z5 n' I& U, ~4 |. w# t
2 ~6 W& `5 n7 B" D6 n1 K
3 n: B9 U( H8 S, Y6 d1 z, N2 d( B$ M$ D/ x0 I/ h
[('num', 103)]
& ^0 W7 C N2 f! _8 _
: K6 [* _: X* V: |7 m( I2 M3 V; [- 1 X8 r; E( N: Z6 q f, K
( w/ ^! l) d( d( D2 Z( F6 v% r[('num', 104)]
: K: h# F; n. j8 p6 V u* q2 ?; C! l! ]1 d
- 7 }9 h8 p7 _. \; i+ W
0 c, B3 r# x ?[('num', 105)] t$ b2 v5 p) D# C7 h+ K E0 r
* L: d/ q# [5 o - 6 f A, [9 E* ^4 d. o
& G1 V8 X% y7 f& L1 _" w: I[('num', 106)]' d2 V/ G7 p+ R" ~) c% A
3 i" G9 K; H0 p% F& {" W, L
/ }' p r1 m' A: ?! `) O: c; A( ?- n, ~0 H
[('num', 107)]
9 ^9 s4 L4 b0 d6 }, k; T: g; Q9 o; N6 P! a5 m, O& k: k
- B; e8 N `5 Q' N" a/ W$ f0 M" V0 I3 s" r, n7 D" H- h0 s* S4 r8 P
[('num', 108)]2 s6 d: P3 J0 N/ W" R
6 X) W) l) D7 Q/ h
" c! Q" R8 ?2 M$ ?( a t: T7 W I _
% w4 Q+ Y2 v" k! x( V7 [+ W[('num', 109)]
8 u9 D7 [1 [$ f8 T
" C- K1 T6 e, H3 Q( Y
8 ?. j# O M( N- @; l 1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
# l% f1 o5 p% ^5 u$ [
& @) B& O/ E* r8 H0 J6 M) Qimport multiprocessing S6 G e7 d& H2 D
# ?: U8 o( R1 m+ ]
2 \! ~+ ^5 J: F) q, w* _# d. E% A0 L7 E- v9 h$ V8 A/ e
from multiprocessing import Process8 R$ z2 [. O# V( z: R5 E
7 M; x+ ` V9 j# o' }" e0 X
^0 t- ?. d% W7 Q, o9 r# J2 K1 D0 J6 H* j/ N4 G
from multiprocessing import queues
3 J5 _/ M9 H4 G( O, q; l' u" y# a1 @6 L+ u- Q) Y% h
* A |- t3 m" r5 c5 X' g- n) |" _7 T+ D& X8 F) w/ y$ w( ~& M
/ i* m" P1 S0 r& I: z# Q4 _/ ~% w! U% ]) r* M) v9 S
- - W' N7 ~$ R5 x. z9 G8 _. k
3 x: L0 w; ~+ ?* J
def func(i, q):4 D" }7 J* U: |$ v6 Z
4 O/ ~6 k2 e. O- Y
5 ~0 m* M& F. P7 Y+ u8 Q2 o! l% _4 { O( ]. A
ret = q.get()
& j. ?8 B1 y# L- t* }
+ O+ O4 Y" D& }
9 ]) w* d* ~2 r
t, R4 w; T3 v, Q print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))8 P1 ?2 `* _. C' w6 J; _
& @* Y5 e; C8 \0 {
+ Q0 L0 l! ?* }2 ?4 }* D m! p- ?6 }: M- Z4 Y2 Q8 p3 i
q.put(i)$ G, N i( L7 f `6 h! S+ B
) f- y3 ?& { s. z$ E
2 u# d7 D/ Q7 O
9 e% S Y) M, l5 u1 ~" Y
& y$ l. }' ]* S2 \+ v* i$ C; g0 f+ ]" N8 S/ b0 I5 s
- + i/ @$ Y; ?4 W2 L$ j. h
+ f* [7 ?) K1 ~7 ]
if __name__ == "__main__":. L. n( `% ^6 U5 Q. T
) Q. Y" H0 B" x) ^/ J
0 P X# I8 U* T% N
* G# a% f1 J- H lis = queues.Queue(20, ctx=multiprocessing)/ L; \ [' c* z/ T1 h1 g; E
! u; F- ^+ m0 ^4 E+ \
- 2 d7 E- W7 M! ~" z5 p3 P# E
. m2 b( L, G! n: R5 p
lis.put(0)
" R6 q7 a, S) o+ w" J- s
( U4 K- \+ ~& n" k - : C' K: g- `$ R& X: b! G* F: ?
0 l" W [; z8 s |1 N& x2 c/ d$ M; b
for i in range(10):4 p0 W6 c2 h1 ^
% z9 I2 M; {& Y1 w/ p9 b' I - $ O; L9 H: [5 U# g
" F5 r( O& I- t: m* Z" H
p = Process(target=func, args=(i, lis,))1 V8 t3 m# a1 M4 G0 Z6 w
, Y9 G% J" C" ?
- ]. W& l8 Z2 ~. i8 R
" ~9 i/ T/ r1 y- P p.start()9 S0 J3 O( q8 @1 c8 O
1 C+ z5 ?. x9 P! k4 O5 |& A' W2 ]- W, X) x" k
运行结果: - 4 b( s6 T; z% M- \+ c1 M f
6 n a2 Z+ M& a4 B进程1从队列里获取了一个0,然后又向队列里放入了一个1/ j, ~, e% j" W- q4 x% w" V
M& [0 I/ m+ A/ p1 Q' T `' V
: [6 D' l9 V2 T( B7 |, U+ a: V1 b/ U) @1 s) Q
进程4从队列里获取了一个1,然后又向队列里放入了一个4/ P, n& Z* L: T% l0 w& h4 a# t- Z/ R
& |6 ]( b! ~* e4 Z4 X% Q- ' f3 u& S; T# w
, W1 J! M6 j) T# r+ T; X
进程2从队列里获取了一个4,然后又向队列里放入了一个2
5 e8 p- o0 l! C9 _2 Z$ t& L
6 ` g% z l! g& h+ E3 k4 M - 2 j1 }1 T k* C7 U; M
; g/ p1 ` k5 }' {) \. T# t! m
进程6从队列里获取了一个2,然后又向队列里放入了一个6
8 z+ D. a# w* b" x
% U- l. v# i+ R/ z' G
( K" z' @: N3 f7 S& q$ u7 N9 i0 x* K0 O3 [0 V
进程0从队列里获取了一个6,然后又向队列里放入了一个0: s, l( O4 k6 j) f3 E* P8 g
4 c& X4 Z8 X. d; J) e- X
8 [; j4 n8 b! c. x4 p9 U
; N$ j4 a' F! w& [. w进程5从队列里获取了一个0,然后又向队列里放入了一个5
5 J( H0 @ h8 t C2 {
3 P0 ]6 e0 a+ q) @+ P3 t9 n6 H
+ h2 G6 T$ V8 A" J& ?' \& a1 I/ t% B9 } Y2 e" ?5 s3 y4 ?
进程9从队列里获取了一个5,然后又向队列里放入了一个98 J8 ~. r1 G% M
* Z8 L6 F1 T7 \- s, b- - o. ?% W0 ] Z, o& D( r4 j: W
O& g9 `% R9 }8 o进程7从队列里获取了一个9,然后又向队列里放入了一个76 F7 }0 T% Q( H1 V
# _: L' \7 b; |
- * N. O0 G9 o2 @4 m% H1 F+ [$ _
; m& }7 }3 N7 z3 v. d进程3从队列里获取了一个7,然后又向队列里放入了一个33 R9 Y g% G% |5 U
2 B: N, z4 }9 ], {( U
- * D% |! Q3 g$ o# I& ^
$ o! ^; _2 D3 z/ N) X4 s. a$ ?& ^& I进程8从队列里获取了一个3,然后又向队列里放入了一个80 U+ A' v. y/ \/ D
; O1 L; l/ }0 y% V" ?% D+ e8 _3 X' `) B9 ?3 B
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。 2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
2 L; R- `% R7 b
; w/ p$ l( i8 V3 }( Afrom multiprocessing import Process
5 G7 u& t4 N5 S
& T) e3 Z5 {) ^7 k+ f2 b/ f, o0 i: K" Y
- ~2 k N1 w% _; [* h+ J5 v0 ?( [! q' l
from multiprocessing import Array% v& J+ _% Z+ q Q3 X! ^0 V+ y& T
" @- D& l: k5 S& [3 C+ w+ k- + }! A7 g7 o% y# t! N2 T
. j) a" n7 q: r% a% y1 w
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
3 O, t! {% I/ b% h( \! }! Q1 [
7 ~# u2 G7 p/ p# c" n
& P4 B' S( ] |5 V& F
* |6 f9 x3 |$ i' @7 uimport time
2 z& o2 F& L+ @7 z' i# Z
$ `+ r. w5 Z) c0 z( c, w
6 D! y: i1 B0 U3 c& G, r/ y B# W3 a! a' g2 c
0 L* \% n% h5 r9 B
$ Q( ?0 Z% O6 T% n! p' D3 m
6 C2 \0 X) e# H9 Y/ q" d( o6 o' A! x3 N6 i0 m: c7 @) V
def func(i,lis,lc):
& z* L5 D3 j. Y9 h9 v( q2 m1 m# m4 \, W! b! k5 k- `1 m2 Z- K
' P; g6 M, ?8 ], H
% F. Y: |1 G$ j; [ lc.acquire()
" X/ q0 }- g: p* r) b6 U, o* L5 I3 ]& Z! B. `. G" T
- 8 e/ d0 g0 p# A9 [
N( z6 j2 } p- \- Z
lis[0] = lis[0] - 1
$ E3 h; }5 J9 _8 L6 E
! s4 `: k5 \5 e, D2 B - ( f, d& o* [6 J7 W% }' R
# g+ q. ]/ F8 u$ y time.sleep(1)
8 O% {0 O5 Z. w3 b7 L
! p, i: c2 x/ N) p4 p0 ` - + a6 i5 v- a4 N% b! a `
+ s2 F. {% o, U {3 ~+ ]
print('say hi', lis[0])% x/ j3 e9 M# e3 u' F
6 p/ x5 Z# A& c
. a2 j. o* B: H1 {* `( G9 y" B* P9 N$ U, ], F/ p/ S) s0 Q
lc.release()" s. [8 ~4 t) s8 {
* g% q% P7 j v1 L& Q
- % U* T; j8 q6 q! E
' L( ?! S; V+ i" \' D6 X% c3 E/ _% g& n
% o! [9 ~1 k; i2 O5 Y. S
- / B1 P2 R4 s9 v
2 O5 f0 h0 c" q* q- V; _' lif __name__ == "__main__":- R7 r( T) U6 n' p
M2 e! V5 H. ? - 5 u3 s2 h' d& Y! R5 F) p
/ C" X, ]) `% H- _; N array = Array('i', 1)3 Q: b5 m3 R( \
2 W3 j& J4 K0 N5 ^
# @0 u" F+ _& S0 a+ Y W# {/ I- g/ l% D! }% N$ L
array[0] = 108 y" a9 c! v8 T# L, g- v
' z$ O# L" _' g/ x' f2 X
0 h6 E" U, J$ i) B. ~' |4 \* ?: Y8 }+ t( r2 g9 f6 F7 k% G
lock = RLock()
" E: q! y6 P. f. F3 ?0 S
4 r6 @9 o" W9 o8 I" p* d/ ?- T
H$ V/ P: i- E* s# H9 f9 D' R- b+ p* Y8 t
for i in range(10):
& M& |( U) p0 ]. Y; g& f- C, K! I! D# ]0 `+ H6 v: U
- 1 P$ |' i; V( @: V
% k5 R$ _$ {! m( _) W, g
p = Process(target=func, args=(i, array, lock))+ p, o9 W+ s0 `' q; Z% W" T
! s7 x: O3 d% t1 {' U! F - ; g2 E' Y0 L- |+ f- Q# Z) z
* [# o% }! T7 Q/ i7 Q
p.start()
2 Z9 z6 w1 P2 l2 B6 {0 l6 [3 h p/ ^- {4 @
# W& `0 F! @5 Y, m2 y/ A
运行结果: - ' F0 W. `' l7 h, y' g: ]
5 b& X' f# x& p/ {5 j, F+ @
say hi 9/ f; A7 j1 M! N. u' a$ E+ ^
) f5 m6 C3 J7 v! s
" C) T% y9 K* ]- C z" [( `) W! j: e) D! z; {* @
say hi 8
4 w6 W }7 @6 e2 F
e! n, L/ ?. |. B" V- " U0 c2 h7 C, a
' O3 J+ w3 w, l% T
say hi 7
4 o% [- |& O/ t$ G! {$ L% C% C
& c* e& {; I" c: z/ M4 m2 r
- U: V6 }) f* P3 ~$ X7 r) I9 ?. @) Q; H; Y
say hi 6+ |' s( \1 p( E0 \8 C+ X2 Z# N- l3 `
o6 V9 c8 h! E o; D8 ?
2 h: p: T5 i5 l' x( l" a) }, c
1 F- L8 C4 K- e& `( m# lsay hi 5
) T o* j3 c. a" o$ v4 q% k# X' q7 m! s, [) \. f1 G$ e" E0 r% R; L F
- $ a% w9 Z# Y7 N1 G
. U1 u$ R" S% f( q9 }
say hi 4
( S% d% C- f2 O/ ~* q0 l7 B, }' g' q5 D; v. O$ V; n
- ' w, u" }+ {$ L/ b! A" @% \9 s- R3 [
; O2 Q6 r2 v# E+ v- v% _% a
say hi 3
. L% W; F& E, K# ]2 C" {( Y8 I% D) x. A' T5 S
) o: z9 g' Y7 N4 F; O( d6 i2 |1 B' F2 {$ x$ q
say hi 2+ Z" q0 q5 o- l- N7 Z# }
" q4 ~' }9 H$ J7 R, K
4 p5 o6 a. s; Q% m8 H: o) L' A' K+ [9 L1 Z; e0 `# {
say hi 14 Y' h P7 [' I& F k' e* e
8 g% L2 K: V0 d; B. Y* p6 D6 V* E
. Z9 m7 ^8 H0 L( b2 t% z7 m2 |6 W( u, W6 @+ f4 V6 @9 ~( i) Y! T' l
say hi 08 A1 ^ U7 _5 G) k8 j2 ]! f |
: W/ Z9 ^' T+ ~
7 ~& \' z$ g4 T Z! x0 } 3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。 比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中常用的方法:
7 u( a1 T: H+ [! `, K* `
8 k3 I7 P9 u# J y1 ^from multiprocessing import Pool
7 i" @2 x; f: x3 r$ \
) j/ ~& [5 a- Q4 c: o( E: j8 N; }
u& B6 o- X4 H6 [4 S: t
' S( B) e' G2 d9 K4 b/ `( ^, }import time: S+ c) c! y1 V9 ^0 l& G" K; G
. T( p- {! K& v& l9 k4 y- $ Q' ^) c; I! O
/ I- x; F! [, N3 W. E) R! A3 `. v0 N& A9 d& i# @
' D- E9 g* g( C: y3 F/ `
- , _) D% z+ b3 b7 T8 f: R% v5 Z ?
& o+ Q4 ^! @; _# odef func(args):
4 \* u9 N7 e' F( R% @1 S! H: ?& l5 A6 Q$ H' r
- 6 w" Y7 t: ?: o
# }' x+ f+ M9 X* T# V
time.sleep(1)! z9 ?1 z; X; y% ?4 J
/ p' @2 O6 h* }
- 4 p2 S% Z. j6 A# h: A
S; q( b; l) I
print("正在执行进程 ", args)% ]0 f- }! q: j& l& \8 o: d
. A( W2 j' c- S2 N9 g- D' a; d+ U
- 1 {1 \' p& H9 e: K, o& S. r; p
) o, h2 A7 \" C! R' J+ T6 K0 ]4 q
6 Q" J7 X' b- @1 Z7 ~% f7 f. }5 m2 r$ F: M/ G
0 b+ E8 Q5 p9 O7 N. {7 p g" V% l; o
' h4 u8 C1 F( Q. E! p' H* n* N$ Dif __name__ == '__main__':# O( Z' C6 V6 W! d+ {
( [+ v( F* l0 c8 ^6 [' f v
$ _0 O& t3 S- V0 F7 |2 \' b. f X8 p8 ?. T6 K
8 R& J$ W8 n; H, q) O/ j% {3 n( ~% r( H: F0 b
. W( \1 J2 K5 l! h8 G1 c* ~3 [) E/ W" B% h
% r* W" Y: S! [" r p = Pool(5) # 创建一个包含5个进程的进程池, ]: G/ k; j$ ^8 w4 i: k. f8 C$ ?
' Z/ ~/ S' ^! L8 i( @
- 2 K8 i# D3 t0 h) @# ]) L8 p
+ z+ r: V% T; ]
5 f0 k% j: v' |! q& ]) S& P( Q( P, N+ Y. U+ M
- Y" V5 Y+ u2 U K3 Q9 ]4 D( b+ E4 k
for i in range(30):$ T4 d% S T" b
$ A1 l) }; n! P: r
6 C8 i; w: n% o- M, {1 P0 L3 P9 o9 y6 ~9 s& D/ Z
p.apply_async(func=func, args=(i,))3 ], ]* v) ?0 o+ ~" \
* i: _; | @9 y
$ [. W6 b" y! j* M* T0 h
! }9 q, f2 w+ t \$ u; {$ u7 c Q/ u" x& i8 n
4 z' ~6 ?* J* y1 q- r+ S$ j
[/ ?& E1 A, _5 s1 x1 v: O, ~/ M
V' p7 m, n! G. W5 K% V* P/ T p.close() # 等子进程执行完毕后关闭进程池+ `1 S' p+ K3 h; h. f
9 h5 r: `: ~( B, t$ [4 i3 N6 K- ]- 4 M9 A6 b9 X4 H l8 f) h5 @
/ Y7 I; h; [- ~ # time.sleep(2)
. ]/ G ^7 R c0 ~& P C; A/ h4 w0 M- f3 S
- 0 Y/ g* y/ i$ H2 [
/ c, X9 e+ L# O& h6 d
# p.terminate() # 立刻关闭进程池% q- h- o; H1 c. E% w8 p. P2 E5 t A
1 A9 B, `# Q( i0 k8 p. m, N
- % k+ l2 p. J0 d/ @/ k: z9 r
/ s& G1 d- k- j) L! {# J p.join()- g5 t; g3 {3 X8 a
2 q) ~& v4 Z+ D# W8 F2 R. s
F) r; ^; c$ [( H; u5 P
% p% G( i* ]" @/ o8 R请继续关注我 * l: p' z/ e9 A& ^5 M% W8 V
0 \ j, O! p5 W$ L1 Z
|