数学建模社区-数学中国
标题: 爬虫(七十)多进程multiprocess(六十一) [打印本页]
作者: 杨利霞 时间: 2020-5-31 10:35
标题: 爬虫(七十)多进程multiprocess(六十一)
& L7 y; ~: O0 R) w爬虫(七十)多进程multiprocess(六十一)) m( C1 o! b7 Y' |
Python中的multiprocess提供了Process类,实现进程相关的功能。但是它基于fork机制,因此不被windows平台支持。想要在windows中运行,必须使用if __name__ == '__main__:的方式,显然这只能用于调试和学习,不能用于实际环境。
另外,在multiprocess中你既可以import大写的Process,也可以import小写的process,这两者是完全不同的东西。这种情况在Python中很多,请一定要小心和注意。
下面是一个简单的多进程例子,Process类的用法和Thread类几乎一模一样。
; r9 d% K9 `4 x8 |# Q. ^& S+ e' U
2 o! c2 d" _+ \3 z/ dimport os
* v6 o* H8 D/ b5 D( ]/ x1 J
' |9 G4 J5 y. F& m9 W9 \5 E9 _; m- ) Q2 J- z- F6 F/ t4 S! v- O7 s4 g
3 B$ N @: Q& m( @
import multiprocessing
- H( X# F, B- s" m( j! u4 N; G$ A R4 S3 ^" Z
- . b% V- I5 S& n* c/ |; |
/ ] @' c6 e/ V% Z
" D# t; E" v2 e. e( t
5 k' v9 x5 l0 g9 J" z! n
% Z% s; q0 _/ i& I: {; \. g" O' X; j5 h) U3 X
def foo(i):1 x; o7 y# e. f. N8 R* X
M5 k7 v! n+ {4 S0 C. e4 v
- ]; {) f' c/ @7 G. `
4 W% d. z% ~& h" A% e& [
# 同样的参数传递方法
% ^% [ q" R1 w7 s* T% o6 a2 d" N9 R+ l2 Z
& B" X& X' I% n* |6 |( z l# c, V6 r* P
print("这里是 ", multiprocessing.current_process().name)$ v6 I) _+ a# H2 k
# O: S$ W' }0 D7 X/ Q
8 l) }! N' r4 b' `1 r: U
* S2 ?4 U; f- @" i" N8 {, s print('模块名称:', __name__)
. I! C, a) i! Z" f, Y+ M7 D
M) T4 [5 T9 i4 ^$ N; _- 7 i6 r0 G# x% {2 y: g
' G: N" `7 e! ~/ j print('父进程 id:', os.getppid()) # 获取父进程id2 w k8 n# h2 ~) ], `. H9 O' A
$ x; u8 @% H! M7 Y2 F! s; o
- - f( m" F9 D2 f# U
2 t w$ p9 w" r1 x& X7 [ print('当前子进程 id:', os.getpid()) # 获取自己的进程id/ f; `/ C/ u" Q; F/ o( [% [+ R3 o+ q5 P
5 r. m3 ?' \$ A5 r
; D. \* {6 A& s
% w7 `! G$ I: X7 }3 F" E! u print('------------------------')
# t' s; ?, ^5 f- V
% ]7 W: ~' v, |# ]# f( K% I
; ]2 V( I: _2 E) r1 i. P* F" P" @9 Y
* l R/ ?, I# b# ^6 R4 I% }; `7 n; \6 `' v+ z
$ e) O0 M& n& J# x: v9 k r' I
: M( r8 t" ~' @0 C% Q" x' Zif __name__ == '__main__':. }- C' w+ j# W) @& |7 W
/ l1 c" ^- t: ~% j7 B
) t& I6 ], b4 ~
$ j' d4 P9 J# N1 _
! P9 @/ Y- Q8 G
; e C1 J: I% ~
4 w5 a' [: L/ _- j: n
$ H) k7 N# B4 P# `/ W for i in range(5):
, a) `' d% I2 ~, `
: K1 U3 [; z. K S2 ]' z* ?- - R) t6 l# a. g# i! [7 H) b( j2 K
+ b% S/ R5 D8 Q8 m* ~$ ` p = multiprocessing.Process(target=foo, args=(i,))
8 \5 d4 @9 y. W3 t. _
% P8 J9 s8 M5 |, x( X. z - 2 q; N$ u5 |, o4 T# W1 m- c
' g h: z, p3 P0 w: ?; N p.start()
7 X9 J& W% T7 U. R/ U8 s! s4 [ K, G7 m
+ b* b5 s3 F" {) q' U
运行结果:
7 t- |- O x5 O; M4 i. G# D
- q2 T$ o! A4 s5 j; w& I% M% D1 S这里是 Process-2/ }1 C6 B' | E% s8 {5 y
9 f, ^2 H. x ]/ J: B* I2 Q' R( y- - l3 a8 ~. s% \" V
4 ? f; D7 ]3 k+ [' S' S
模块名称: __mp_main__
& @' A S! e3 `$ N1 o2 V/ g
, J5 n0 O; {* d$ u - : X0 Q+ @! C- Z# E8 }- g
: E5 k$ u( ^- G( g# v! j7 l0 L父进程 id: 880
5 i1 ~# o2 ` A2 j- Y/ r* ^. M. }8 L# Z. j* r2 b3 K1 Z
) N3 d( _( n0 F, } [1 \6 {9 T0 d0 C" I1 @: V
当前子进程 id: 5260
3 T! N5 P' l) x& H# V5 @' a+ Q. E: w! j6 r2 t/ g# a
, i2 V& E1 V" s, X& {. k1 Z F0 E$ D" a O0 W9 x) e- T& q% s
--------------+ j5 h( c. A9 S( @7 v8 N
7 {5 j/ J- H/ Y+ l* ~( c( ^! O
- # D; R$ ^1 T0 z1 V) Q* ]; |/ a
) q6 j% I0 c4 H2 \& ]
这里是 Process-3
# H3 b% g: D, @) A. U* D$ Y/ X) h; U( q
- $ o: h5 [* j7 |4 X0 c
2 n! [1 K- V2 o模块名称: __mp_main__
6 A' z9 z I3 I( g! {) E, ~. n' v1 U* Y" t+ d7 K* \
- 0 s9 k" s5 w3 z4 B
+ R, T% G# @; G
父进程 id: 880' I* h l# `- e- O: L. R8 l
a9 W" O7 r1 N6 a9 T
- ! k6 J7 i \ |5 C8 a1 L
2 N# w6 C! g k5 E4 m
当前子进程 id: 4912
+ T9 q3 U# O! V* G0 k; N& ?3 H% l9 t5 S9 B0 I2 [/ g6 V- m
% O5 G7 t; B) V9 R g Y$ r" s
6 a- D" y7 P: `4 U v: F, M5 M' u--------------
) L9 |& G1 o* L5 s# s
* y2 i: j7 |% B# S- $ |( \) B% o' `5 h
' @ ~7 B0 e0 n: O+ a
这里是 Process-4
+ W0 f/ C z$ O' v" v, M7 N8 Y. R" l5 v2 \
- ! T& A, w0 w5 a
1 s. p2 j% p: I# h; l/ G5 O
模块名称: __mp_main__ A3 C% P0 H M
/ i8 t9 D8 v8 q1 ?" Y
. r& k. D1 N a) j& \3 f* D# Q9 ]$ N d9 F! x' k+ q( Q
父进程 id: 880; v1 v( g; g0 C( z8 f
9 I# h$ o' v) f: L- V
! H/ ]2 C( k0 I& X! t- \: u. o6 ^+ G8 H! \4 n
当前子进程 id: 5176# h! p( O7 K6 ?* ]- l
: g! J( a( z; d- [( v( j- 0 [3 Z' B' o! K% y" l# e
8 Q! V7 L% N5 Y% ?--------------+ h2 |" k0 a- E9 {
% q! x. K* H5 J! H' H1 d
. h" [' s8 I2 J. S+ d
; z& C; u1 y* K# r- ^' h" p这里是 Process-1# D% Z7 z3 Z- q
& P$ _) {$ s) B! S
- " T2 c- L/ E3 x. D+ x# E1 `* I
$ d: Z8 p9 a4 N6 ?( C& ^" X模块名称: __mp_main__3 {+ K" a% i9 K# T M0 y
1 S8 [# G' s! i3 e
- 4 I% Y7 H( n! J z! p& v" k
5 w( Y0 ?. T- |7 I$ p+ c' m
父进程 id: 880. `9 _7 d! M$ D+ N0 u+ U4 C
) B" ~, I0 ?8 }# h! U - 5 N2 a0 H1 M1 U* B
' j& m3 z1 ?8 V6 z
当前子进程 id: 5380
1 I, p) A( M% B J6 {% g% {$ h
* ^' X5 r& z% c. E/ o! A
& _ w" k; I! M& I' c7 x1 h6 F6 a& ?) R' s
--------------5 f* G" S1 e9 X! W9 h0 E4 s; s
, o3 Q0 k% N! p0 _0 ~* K% [4 G
- + [! J( y# C3 o
! j$ J" b7 `& Q. S这里是 Process-5
; Z5 f3 d) j7 a1 ^0 b
* m* s+ H1 j; F4 S I
2 J: v8 ?( l% N3 z# ^1 K D& k& X7 M+ r' C4 Q8 A7 w
模块名称: __mp_main__' L2 v5 W) A* W) ~
: z! k# [$ i6 z# ]* S9 z% N- ; F4 R( R9 H4 D7 J
% S7 e4 M% h/ G! b H% J! B: ?# H父进程 id: 8806 r# T8 s9 M5 V9 h9 Q7 ?
4 v" L+ }) ?) g2 |( M0 K2 q - , _1 ]# y5 C+ ]
8 w! [3 H) b0 W
当前子进程 id: 35205 M' ~- I6 F b% E; _
/ l% Y. G6 t6 Q) V' h6 z5 X" Z; ^
- b/ q+ f5 N/ \* K: e
9 v8 t/ \2 u* y2 g# X
--------------: }2 i8 m9 e' R3 a
1 O9 w% c/ L8 d L/ C) y7 P' e/ n: O3 s6 m9 r" j+ g( e
1. 进程间的数据共享在Linux中,每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。
创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。
下面我们尝试用一个全局列表来实现进程间的数据共享:
- I% X4 e O: r+ O5 u
" @$ C0 x' P, Z. \$ Qfrom multiprocessing import Process. H0 |# y9 u4 R" c( x
6 D1 e; X- q8 g$ {1 g
+ i) l$ y" Y, e9 h. F" @7 ~+ j/ c+ z& b: x
0 ~- R& y/ H1 b g: M* t% U; }" A
1 U! h) b6 d) A
( e4 T" A4 e+ Q5 g) H
- _1 z+ s0 g! P" a' @; Elis = []
7 N' l2 s/ d" C. u1 _0 [) e2 v+ d9 n
- 4 v" a$ S1 {2 o! C$ R0 F4 M" s1 u
' J# {. g! t' g, o( g0 A5 }0 j3 r7 D9 v. N# j+ n, n
* \* H- ?1 `5 _/ T9 [2 X - 2 r0 f" e7 [" n- G
' }4 a, l- o8 W* R4 {def foo(i):
( B. l: s [( c- ?; r0 q" {. h
& Z3 j" G" U% n: I/ e - 2 r' v7 J8 E) O1 i6 B# N2 [/ H$ V
% B9 ~6 M: T1 t9 T* g+ P6 { lis.append(i)( ?9 x: a t% {% L! ~3 |
* c% F6 f2 U& C' W3 z* ^1 v7 i - 0 r: b, g$ a6 v9 g) D: f5 Z; j1 j
- b: ]' z8 A G" q1 k }* \2 v, K/ g
print("This is Process ", i," and lis is ", lis, " and lis.address is ", id(lis))
6 p& [3 m1 s G X& k
% F% Y5 e- N% Q: m2 b
/ j g# ^3 W# c2 P7 c
( x* |8 f) d% W+ ^( n4 z! V6 c) V0 y+ g; e8 r5 r
1 \. |9 F0 p$ o2 ^" N" ~! t) ~
* q# l% e+ U% ^7 p% @7 K7 t$ H8 n# @% Q. J
if __name__ == '__main__':
6 Q) ^( d* N6 ]3 |' q
5 n6 P V; y& n6 I$ ~- 0 q! D/ }- b% a
: B. h8 t0 K4 J7 s
for i in range(5):
0 \/ |- T1 N; b+ _5 x( V. E
: P0 J/ Y" C% P$ f - - Q! h7 D! q% ?% x
. }, _7 i4 ]3 l3 l3 Z& I
p = Process(target=foo, args=(i,))2 g8 O7 X( Z! ~* Y( l
+ Y7 T6 V! |* I6 d - 1 X/ ^1 y+ ?9 @7 z
# `# g8 d" \9 ?3 G3 }& _
p.start(), ]7 a* N3 n9 u! A* c! d5 Y
1 f n5 j9 K$ k. I
6 g; K! \3 X$ c: v/ X* q$ r. O
9 E J: v4 d5 p# a8 H" t# C print("The end of list_1:", lis)
7 v- e) j: G4 j: H" A z9 b3 U, U; t9 q" Z
% }8 {' {( c: _ T# F
运行结果:
! I5 D9 L/ m* c# ` w5 S) N! x, @+ E0 W: R) a8 l4 a0 j6 d% u
The end of list_1: []
/ |' J' |/ n4 y1 l- ?, H2 s8 I4 h& {9 l: B/ O' ?7 u
- - T1 ?2 A* `0 |5 Y
% y+ F* o% N% e1 b! a# k, oThis is Process 2 and lis is [2] and lis.address is 403567445 R Z- {/ Q& S0 R1 x
6 V! g+ a, l" ? - C7 x. i7 G. f4 o& R
0 O' U' ?1 K/ iThis is Process 1 and lis is [1] and lis.address is 402912082 ]; |7 p y9 o6 {3 o+ Z" I
5 p! f+ z& j2 i$ l3 h4 L: ]
: R+ H2 h! [) V) b. U" g# `- d. D( @6 q
This is Process 0 and lis is [0] and lis.address is 40291208( L0 G& @# L% }: I4 P0 i; A
) {- \, G3 C9 E2 M" W5 B' O% T2 C: A
# B4 T. D+ p8 D5 { f% O+ Y) I, q# H' J
This is Process 3 and lis is [3] and lis.address is 40225672$ E& H; m- ^$ ~# Q) X
$ N! d7 ^- u. X6 K# h! Y9 [
- 8 [ c6 e% l, m, N4 d' K
n) q; y, [7 b! @# W% C+ y, SThis is Process 4 and lis is [4] and lis.address is 40291208 j a y5 d( q& m
& _" g Y: z7 q" O( c) n! V
7 s6 s' Z# |% K2 o9 p- u$ V+ |# ~
可以看到,全局列表lis没有起到任何作用,在主进程和子进程中,lis指向内存中不同的列表。
想要在进程之间进行数据共享可以使用Queues、Array和Manager这三个multiprocess模块提供的类。
1.1 使用Array共享数据对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array('i', 5)。对于数据类型有下面的对应关系:
* `2 C9 B6 F2 ~& q/ I+ S* G2 C# o# |5 V! `- z
'c': ctypes.c_char, 'u': ctypes.c_wchar,
; j4 ?" V/ W: B( T2 `( Y
9 R0 X" D+ v5 y6 ^, Q
$ d$ y9 g; [5 ~: z7 [* @: K' F0 o! Y, F v( o
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
& v1 Q& ]. q7 k2 K6 K$ @% |4 a( L- c) |3 {
' P. A P! V5 w( q* g8 }/ j
7 s' r" v3 ~/ \/ K6 O0 \. y'h': ctypes.c_short, 'H': ctypes.c_ushort,$ p c1 y* b) K0 v7 o
* e( ]4 d* B' S& N% p+ H1 @2 _
) E3 D4 t( n2 D
$ H0 c" M- o, L; X/ N5 V6 p'i': ctypes.c_int, 'I': ctypes.c_uint,
& n0 \# s, v5 P- ?3 A' U7 z5 [- A1 G: |6 W$ ]; `5 @9 @6 k
% I& Z8 N9 P( Q4 k
" N, e$ f' y7 r% A+ ~2 a2 p'l': ctypes.c_long, 'L': ctypes.c_ulong,3 p4 T4 h1 ^; x; r
. ]- r7 _: T% s0 t9 W- ! S8 o$ f' q e. {% G6 w( b8 K
9 C3 i0 ~# F& @9 g2 h'f': ctypes.c_float, 'd': ctypes.c_double9 Z N m1 R) I8 i
' e( r4 {2 R$ m4 f2 A) X
/ i! c- @# E) c( e3 C
看下面的例子:
- N. l( \! t+ ~4 f% @0 r0 A. N" ?* O1 Z# g i
from multiprocessing import Process$ [! K% H) f7 J+ U, ~) t9 m' {/ _
' z1 S- P( u( |' M
/ b2 D3 ?- ^' V+ I& \
" A" }2 K9 W5 n3 Ofrom multiprocessing import Array( {( @/ O$ Y8 a! v! @9 z
/ ~; Y0 L1 D' K- `2 N5 [7 y1 Q
- * P5 }3 ^7 @7 ?7 N, n* {/ _# I3 Q: L
7 m) }! B R1 w# @! d. q% s: V3 q3 ~( U+ d6 w6 B% `$ X
0 i9 m, x" W& ~+ b4 ~ - " j! V# F% Y/ Z
6 a) X, M: n5 s0 ^9 F, xdef func(i,temp):
. m( @4 |8 u! b% Q
: Y, o6 i0 N `. J6 b - b! l% z0 \ u O7 u7 o9 {3 D, _
, X# R; }$ [/ B) z2 c, s: R( x temp[0] += 100
$ ?# ]2 m- q; R$ @6 X
; i% X+ d# T7 c0 r8 ~* T - , V8 U/ K. Z. m3 m( w9 v
# h) V4 _( F, E1 w- S# d3 b print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])3 u! `& q. w5 H6 V9 Q
, ?' r% V: L) l8 [( \- `% X$ l - " a* k5 R$ J) c( S0 h! V
8 A; k+ M' ]8 W3 [
/ N: u/ H. U) u
) S4 |6 N2 Q- J+ [/ i+ d$ Q( K
1 \9 n! A# U; @+ H: h2 X8 _5 h
3 C; R6 x+ u2 }9 I' nif __name__ == '__main__':9 n# j4 }/ u5 Y% _" E
+ ^0 f2 z' e. C
" {2 X3 c- V% I2 D
( J: O( i: ?. O( F temp = Array('i', [1, 2, 3, 4])
2 ?- S* Y7 T1 V
& n9 I% f" k; G5 S n# b
_8 `: k9 {1 O3 v# |
. {8 r9 i# L! S# R$ t8 J v; i$ } for i in range(10):2 Q5 \* b! C1 |; d
( O! K; f; V- z( a, b
- % s# q* v5 F" J5 a! B3 W
5 @& H0 z: W( Z2 m! l& F/ N3 }6 o9 ^
p = Process(target=func, args=(i, temp))
/ \- n" o5 ?) V2 `! ^
7 z+ U) _& A! O
6 k. _2 f( T1 z
3 C8 @5 F: ~' _+ ~. G) o0 I5 i p.start()+ G2 @/ L) r9 i5 T( v' P
0 a% B, V6 C- W( T2 F" K3 f4 \7 e% K
运行结果:
- - F6 ~' a) n8 Q2 q" k. b" u
2 g4 ?/ k/ z' P; X/ S进程2 修改数组第一个元素后-----> 101
. g# N5 r D/ _3 j# S) _# \. D* Q' S: o; v: [* }! ] r
- % e( Y6 O+ C0 d9 a3 z( T @, o
- @( j1 P9 z- W* S: j& I进程4 修改数组第一个元素后-----> 201- T. N* C( P; e7 q( ^ V
1 Q. i# H! b* q+ J
- ) o3 B9 L F9 r, ]+ n. e# V
( H% v$ w, w, I; Q4 [+ n进程5 修改数组第一个元素后-----> 301
: {5 J8 t) _/ b' c6 r! b) R( X. a w8 [
4 F Y( @. U ]) m/ `9 q0 R
_& b+ P: T3 N进程3 修改数组第一个元素后-----> 401
5 S/ l3 B) X# W% K' ^
4 P. K/ _0 c+ _0 Y) D' ]- 5 O1 F) L. S# g1 B
3 | r8 [! D6 M+ x: f5 n进程1 修改数组第一个元素后-----> 501$ A0 t8 Z7 {3 \ H( h
8 y5 W8 \" r: M1 A - ! e5 }; }7 I5 g7 ?9 ?. |1 I, I
, R% Y4 g3 Z( A, N
进程6 修改数组第一个元素后-----> 601% f+ ^: F. \0 U8 O4 o3 @
0 Z: Z6 K1 D2 q
- ! L& N" }3 U0 m
. d, [# ~3 Z2 ^8 _5 @7 X进程9 修改数组第一个元素后-----> 701. M6 _" o" r/ \4 v! B$ \8 y0 w
0 v W; a9 h2 W - % R6 t5 R8 I: c! s# E
3 V) c7 p" Z! p" w& ^进程8 修改数组第一个元素后-----> 801
0 l7 C7 }6 i0 r8 o* ^( g: k& X3 |6 G% O0 M1 M
) v) P, B6 I1 |, I# h& L1 a2 }' n# k# u* D0 ]) j" g
进程0 修改数组第一个元素后-----> 901) u& A! V9 m: }9 _: N
7 K- A% D" ^4 D+ @5 \3 M
7 O+ U- ]) x+ x
" ?( `' j9 n# o* F进程7 修改数组第一个元素后-----> 10012 g+ g P9 H* ~4 d- v
* E/ D* U/ M; m/ F9 U5 q6 G( \8 z3 q
# r0 {7 K$ ?. C- j5 f
1.2 使用Manager共享数据通过Manager类也可以实现进程间数据的共享。Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。
5 U) H$ I" M1 A( K2 N# `
4 M- ~, N6 G0 ~" _. ffrom multiprocessing import Process
0 _ U4 g2 l% _, B; q, f/ V
8 g4 ?+ I2 p: T- 0 K y" o- G. v4 M# q2 r
. S# ~% Z5 L& R: i7 ^
from multiprocessing import Manager* f7 D- V" j: y
! ?1 a& _; B; X( v2 L9 G - 8 E# t v, i5 r; b& z& ~
+ k9 k- s# g/ P: k2 z
9 j/ W8 s2 V" }# i3 Y9 j/ q5 p2 M; O& X* B& I" h+ L
. A- L; l, Q+ x. R1 k* U4 E2 G
) W+ j0 x- k7 e8 M+ Zdef func(i, dic):3 Z0 [5 |5 s# V6 u' _9 U9 Y# D* V
: S- ~: [* X' t0 e5 Z) e/ C- r
- [8 K# z. T8 n) ~2 _
) m; H8 p# I6 p; Q dic["num"] = 100+i
( c) l& s) `' c( E
0 ?' e# @2 ?$ }) ?& D# G( x
5 l% G5 i! M0 O! i% ?5 q1 u0 f. x! o& J% ?0 h' a( j
print(dic.items())6 \! F( k; z0 h: }% s$ U
( y' _0 Y/ {3 P4 w) V6 @$ b- ) e6 ?3 v# Y; q7 C0 y$ ~* r4 Z
9 L3 _6 A+ a' ]* u' X% A/ O, Y% E4 X9 o6 e. K3 @
1 y. z [1 p6 {/ v: I% A/ `) A
- & e. a8 v8 A9 |- ]: m) a
, O7 Z& L" }( rif __name__ == '__main__':
( D: _' {, E' _3 D* h0 B. V9 p# Z# E7 J# |2 Z1 W6 a3 J
: F7 `$ ^7 w C6 r
1 Q) j4 U3 K/ @ dic = Manager().dict()+ n" ?) i$ `& K7 b5 Z0 V* ?
, a5 u! [7 k! ]7 l3 s( [2 X) x0 |- 9 Y8 C8 |& s; x6 z
+ O6 ~8 ?! S7 V" M for i in range(10):
) t( j% x, T# M' m/ P
[* t0 C/ o% j1 z
* ]3 g7 I8 m9 K6 ]7 l& d. l1 B5 k% W a3 }5 x& r* s
p = Process(target=func, args=(i, dic))" j& e, c- p/ _: n: r& J
" Q$ G8 c! I* F: H+ r0 r, V5 W, Z
- / q0 r6 t1 V8 }6 M; B2 f. }
3 n1 T/ D+ f3 ^. F p.start() h1 p% h- p' ^- }' B
' k; l: z& I: M+ J5 F& p/ \: {! A
0 ?7 Y; }( X8 P1 T$ `+ E* n4 Y0 v/ d$ b7 ]* m/ i4 O! x
p.join()" {: g9 h8 Y1 @* q
, _4 p# `( k1 F3 r0 E# Q+ F5 @% j; S% z* [2 Z% V4 N! b5 y f
运行结果:
+ m0 [$ Q) p2 I# j1 q) W' ?$ D! V- C& P# m" f2 {% t& f
[('num', 100)]0 ]/ |6 [" q/ J( a( Z
, Y( t( O$ f3 ?# I- " @3 y7 F2 s( v
/ r; H! i2 X+ B( v' I+ [
[('num', 101)]
3 b" p( H$ ~' ~ x6 P* @$ D* T6 z2 Q0 f8 [! C! d* L
- l6 n7 h# C8 q) U5 a
7 r2 O# B( L" u: }: d# h[('num', 102)]1 Q7 \/ e; G; @- S, j. k* Y: k6 y5 Y
, p0 p2 X* n; S7 n; M5 t6 _5 B
e1 o0 Y: d0 S8 i7 |/ x0 N6 x F& s0 G7 U v
[('num', 103)]4 M* Y Z) O+ [; W% r8 y- @
) y; t. y( F% ]
- , V2 |8 `2 j& e% }6 A
; P! B( S" G$ I- {! `, T* l9 ~
[('num', 104)]6 [# s8 F3 L; m% l1 \ a5 t
4 w" X* P% f7 _) O) V5 j* H$ ]; Q, J
- " C, ]$ N1 j* z& l0 u+ u( b7 C* w: P
% U- ?( o7 Z! W* |8 c7 z( G[('num', 105)]
# |: l3 C, [- K) d7 o3 y* L& ]- K
. K3 j1 I* ?( r7 `# e1 m$ a# Z0 f
' c5 ]- Y* N2 Y& ]- {[('num', 106)]% T2 a- K N- V9 \, @
' J4 f7 m% A" ~' X
( ^7 P; T6 M3 ^3 v/ g5 ~' K- \) c L9 z4 I h# u
[('num', 107)]$ R) Z6 f: N/ ?9 W
2 f" o. U3 {" [4 Z( M- ' C& {8 |8 S# D& b ]
; r R; G, `2 Z9 n0 K0 n K7 M[('num', 108)]$ a, _6 m" J! W2 R- ?0 R
% A+ x- a( T* C; Y
( }- P+ w$ d! a' V3 A4 @& |9 T0 }( p e8 y
[('num', 109)]
E2 M( ~0 C. `5 W( Q# n
: [# b: Q8 e: ?1 `4 E& d" ^5 e6 v9 \! g2 l8 L
1.3 使用queues的Queue类共享数据multiprocessing是一个包,它内部又一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
- 2 f n# U" k, x9 @5 P2 G3 ~5 V
/ X! p3 n( G) {/ Simport multiprocessing
1 g3 }$ _5 Z, z4 {( g3 O& r0 x$ F
. R/ u6 ]$ V" [3 B - 9 K0 P0 {) Z& p" O+ X5 ]2 S8 v
1 ~- B8 m+ O3 o0 U9 U' ?3 x- xfrom multiprocessing import Process
8 k( Q& U( P1 G4 _% |5 ~7 |0 S" f x- ^/ ]0 F( g3 g I
1 f. g9 O, s; R6 D* J- ?
! X* n8 j0 a7 n+ J. {& y1 Ufrom multiprocessing import queues
+ l" G0 v: u% {" O3 G* o
! v( Q( n/ a0 A8 R0 G. u- 9 Z- Z0 V( D4 A# \: v7 U
! ]" ~6 U5 R; l5 g
* ]) C" I/ M1 _' D* H& G8 m! L+ g [: d- C+ Q
6 R$ G% P* x4 x4 M& F
: [7 p( r; F# l7 Odef func(i, q):
6 n7 o& ~& A& H+ o& C9 B6 k: d; T% u/ Y% K! d
1 o/ }( s+ _0 D0 \" ?) i
7 c3 k- I, v& `' a& W6 J" D8 v ret = q.get()* z2 P1 ^7 c q; I
8 B6 f) ]! r/ s5 f
( T/ X9 ~2 H6 A" v4 C! c7 R# C
7 Q1 e8 t9 i" i- H print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i)), u# s& S; T( D7 n& d. E o/ d- s
9 l# Y0 K0 H$ ]- i- @
- 5 ^2 e7 Z- @4 _$ d/ f; T
, `+ K# \0 j5 u' Z" G' e3 n
q.put(i)/ q' y7 [5 s) I( N: |
' {$ C) O& X! a7 H
4 | G' C) M5 M: r
- U5 a" X. b& z1 t5 \
9 t0 w* v# N! W& Q7 G6 _9 r
: Q( w$ t" T: y7 ^6 H! }( o; _3 d- . E) b; _. `& s! E9 C: x6 {
& q0 O5 ]8 o0 [9 [1 G
if __name__ == "__main__":# H7 O. V% ]/ x( @+ N1 z
: f; ^. \+ f* m) r1 n' A
- 7 o' r/ G6 D4 C S& b/ z$ W# [
# q( M, T3 z4 V$ ^! P lis = queues.Queue(20, ctx=multiprocessing)
3 r L8 N2 g) F9 \) i
1 T8 J2 } M# t. S3 T" ?2 G - ' S' L; ~! b k& U$ q2 F
3 T1 `2 e- T* c5 S& | lis.put(0)/ w6 j& W) O& i
/ P, n+ w! j+ ~
0 Y9 W2 w2 u4 U: \8 U
4 b4 A2 w0 y" F2 m% F, ^3 M for i in range(10):7 ~2 W c) U0 n* P8 [/ E& y5 \1 X
# g9 @+ G0 K5 {6 V" K
- # L& C4 E3 m" }4 I
' }0 Q7 {) G/ q0 L0 }4 T' w& X# p p = Process(target=func, args=(i, lis,))
8 ?; a9 \" x. u! u0 Z% y; @
7 I, ]# R+ A" Z, Y/ q4 z - ' M1 R2 ]; k8 X) D W% u Z0 J5 J
' Q9 ?- F1 f" X+ f
p.start()
: C' w4 _1 j) v! a: [8 G: y- }4 m) ^ [% S
! J) f* i( e2 {* x
运行结果:
( E' h! D) K/ @5 V, q
2 z( k6 I0 o4 n进程1从队列里获取了一个0,然后又向队列里放入了一个1% W( a& b+ Q m
! X( ]; c9 R( T0 R
+ V; Q4 l% d& n9 @) n/ O O4 [+ `1 V% _8 J: a4 G- o
进程4从队列里获取了一个1,然后又向队列里放入了一个42 E0 G2 C+ w# Z3 o0 |
0 h$ u: C( y2 ?
3 F/ F# F+ H4 h6 B/ e' o: y! U) a/ @ O0 d7 ~9 p, _" q+ K. |
进程2从队列里获取了一个4,然后又向队列里放入了一个2
" [5 |7 h* w# A: e+ h3 ?
* p! ^& X7 j% m3 g0 ^0 y
/ a5 B6 P% {8 l. h i6 B3 f
, k: Q. @# P. r. M进程6从队列里获取了一个2,然后又向队列里放入了一个65 k) q, S$ U" n! H9 b
/ S) R* u" X* d7 E- 3 p1 q# P$ E* ?/ T# x
$ r4 d8 o3 J f' o, C. M+ x* z1 _9 y进程0从队列里获取了一个6,然后又向队列里放入了一个0. g" T/ s9 G2 ^
7 Z' A1 f, C: \0 |" D! |' L
- & @3 F1 x% T8 ^
# ^# N( {/ x2 A5 }" m6 p8 |进程5从队列里获取了一个0,然后又向队列里放入了一个59 t: g/ J7 `* k4 k
8 j5 f! e/ S; B( _" i
, _) [' T1 r9 k5 B6 L9 x( q) S }
进程9从队列里获取了一个5,然后又向队列里放入了一个9" |. R5 E6 ^" J
3 k' J* T1 ^) g- ' c2 S' e! N& C& K; V
' P- V1 D. t# L. W% @进程7从队列里获取了一个9,然后又向队列里放入了一个7
) J: N& J6 e6 S( v* U$ \
) D% N& V3 y$ j" u - + W4 A$ {" h: J
& T8 [" l* U6 D. k- g( _进程3从队列里获取了一个7,然后又向队列里放入了一个3
, v% a2 [% \% {$ |* b7 K) o$ |3 W* ?# z Z1 [. u4 }) x4 b
- A' I' ~- p9 j& J- i
0 R. L) R1 v. p) O2 f. I: ^进程8从队列里获取了一个3,然后又向队列里放入了一个80 t" Q, ^' m2 y7 K# S
! X" T I7 F, H% Y+ W( w9 L, U) L
关于queue和Queue,在Python库中非常频繁的出现,很容易就搞混淆了。甚至是multiprocessing自己还有一个Queue类(大写的Q),一样能实现queues.Queue的功能,导入方式是from multiprocessing import Queue。
2. 进程锁为了防止和多线程一样的出现数据抢夺和脏数据的问题,同样需要设置进程锁。与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的,这一点非常友好!
; e2 ?) \0 n: y q" R) T- b, A0 v0 C8 l ?2 L j5 b: ^* Y: S
from multiprocessing import Process
+ Z/ B; D% }- |+ Y3 r* N1 D/ s: G
- 3 t, q5 h$ x# x" ] I8 _8 X
1 A2 u1 S. n; I: j0 a7 zfrom multiprocessing import Array( l$ N; B6 Z$ Z4 `0 D5 ], A
1 { a2 u! Z, p2 P, w* }
- . k& @' {4 L1 e9 u
3 R6 y) p: D& X9 _- f9 B6 i. pfrom multiprocessing import RLock, Lock, Event, Condition, Semaphore
6 b' u; c0 a0 C v# V6 f0 l9 x! D7 V
- 2 y5 D, t8 A: l3 l2 ^/ o4 u( K. t
" _1 V' ~0 f. k, q
import time( P6 s3 f H' }' c2 i8 p' @) q
2 f2 W$ i; T! R
- 2 D! ]7 S- M0 g9 n4 _
+ }$ N- J: r, [+ S1 Z- S1 O p
' P# w# J' F8 a, T
3 u1 V% f7 U) a0 J! g2 i, y
% F7 g6 M5 Y+ r% I0 e! {% a, N
$ [0 x; K0 R! m& w' c* K: G8 W: A( adef func(i,lis,lc):
/ V4 u [! z" Y) c0 W. }, W, O/ f9 y
- - u, n) Z& t% r& X" E3 r
( N0 c7 T' z9 i; s) W$ N* _" _! g: D lc.acquire()* b0 g! M9 V6 n: u1 B$ O
7 y$ }0 A( `' i% M9 q6 ]
1 V5 C( p$ d' Q, q/ m$ Q9 [: A; r4 S+ E
lis[0] = lis[0] - 1
: c! h# t& J/ I- g
/ @0 H/ K: ^/ U- S6 b" V
! w/ k5 K# h: Q! Z T: T8 `' D, U9 z+ T8 J* K" o5 W
time.sleep(1)
' Y( ~$ H' M3 V8 p9 ~+ G5 c* s
. u( [" x+ I6 O, ~4 f) p% H- & X4 G9 t" D* m/ a# K& E- {
+ T6 X& ^' d9 @" d! H
print('say hi', lis[0])7 g$ k$ J' m3 T, ?6 R4 \" t5 B
6 |7 v- d. A! o8 @+ e
9 o! V. }7 Z$ b1 S) a' E" R' X" P8 Q2 x( w7 R
lc.release()
6 ^+ F! H/ c0 _3 f3 Z5 A- Z) ~! r: D2 @5 }0 O
- Y3 ~& y& y# Y+ r% `! P8 t H% {2 W
3 N, C6 ]! m' f! J3 l
& Y2 k8 t( i# d" W9 a- U
- $ e! x# e8 u2 ?4 e E
3 j2 [/ R' ^- l! qif __name__ == "__main__":
6 ~3 y5 W5 r0 B3 c$ k: o! J& j7 ^
* m, Z9 C+ u& K! Y- g - , q' i a8 ~; {- m+ K1 w a
4 |, c2 u+ W8 w0 T8 r array = Array('i', 1)
( P% _( ? w0 x0 O- \; j8 a+ ~ h2 F- i, Y) v8 x
5 u% K9 E2 b" Z3 ?: r
$ ]1 L2 C) ?- |3 ?, T array[0] = 103 A- l; C/ ^: f3 K0 O
3 f* S& j" h" X! O& T9 @
- 7 Q. ?: k9 P& J r3 h
+ s4 {% J5 z5 Z0 | lock = RLock()' t! k; a5 ~# L. @# k# E
8 E1 N% Z# l- F: ~& n
4 ^. L# ^9 c; V! j
& c) C; Q- o: r& O; j for i in range(10):
, F" j/ x& P. g2 Q; v' \$ `
4 G+ @% U7 y- P$ \- 3 T6 ]2 r0 Q1 w6 K+ U; @
% N# d$ w& j0 M/ P- j& T% o' i2 E% K
p = Process(target=func, args=(i, array, lock))
1 E K G- R9 Q) [( F7 x: w
X4 J! w0 X7 v( m5 r# Q - , o" c; E; n' f
& W+ k, l8 D5 c3 n# c* l( W2 O
p.start(), A; ]2 v# m% K* j" N
* J6 F+ I3 N9 h- l8 r
- ]% r+ M8 B5 k! ? K
运行结果:
- % O7 ?% C- {" f" F( d) y7 O) J
$ j/ v0 N9 q. }* X) ^# f8 U* L# d1 X/ [say hi 91 o+ j, K* V" c5 y2 Y! m
) e1 n3 T3 M# `+ P# @
- Q8 x% h$ N( K$ m/ Y. i3 l: T6 _& ~+ q5 d& S% }
say hi 8- y' z6 X b+ \
# ? ]( \) F( H2 B% E+ O
# }& A" `* \1 I5 l' @5 y
# g: X5 N4 ~( e" v; O7 Usay hi 7) d" s Z4 j6 O# N, G7 K: j
' }7 o W9 F' G( j% @6 O1 H* N2 J5 t
* T+ B, F; A4 t* x- p$ ~( f! J& h6 w
say hi 6
! T1 G5 S+ |0 K" a% y" a' E4 B1 B7 ]5 s. p. L; t) ~& v
- 6 l2 w# z8 Y% C8 m1 F( H1 @9 d
6 Y0 s0 V' x2 J, y8 K1 Csay hi 5
& [, u4 r) D% X* f- u6 W6 ]7 j" S+ H
( x0 u' |2 U& d0 \7 M* J6 s+ d* `5 e. }3 X* J
say hi 4
# e5 f+ B0 I) b
! u$ ?+ R' |2 O
+ v# H0 h- Q `; C) G0 h. o2 u
1 [$ ^( E5 w! T% j+ J' |5 `0 l# tsay hi 3
( E4 r; x# q4 q# P. W6 e1 Y0 V6 a
) S! j0 }! U4 p+ Q, c$ V
7 y7 G9 I9 Y. k$ M+ K. V+ p% {6 j, ~" G4 O C+ S
say hi 2
$ H+ O2 U' B5 h5 U" T0 N" n5 G% X
$ h1 L, f! p% s' g1 F
" l1 o1 p- v5 D1 r9 U7 V$ J l, E: b9 Z, U: U0 x
say hi 1
0 l& a6 z; t# |7 g' _: c. s
! D+ M% \8 C8 j6 x' k
8 c1 ^: q" w9 a& C; v% n4 F0 _; u3 q& U% y" a0 P
say hi 0; U5 G% _8 u4 y+ ]' |" F: [* l
, \% b% m9 c3 l; ]2 [
& ^0 I" G% [# S: w$ f# u# S
3. 进程池Pool类进程启动的开销比较大,过多的创建新进程会消耗大量的内存空间。仿照线程池的做法,我们可以使用进程池控制内存开销。
比较幸运的是,Python给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的from multiprocessing import Pool导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中常用的方法:
( N! `$ V/ R9 d6 B' {( Q P8 h i2 a# m& L
from multiprocessing import Pool
0 m/ Q+ e, F, G! B- _( ?. R1 Z$ d6 x# o$ P
- 3 }9 n6 S6 l9 G7 I* o$ l4 U
, y: `8 E( z( l# C5 u2 v2 L$ t/ A2 U+ r
import time, f7 }: S3 \" G, N0 g$ F
/ C, f) W' h E: k
- 6 [- o8 { O: J/ `9 {0 i
5 N8 V1 M& L ~! U5 \
0 J8 j0 H; U9 J! T/ Y A7 Q% @& K8 h3 P/ x" `! f
- ) s" i2 r. X1 m! K( d: D2 c+ x7 q
9 u, C# a# J' ?
def func(args):. B% J6 K& a/ a! l
+ s1 R7 m' r& C q/ g# e! ?( Z- h
! q c' J% |4 Q- L& U# x' x6 l2 z& w- H& p
time.sleep(1)
& i* X6 B8 I! k" m$ V. M0 D1 c
& R6 y+ _9 Y! O& {% L- , P% y( D3 I0 E
/ P1 V3 M" D+ u$ @ print("正在执行进程 ", args)
. o; Q" Q5 c Y; T1 F0 G; _
2 x; u9 ?+ E) V6 n. c9 c6 ?
7 @/ C% g! t, I; h. J" v; f7 Q6 l
K# u* M6 n: j; S+ F$ h
* j9 d1 \5 e0 c* t+ F3 p& o! D
; ~8 i0 j. B2 J" }3 l- ; p: S4 C* d; Y2 r8 ?. L
0 Q( r$ s2 e- c4 A, eif __name__ == '__main__':
& y8 {. E: C2 ?% ~) Y- c7 h" E& G0 u" ]9 C8 T3 i
" f# N2 L0 p8 E# P/ E) ^& t) B9 ]: d7 }! {6 H# d' i% `
0 I' k$ E6 n' I2 q5 |6 v9 A2 l N4 c$ r) h8 n4 O& [" ?6 S
3 o* k8 N7 |! S$ B4 {: C$ D4 X/ V! C4 w
p = Pool(5) # 创建一个包含5个进程的进程池
+ s8 J: {! Q! t% Z4 \2 V# g) C7 x' t) R' e! N: K6 P& h J
- / }9 q! ?' d8 j5 d n! x8 |, _- e
: G/ N6 @; }! S7 P
# L) y4 H+ o6 W+ l" N& x) l a; F6 z" ~8 y9 _
- + i$ }; [6 ~: z
2 U3 X _' E9 ?" s, X/ P5 W for i in range(30):& {5 }# i+ k W6 T5 z z- s$ L _ {
. P6 N8 g8 ~+ ~4 \- f - 9 ]' r, @/ H. a/ F" ]
- v( t/ I* C: p' t8 E# _: B; u
p.apply_async(func=func, args=(i,))
4 ^/ g% \9 W0 b' }, b \5 L$ f
. P7 e' e6 z+ ?( S
- C6 f+ j/ f: G# N$ c5 N2 ^2 W7 g1 A7 V9 W9 I
; r; B. B* {3 \4 ^6 v
) i. Y4 S- u& Q3 A- : T' @0 o" s+ `/ c$ A
7 e7 Y8 u7 k2 h6 j/ W4 X$ q p.close() # 等子进程执行完毕后关闭进程池$ S2 D. ^1 |2 q. g* z
, _4 t2 m- W8 u& ]+ r - & k1 b) s6 N7 p7 m; ^( X
8 ` \) ~( _) E- @: g3 P6 s # time.sleep(2), ^: m# r! T8 ]
4 I! u* K8 b, K7 x
- , F( S$ F# D2 H) q' E
( b: A* p" {; b$ M6 E, g# J/ j # p.terminate() # 立刻关闭进程池, M2 Q4 J& @( S% x& ]" L3 i+ C f7 v3 `4 _
" ^1 k3 q1 D6 c3 N+ ?) z- Y - $ e# k3 \1 L: F/ |2 ]& m
1 O' V6 ~- k6 U2 e p.join()
J( ]1 x4 ^8 Y: }8 p/ x! P! s4 ^! m& t4 I+ b" j. e3 s4 h
+ T5 S3 X; L6 J- r
; }7 h+ R* l( I
请继续关注我
+ A* i; w9 M5 X/ i" \
4 Y/ ]& e: Z+ t% F( m: a/ t
| 欢迎光临 数学建模社区-数学中国 (http://www.madio.net/) |
Powered by Discuz! X2.5 |