别在等待中失去了节奏

再一次于凌晨从被窝中冻醒,辗转反侧了许久,决定写一篇博文。作为探讨爱情观的博文,可以说这是所有我写过的非技术类博文中的一股清流。

从节奏谈起

节奏作为开场最为合适不过,小学上音乐课老师就夸我打节拍特别准。确切的说,这种节奏感也可以说是从小学开始的。小学时连做作业都是按照老师说好的节奏来的,五一七天假,每天写一点作业然后去玩,遵从完美的规划。然而第七天的时候,母亲做完农活回来,见我还在不紧不慢的写作业,问我写完了没,我说还没有。于是她把我整个作业本都撕了。我流了会眼泪,然而赶紧苦逼地赶起了作业,终于写了一下午写完了。那是我第一次发现deadline之前的效率是最高的。于是从那之后我换了种节奏,每次放假都是先玩命把作业写完,然后随便玩。当然不都是纯为了完成作业而去做作业,例如初中的时候感觉做英语题是种享受,每次放假当晚我都会拿出所有的英语题试卷堆在桌上,然后奋战1个多小时,享受虐题的乐趣。8开的试卷,单面3分钟,正确率95%,这是当时我创造的记录。

这种节奏从小学到高中结束,一路感觉特别顺畅,可能是节奏路线和大主题契合的缘故。在这种节奏下,不用考虑deadline,因为很少会遇到。直到上了大学,这种境况被打破。

deadline方式的崛起

没错,大学可以看作是deadline方式崛起的元年。大一的时候把很多精力放在高数上,以为可以凭借高数制霸学院,然而最终发现自己还是太naive,连自己宿舍的一个江苏的哥们都比不过。在两次高数期末考试都徘徊在90分边缘后,我决定转变思路,找寻一条可持续发展的新道路。于是在大二一年,考了英语4、6级,计算机2、4级,不过后来发现这些东西并没什么用。其实在这个时候已经有些跑偏了,不再像大一学高数那样花费很多经历在上面,而是在考试前的半个月内开始突击模式,这个时候终于看到deadline方式的高效。从此开始愈发喜欢上了这种方式,以至于后来很多地方都在用,包括研究生阶段的考前复习,找实习,甚至是学位论文的开题。很多时候做事是迫于deadline的压力,常常差一点点就超过deadline了,不过好在凭借运气苟活到现在。

实习之后发现deadline竟然在工作中广泛使用。每次接到需求都要预估一个deadline,然后为了按时交付有时就不免加班完成。

deadline几宗罪

然而deadline带来的负面效应太多,首先它打乱了节奏。我不得不将生活状态划分为平常状态deadline前夕状态,然后在分界岭出来一个突变,踩一脚油门开始变速。这对于我这种单线程工作生活的人来说是种挑战,常常在各个deadline间疲于奔波,感受不到那种从容。其次它忽略了过程,没有了全程参与给予的享受体验。

具体到自己身上,因为有些deadline没有确定好,所以直接忽视掉。例如现在处于找工作的状态,然而校招快结束了还没有出手,主要原因是不清楚deadline是什么时候,总是觉得时间还有很多,过段时间再想也没事。哪怕是有人催,还是没有动力跑。总觉得青春一大把,却没有认识到青春是最不能辜负的

启发式

我在人工智能领域看到了上面问题的影子,说这句话不是想吹牛逼,而是想说我在AI的接触中体会到了同样深深的无力感。在AI中很多问题是存在精确解的,但是需要的时间或空间都是超乎想象的,由于耗不起,所以只能退而求其次。但是怎样以小的代价逼近精确解呢,这个很难说,其中有一种方案就是采用启发式的策略。大致就是说,我们在这个时刻是肯定不知道怎样才是全局最优的,但是我们可以使当前的局部状态保持最优的,然后结合实际不断调整,尽量使得全局最优。但是这样真的很靠谱吗,不一定,就像爬一座山,想要最快地到达山顶,于是找最陡峭的地方爬,因为这里是升的最快的,但是爬着爬着,越来越平缓,甚至最终就成了平路。于是站起身想要感叹一览众山小,结果发现自己爬的只是个小土坡,真正的高山在身旁矗立着。

尽管可以尽可能避免出现这种窘境,但是即便是引入随机,也要不断尝试,迭代,调优,要重复很多次才能找到比较理想的模型。然而,生活不是这样,生活中没有Retry按钮,所以只能一气呵成

正题

前面说的貌似离题了,其实不然。身为一个学物理出身的,我始终坚信,万物同理。所以,再次展示下我的逻辑思辨过程。

简要回顾

对于异性的好感是从幼儿园开始萌生的,当时觉得一个女孩蛮漂亮,然后过了段时间觉得自己之前的认识存在问题。过了段时间决定另外一个女孩蛮不错,再过段时间觉得很一般。如此反复几次,我对自己的审美观彻底失去信心。这种审美观的自卑使自己一直处于观望态度,因为怕选择出错,所以一直没有选择。这种暗恋的方式至少从表面看是无害的,所以这种方式一直使用了10多年,直到现在。

上了大学后总免不了被亲戚催着找对象,然而我都可以找到理由搪塞过去。身为一名理科生出身,最不缺的就是理由啦。其中一个理由不光糊弄了别人,也糊弄了自己,它的内容很简单,是这样的:“我觉得谈恋爱肯定要吃吃饭、出去玩一玩什么的,而这些是需要钱的。肯定不能用父母的钱来扯这些没用的事,所以要自己有经济来源。但是我哥哥告诫我不要因为去挣眼前的小钱而失去了挣日后大钱的机会,我觉得好有道理,所以我就不会因为这个去参加勤工助学、当家教了。既然这样就没有经济来源了,那就不用谈恋爱了。”这个理由很简短有力,我自己都信了。虽然后来自己参加过勤工助学、当过英语家教,然而挣的钱都当伙食费吃掉了,而且觉得很合理。

吐槽部分

  • 根据6度空间理论,任何两个人想要建立联系最多经过6个人就够了,后来还有人证实了好像只要5个人?这块记不太清了。总之想要进入到某个社交圈并不难
  • 人与人之间的情感总是建立在相处的基础上,也就是日久生情了。对于这种现象理解起来就比较多元化了,直观地感觉这不就是一句废话吗;心机一些的理解是想要和一个人产生感情, 只要适当地多接触,最终成为TA生活的一部分,就自然而然成功了。
  • 未来是不确定的,这是美好也是残酷的地方。对于未来的期待可能是出于对现状的不满和不甘,也可能只是单纯的期待。但是这个期待最终会成什么样子,这个谁都说不准。具体到本文的主旨,未来会遇到什么人是充满未知的。我曾为《假如爱有天意》《我脑中的橡皮擦》这种刻骨铭心的爱而感动,虽然我更多在影视剧中看到;也曾为见到很多年近而立之年的高校老师急于结婚而参与到各种相亲交友活动而唏嘘,虽然这一幕在生活中常常看到。量子力学告诉我们想要预测到未来是不可能的,实际状态会在临近真相的那一秒瞬间塌缩

综上,无论是相识相爱,或是谈婚论嫁,都是在自己的一个小圈子中做出选择的。这个圈子是动态变化,可以想象成钱钟书先生说的“围城”。自己究竟会选择哪一个相伴终生,取决于做出选择那一刻圈子里的人。无论是选择等待还是选择扩大圈子,无非都是想在一个适当的时机在合理的圈子中做出合理的选择。但是什么时候算合理呢,这个谁知道呢。可能在感觉心满意足的时候,可能在感觉身心俱疲的时候,可能在迫于身边压力的时候,放弃了等待,做出了选择,于是诸多不确定的东西瞬间确定了。此时的爱情就像薛定谔的猫,任何揣测都是徒劳的,打开盒子的一刹那,是生是死一目了然。

思考

但是知道了真相后会怎样呢?不知道,不瞎说了。

爱情观

爱情观,择偶观,在我看来基本等同。还是那句话,万事万物皆同理

  • 首先我给自己设定了deadline,不管是出于对自己、家人的负责,还是作为一个缓兵之计,deadline都是不可或缺的。我将28岁作为结婚的deadline,那么向前倒推2、3年,就是恋爱开始的时间。
    除非自己在28岁左右事业、生活中有很大波动,否则这个deadline还是很靠谱、很稳定的。
  • 其次我在找寻的过程中使用启发式的原则,在best caseworst case 之间做出决策。决策依据是和自己的契合程度,这个没办法量化,只可意会。

总结

无论是爱情、生活,等待都是整个环节中的一部分。但是无论选择怎样的出行方式,最要紧的是尽快上路,别磨蹭。节奏路上找,风景路上看,至于何时遇到何人何时,看自己,也看别人。一切随缘又非缘。

写到这里,我决定把工作找了先。

hexo deploy时提示权限不够的问题

以前按照网上的教程生成过ssh的key,并且上传到了coding.net的网站上,原以为这样就一劳永逸了,但是每次重新打开git shell 部署博客时总是提示permission denied。下面是解决办法。

注意:前提是已经使用ssh-keygen生成过key,如果没有,可以采用类似下面的代码生成

1
ssh-keygen -t rsa -C "wenqiang_china@126.com"

生成的key会在.ssh文件夹中。
生成完以后要将.ssh文件夹中的id_rsa.pub文件复制后提交到github.com上,然后输入以下代码验证是否提交成功。

1
ssh -T git@github.com

上述过程不详细说了,网上教程很多。

正文

当下次执行hexo d时提示如下错误信息

1
2
3
4
5
Permission denied (publickey).
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.

此时执行以下代码添加key

1
ssh -add ~/.ssh/id_rsa

之后执行hexo d就可以成功部署了。

Python flask使用经验教训总结

前言

这段时间开始接触flask,中间遇到些问题,也走过写弯路,本片博客用来说明下自己这段时间内踩的坑。

正文

问题一:在使用SQLAlchemy管理MySQL数据库时,创建db实例时报错,提示Import MySQLdb出错

要说明的是,在使用SQLAlchemy时,要先安装好MySQL数据库和MySQLdb,因为SQLAlchemy会调用MySQLdb模块。Windows下安装MySQLdb模块可以看一看这篇博客,如果是linux下安装则可以参考我的这篇博客

问题二:同样在使用SQLAlchemy时,执行db.create_all()创建数据表时提示OperationalError: (_mysql_exceptions.OperationalError) (1045, “Access denied for user ‘ODBC’@’localhost’ (using password: NO)”)

分析,权限不够,没有在数据库中给所操作的数据库授权。
可以采用类似下面的方式,例如数据库名为test,则授权语句为

1
grant select ,insert,update,delete,lock tables on test.* to username@localhost identified by 'password';

上面的username处填写自己的用户名,password处填写自己连接时的密码。

问题三:使用SQLAlchemy时,执行db.create_all()时出错,提示:OperationalError: (_mysql_exceptions.OperationalError) (1049, “Unknown database ‘flasky’”)

分析:错误提示未知数据库”flasky”,因为在MySQL数据库中这个名为flasky的数据库根本不存在。注意,进行创建表操作前,必须保证首先所需的数据库已经在MySQL中创建好。这个数据库的名字在创建flask应用时设置

1
app.config['SQLALCHEMY_DATABASE_URI']='mysql://root:root@localhost/flasky'

上面最后的flasky就是所用的数据库的名字,这个要在MySQL数据库中建好。
所以,合理的顺序是:首先在MySQL中创建好flasky数据库,然后授权,再之后在flask应用中定义类型,并创建数据表
上述过程类似如下操作:

  • 首先在数据库中定义数据库并授权

    1
    2
    create database flasky;
    grant select,insert,update,delete,lock tables on flasky.* to username@localhost identified by 'password';
  • 接下来在flask代码中定义ORM,创建SQLAlchemy实例并在必要的时候使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    from flask.ext.sqlalchemy import SQLAlchemy

    app=Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI']='mysql://username:password@localhost/flasky' #使用MySQLdb数据库
    app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']=True #设置为自动提交

    #下面定义两个数据表
    class Role(db.Model):
    __tablename__='roles'
    id=db.Column(db.Integer,primary_key=True)
    name=db.Column(db.String(64),unique=True)
    users=db.relationship("User",backref="role")

    def __repr__(self):
    return '<Role %r>'%self.name

    class User(db.Model):
    __tablename__="users"
    id=db.Column(db.Integer,primary_key=True)
    username=db.Column(db.String(64),unique=True,index=True)
    role_id=db.Column(db.Integer,db.ForeignKey('roles.id'))

    def __repr__(self):
    return '<User %r>'%self.username

然后可以在python shell下执行db.create_all()方法在数据库中创建两个数据表。

问题四:在使用模板引擎时正文采用中文会出现utf8 decode error的错误出现

原因是模板文件的编码不对,最好采用utf8无BOM的格式。在notepad++ 中默认编码是ANSI,需要改过来。

浅说Python 中的函数式编程

函数式编程

函数式编程思想有着50多年的历史,最古老的算是Lisp语言了。如今众多语言都在一定程度上支持函数式编程,如Python、Ruby、Javascript,而其他一些语言如Java、PHP都增加了对于匿名函数的支持,可以看到函数编程的思想还很受欢迎。
下面一段话引自知乎,原文地址

函数式编程是一种编程范式,常见的编程范式有命令式编程,函数式编程,逻辑式编程,值得一提的是面向对象编程也是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量(对应着存储单元),赋值语句(获取及存储指令),表达式(内存引用和算术运算)和控制语句(跳转指令),总之,命令式程序就是一个冯诺依曼机的指令序列。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,一句话,函数式程序就是一个表达式。
函数式编程中的函数并不是指计算机中的函数,而是指数学中的函数,即自变量的映射,即一个函数的值仅决定于函数参数的值,不依赖其他状态。在函数式语言中,函数作为一等公民,可以在任何地方定义,可以作为函数的参数和返回值,可以对函数进行组合。纯函数式编程语言中的变量也不是命令式编程中的变量,即存储状态的单元,而是代数中的变量,即一个值的名称。变量的值是不可变的,即不允许像命令式编程语言中的那样多次给一个变量赋值。严格意义上的函数式编程意味着不使用可变的变量,赋值,循环和其他命令式控制结构进行编程。

函数式编程的好处
引用透明、没有副作用,用知乎上答主的说法就是

一个好处是,函数既不依赖外部的状态也不修改外部的状态,这使得单元测试和调试更加容易。另一个好处是由于多个线程间不共享状态,因而可以避免发生死锁,可以很好地实现并发。
除此以外,函数式语言还具有以下特性:

  • 高阶函数(Higher-order function)
  • 偏应用函数(Partially Applied Functions)
  • 柯里化(Currying)
  • 闭包(Closure)

Python 中的函数式编程

Python中对函数式编程语言的支持主要集中在提供了map(),reduce(),filter()三个函数和lambda算子,让人吃惊的是仅凭这几个函数就几乎可以实现任意的Python程序。
关于这几个函数的使用,参见我的另外一篇博客,上面有对这几个函数的介绍。

下面的内容参考自这个网址
替换条件控制语句
对于下面的条件控制语句

1
2
3
4
5
6
if condition1:
func1()
elif contidion2:
func2()
else:
func3()

的结构,替换成下面的表达式

1
(condition1 and func1()) or (condition2 and func2()) or (func3())

下面是一个例子

1
2
3
4
5
6
7
8
9
>>> pr=lambda p:p
>>> print_number=lambda x:(x==1 and pr("it's one")) or (x==2 and pr("it's two")) or (pr("it's something else"))
>>> print_number(1)
"it's one"
>>> print_number(2)
"it's two"
>>> print_number(3)
"it's something else"
>>>

可以看到采用这种方式可以实现条件控制的替换。

替换循环控制语句

  • 替换for循环
    由于和map函数使用相同,所以可以直接用map函数来实现for循环,看下面的例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    >>> cubic=lambda x: x**3
    >>>
    >>>
    >>> for x in range(5):
    ... cubic(x)
    ...
    0
    1
    8
    27
    64
    >>> map(cubic,range(5))
    [0, 1, 8, 27, 64]
    >>>
  • 替换while循环
    相较而言,替换while循环稍稍麻烦一些,看下面的例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # statement-based while loop
    while <condition>:
    <pre-suite>
    if <break_condition>:
    break
    else:
    <suite>

    # Equivalent FP-style recursive while loop
    def while_block():
    <pre-suite>
    if <break_condition>:
    return 1
    else:
    <suite>
    return 0

    while_FP = lambda: <condition> and (while_block() or while_FP())
    while_FP()

函数式while_FP循环采用了递归的概念。当为true时,进入循环体,执行while_block();若为true时,返回1,while_FP()调用结束;若为false时,返回0,会继续执行or右侧的while_FP(),从而实现递归调用;若始终为false,则会持续递归调用while_FP(),这就实现了while语句中同样的功能。

看一个例子,下面的代码实现输入回显的echo功能,对比一下命令式编程和函数式编程。

1
2
3
4
5
6
7
8
9
#命令式编程版本
def echo_IMP():
while 1:
x = raw_input("IMP -- ")
print x
if x == 'quit':
break

echo_IMP()

下面是函数式编程版本

1
2
3
4
5
6
def monadic_print(x):
print x
return x

echo_FP = lambda: monadic_print(raw_input("FP -- "))=='quit' or echo_FP()
echo_FP()

再来看个例子,该例子实现找出笛卡尔积元组集合中元素之积大于25的所有元组。

1
2
3
4
5
6
7
bigmuls = lambda xs,ys: filter(lambda (x,y):x*y > 25, combine(xs,ys))
combine = lambda xs,ys: map(None, xs*len(ys), dupelms(ys,len(xs)))
dupelms = lambda lst,n: reduce(lambda s,t:s+t, map(lambda l,n=n: [l]*n, lst))

print bigmuls([1,2,3,4],[10,15,3,22])

[(3, 10), (4, 10), (2, 15), (3, 15), (4, 15), (2, 22), (3, 22), (4, 22)]

Python 中的属性函数

前言

Python中的属性函数可以实现一些有用的功能,例如将类方法转换为只读属性、重新实现一个属性的setter和getter方法。下面进入正题。

将类方法转为只读属性

下面的例子参考这篇文章
对于下面的一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Person(object):
""""""

#----------------------------------------------------------------------
def __init__(self, first_name, last_name):
"""Constructor"""
self.first_name = first_name
self.last_name = last_name

#----------------------------------------------------------------------
@property
def full_name(self):
"""
Return the full name
"""

return "%s %s" % (self.first_name, self.last_name)

通过将full_name设置为只读属性,外接可以像调用普通的属性一样使用full_name,例如下面这样

1
2
3
4
5
6
7
8
9
>>> person = Person("Mike", "Driscoll")
>>> person.full_name
'Mike Driscoll'
>>> person.first_name
'Mike'
>>> person.full_name = "Jackalope"
Traceback (most recent call last):
File "<string>", line 1, in <fragment>
AttributeError: can't set attribute

可以正常地访问它,但是无法设置属性,因为没有相应的setter方法,想要改变,只能通过修改first_name属性或者last_name属性的值来实现。下面看另外一个property的用处,即取代getter和setter方法。

实现属性的getter和setter方法

熟悉Java编程的人肯定对getter和setter并不陌生,对于类中的每一个属性,对外要提供getter和setter方法,如果用这种方式写Python代码,显得优点不专业,不光代码冗长,而且影响美观。采用property的方式可以改善这一问题。下面先看第一个版本的property改写示例。
假设现在有一个类Fees,它有一个属性_fee,如果模仿其他语言的方式提供getter和setter方法的话就是这样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from decimal import Decimal

class Fees(object):

def __init__(self):
"""Constructor"""
self._fee = None

def get_fee(self):
"""
The getter
"""

return self._fee

def set_fee(self, value):
"""
The setter
"""

if isinstance(value, str):
self._fee = Decimal(value)
elif isinstance(value, Decimal):
self._fee = value

#----------------------------------------------------------------------
if __name__ == "__main__":
f = Fees()
f.set_fee("100")
f.get_fee()

这样表面看起来挺不错的。上面本意是检查输入数据是不是数值类型或者是可以转换为数值的字符串,如果是则允许,否则不允许设置。但是如果用直接访问属性的方式来设定属性的值,那么对_fee的验证就不起作用了。看下面的使用过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
>>> f.set_fee("200")
>>> f.get_fee()
Decimal('200')

#看起来还不错
>>> f.set_fee("haha")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 24, in set_fee
File "C:\Users\wenqiang\Anaconda\lib\decimal.py", line 547, in __new__
"Invalid literal for Decimal: %r" % value)
File "C:\Users\wenqiang\Anaconda\lib\decimal.py", line 3873, in _raise_error
raise error(explanation)
decimal.InvalidOperation: Invalid literal for Decimal: 'haha'

#但是下面这样就不行了
>>> f._fee="haha"
>>> f.get_fee()
'haha'

下面通过property来实现getter和setter的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from decimal import Decimal

class Fees(object):

def __init__(self):
"""Constructor"""
self._fee = None

@property
def fee(self):
"""
The fee property - the getter
"""

return self._fee

@fee.setter
def fee(self, value):
"""
The setter of the fee property
"""

if isinstance(value, str):
self._fee = Decimal(value)
elif isinstance(value, Decimal):
self._fee = value

#----------------------------------------------------------------------
if __name__ == "__main__":
f = Fees()

这回看一下使用起来如何

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
>>> f=Fees()
>>> f.fee="1"
>>> f.fee
Decimal('1')
>>> f.fee="haha"
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in fee
File "C:\Users\wenqiang\Anaconda\lib\decimal.py", line 547, in __new__
"Invalid literal for Decimal: %r" % value)
File "C:\Users\wenqiang\Anaconda\lib\decimal.py", line 3873, in _raise_error
raise error(explanation)
decimal.InvalidOperation: Invalid literal for Decimal: 'haha'
#哈哈,还是被异常修改了
>>> f._fee="haha"
>>> f.fee
'haha'
>>>

通过以上对比,发现这种使用方式相对于getter和setter来说并没有多大改进,别急,看一下版本2中的改进。
版本二,专门定义一个类,将这个类应用到多个属性上。下面举个例子说明一下,例子来自于这篇文章

看一下这个例子使用第一个版本的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Movie(object):
def __init__(self, title, rating, runtime, budget, gross):
self._rating = None
self._runtime = None
self._budget = None
self._gross = None

self.title = title
self.rating = rating
self.runtime = runtime
self.gross = gross
self.budget = budget

#nice
@property
def budget(self):
return self._budget

@budget.setter
def budget(self, value):
if value < 0:
raise ValueError("Negative value not allowed: %s" % value)
self._budget = value

#ok
@property
def rating(self):
return self._rating

@rating.setter
def rating(self, value):
if value < 0:
raise ValueError("Negative value not allowed: %s" % value)
self._rating = value

#uhh...
@property
def runtime(self):
return self._runtime

@runtime.setter
def runtime(self, value):
if value < 0:
raise ValueError("Negative value not allowed: %s" % value)
self._runtime = value

#is this forever?
@property
def gross(self):
return self._gross

@gross.setter
def gross(self, value):
if value < 0:
raise ValueError("Negative value not allowed: %s" % value)
self._gross = value

def profit(self):
return self.gross - self.budget

可以看到,对于多个属性这样设置仍然很麻烦,下面看一下版本2的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from weakref import WeakKeyDictionary

class NonNegative(object):
"""A descriptor that forbids negative values"""
def __init__(self, default):
self.default = default
self.data = WeakKeyDictionary()

def __get__(self, instance, owner):
# we get here when someone calls x.d, and d is a NonNegative instance
# instance = x
# owner = type(x)
return self.data.get(instance, self.default)

def __set__(self, instance, value):
# we get here when someone calls x.d = val, and d is a NonNegative instance
# instance = x
# value = val
if value < 0:
raise ValueError("Negative value not allowed: %s" % value)
self.data[instance] = value

class Movie(object):

#always put descriptors at the class-level
rating = NonNegative(0)
runtime = NonNegative(0)
budget = NonNegative(0)
gross = NonNegative(0)

def __init__(self, title, rating, runtime, budget, gross):
self.title = title
self.rating = rating
self.runtime = runtime
self.budget = budget
self.gross = gross

def profit(self):
return self.gross - self.budget

这里结合了访问描述符,当解释器遇到对属性的访问时,如果是输出,就会将该属性看作一个带有get方法的描述符;如果是赋值操作,Python就会识别出该属性是一个带有set方法的描述符,从而执行Nonnegative类中的set方法。关于描述符的更多介绍,推荐看一下这篇文章
下面我们通过这种方式再来试试可不可以随意修改属性

1
2
3
4
5
6
7
8
9
10
11
12
>>> m = Movie('Casablanca', 97, 102, 964000, 1300000)
>>> print m.budget # calls Movie.budget.__get__(m, Movie)
964000
>>> m.budget=300
>>> m.budget
300
>>> m.budget=-24
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 16, in __set__
ValueError: Negative value not allowed: -24
>>>

这里面在Nonegative类中对数值是否为负做了限定,通过直接访问属性的方法无法再修改属性的值了,原理上面说过了。不过对于多个属性执行不同逻辑的getter和setter功能时,一个访问描述符就不足以解决问题了,就要引入多个,所以要看问题的具体情况合理使用。

Python 中@classmethod和@staticmethod的区别

前言

这是stackoverflow上面的一个问题,原文地址What is the difference between @staticmethod and @classmethod in Python?,现将其翻译为中文,放在下面,仅供参考。

正文

问题:函数用@classmethod装饰和用@staticmethod装饰有什么区别?
回答
回答一:unutbu:先举一个例子,看下面的代码,注意foo,class_foo,static_foo调用时的特征。

1
2
3
4
5
6
7
8
9
10
11
12
13
class A(object):
def foo(self,x):
print "executing foo(%s,%s)"%(self,x)

@classmethod
def class_foo(cls,x):
print "executing class_foo(%s,%s)"%(cls,x)

@staticmethod
def static_foo(x):
print "executing static_foo(%s)"%x

a=A()

下面是一个对象调用方法的常用方式,对象实例a作为第一个参数被隐式地传递过去。

1
2
a.foo(1)
# executing foo(<__main__.A object at 0xb7dbef0c>,1)

如果使用@classmethod,类对象实例的类型作为第一个参数被隐式地传递进来而非通过self的方式。

1
2
a.class_foo(1)
# executing class_foo(<class '__main__.A'>,1)

也可以通过调用class_foo来使用类,事实上,如果你把一个东西定义成classmethod,那很有可能你试图通过类而非类的实例来调用它。A.foo(1)会引发TypeError异常,而使用A.class_foo(1)则可以正确运行。

1
2
A.class_foo(1)
# executing class_foo(<class '__main__.A'>,1)

还有一种用途就是利用classmethod来创建可继承、可替代的构造器

使用staticmethod,self(对象实例)或者cls(类型)都没有作为第一个参数进行传递。它看起来和普通的函数没什么区别,除非你能从一个类的实例或者类本身调用它。

1
2
3
4
5
a.static_foo(1)
# executing static_foo(1)

A.static_foo('hi')
# executing static_foo(hi)

staticmethod用来对类与类间有关联的函数进行分组。

foo只是一个函数,但是当你调用a.foo时你并不会得到这个函数,你会得到该函数的一个“局部适用”版本,并且将其与对象实例a绑定起来。foo需要两个参数,而a.foo只需要一个参数。
a和foo发生了绑定,下面这个例子简述了绑定的含义

1
2
print(a.foo)
# <bound method A.foo of <__main__.A object at 0xb7d52f0c>>

如果使用a.class_foo,则a不会和class_foo发生绑定,而是类A和class_foo发生了绑定

1
2
print(a.class_foo)
# <bound method type.class_foo of <class '__main__.A'>>

使用staticmethod,虽然它是一个方法,但是a.static_foo只是返回一个函数而没有参数的绑定。static_foo需要一个参数,a.static_foot同样需要一个参数。

1
2
print(a.static_foo)
# <function static_foo at 0xb7d479cc>

类似,使用A.static_foo时也会有同样的效果

1
2
print(A.static_foo)
# <function static_foo at 0xb7d479cc>

Python 文档随译--multiprocessing模块

前言

Python官方文档以前没有认真读,现在仔细读一遍,并且做一下随译(就是说翻译地比较随意 呵呵)。水平有限,翻译可能存在问题,请自行斟酌。
本文对应的官方文档链接:multiprocessing

正文

简介

multiprocessing模块支持用和Treading模块API相似的语法生成进程。该模块支持本地和远程并发,利用subprocesses而不是threads,有效地避开了GIL的限制。该模块允许程序员在机器上充分利用多个CPU,该模块支持Windows和Unix平台。

multiprocessing模块同样提供了Threading模块所没有的,例如它提供的Pool对象,该对象具有一些方法,它可以让多个输入数据并行执行一个函数,实际上是通过将数据分配到每个进程中。下面是一个使用Pool对象实现并行化的简单例子。

1
2
3
4
5
6
7
8
from multiprocessing import Pool

def f(x):
return x*x

if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))

输出为

1
[1, 4, 9]

Process类

在multiprocessing模块中,可以通过Process对象创建多个进程对象,并且通过start方法来调用它们。Process采用threading.Thread,下面的例子是一个简单的多进程的例子

1
2
3
4
5
6
7
8
9
from multiprocessing import Process

def f(name):
print 'hello', name

if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()

想要显示出每个进程的ID,可以参照下面的扩展例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process
import os

def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()

def f(name):
info('function f')
print 'hello', name

if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()

进程间交换对象

multiprocessing支持两种类型的对象作为进程间的交换通道。

Queue
Queue类几乎就是Queue.Queue类的克隆,看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process, Queue

def f(q):
q.put([42, None, 'hello'])

if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()

需要注意,Queue既是线程安全的,同时又是进程安全的。

Pipes
pipe方法返回一对连接对象,它们是管道的两端,默认是双向的。下面是个例子

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()

pipe函数返回的两个连接对象表示管道的两端,每个连接对象都有send()和recv()方法。注意如果管道的两端同时从对方接收或者同时向对方发送数据则会导致数据被破坏。

进程同步

multiprocessing提供了和threading模块等价的同步原语。例如我们可以使用锁来确保某一时刻只有一个进程向终端输出信息。

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Lock

def f(l, i):
l.acquire()
print 'hello world', i
l.release()

if __name__ == '__main__':
lock = Lock()

for num in range(10):
Process(target=f, args=(lock, num)).start()

如果不使用锁这些信息就会混杂在一起。

进程间共享状态

我们在进行并发编程时最好避免分享状态,再使用多进程时尤其要注意。
如果非要这么做,可以参照下面给出的 两种方式
共享内存
可以用Value或者Array类在共享存储区存储数据,例如下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process, Value, Array

def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]

if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print num.value
print arr[:]

程序将会打印如下的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
```
参数d和i在创建num和arr时被array模块当做字节码使用,d表明是一个双精度浮点数,而i则指明一个有符号的整数类型。这些共享对象是线程和进程安全的。
想要更灵活得使用共享内存,可以使用multiprocessing.sharedctypes,它支持从共享内存创建专门的ctype对象。

##服务进程##
由Manager()方法返回的manager对象可以用于控制一个服务进程,它能管理Python对象并且允许其他进程通过代理操作这些数据。
由Manager()方法返回的manager对象支持以下这些类型:list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array。下面举个例子
``` Python
from multiprocessing import Process, Manager

def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()

if __name__ == '__main__':
manager = Manager()

d = manager.dict()
l = manager.list(range(10))

p = Process(target=f, args=(d, l))
p.start()
p.join()

print d
print l

以上代码将打印出

1
2
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务进程管理相对于共享内存具有更高的灵活性因为它支持Python中原生的数据类型;同时一个manager对象可以为网络上不同计算机上的进程所共享,但缺点是比共享内存要慢。

使用workers池

Pool类表示一个工作进程池,它提供了几种不同的方法将任务分发到各个进程。例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
return x*x

if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes

# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))

# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i

# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"

# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process

# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]

# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"

注意:pool的方法绝对不能被创建它的进程所使用。
注意:这个包下面的功能需要确保main模块对于子进程是可以导入的。这意味着在一些情况下程序不能正常工作,例如在交互模式下,看下面的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

Python Queue介绍

前言

Python中的Queue实现了一个同步队列,并且该类实现了所有需要的锁原语。Queue实现了三种队列:普通的FIFO队列(Queue)、LIFO队列(LifoQueue)、优先级队列(PriorityQueue)。其使用方法类似,下面以普通的先进先出队列Queue为例谈一下Queue中的主要方法

Queue中的方法及使用

使用Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#引入Queue类
from Queue import Queue

#得到队列的大小
Queue.qsize()

#判断队列是否为空
Queue.empty()

#判断队列是否已满
Queue.full()

#从队列头获取元素,默认为阻塞
Queue.get([block[,timeout]])

#从队列头获取元素,非阻塞方式
Queue.get_nowait()
#或者
Queue.get(block=False)

#阻塞写入队列
Queue.put(item)

#非阻塞写入队列
Queue.put_nowait(item)
#或者
Queue.put(item,block=False)

#向队列中已完成的元素发送join信号
Queue.task_done()

上面从队列中获取元素和向队列中添加元素都有阻塞和非阻塞的方式,采用阻塞方式,如果从队列中取元素而元素为空,则线程会停下来等待知道队列中有元素可以取出;如果向队列中添加元素而此时队列已满,则同样线程会停下来直到停止。如果采用非阻塞方式,取元素时一旦队列为空,则会引发Empty异常,放元素时一旦队列已满,就会引发Full异常。

下面是采用Queue实现的经典生产者消费者问题的代码,来源:Python模块之Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from Queue import Queue
import random
import threading
import time

#Producer thread
class Producer(threading.Thread):
def __init__(self, t_name, queue):
threading.Thread.__init__(self, name=t_name)
self.data=queue
def run(self):
for i in range(5):
print "%s: %s is producing %d to the queue!" %(time.ctime(), self.getName(), i)
self.data.put(i)
time.sleep(random.randrange(10)/5)
print "%s: %s finished!" %(time.ctime(), self.getName())

#Consumer thread
class Consumer(threading.Thread):
def __init__(self, t_name, queue):
threading.Thread.__init__(self, name=t_name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()
print "%s: %s is consuming. %d in the queue is consumed!" %(time.ctime(), self.getName(), val)
time.sleep(random.randrange(10))
print "%s: %s finished!" %(time.ctime(), self.getName())

#Main thread
def main():
queue = Queue()
producer = Producer('Pro.', queue)
consumer = Consumer('Con.', queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print 'All threads terminate!'

if __name__ == '__main__':
main()

最后附上Queue模块的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""A multi-producer, multi-consumer queue."""

from time import time as _time
try:
import threading as _threading # 导入threading模块
except ImportError:
import dummy_threading as _threading # 该模块的接口和thread相同,在没有实现thread模块的平台上提供thread模块的功能。
from collections import deque # https://github.com/BeginMan/pythonStdlib/blob/master/collections.md
import heapq # 堆排序 https://github.com/qiwsir/algorithm/blob/master/heapq.md

__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] # 模块级别暴露接口


class Empty(Exception):
"""当调用Queue.get(block=0)/get_nowait()时触发Empty异常

调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。
如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。
如果队列为空且block为False,队列将引发Empty异常
"""

pass


class Full(Exception):
"""当调用Queue.put(block=0)/put_nowait()时触发Full异常

如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。
如果block为0,put方法将引发Full异常。
"""

pass


class Queue:
"""创建一个给定的最大大小的队列对象.

FIFO(先进先出)队列, 第一加入队列的任务, 被第一个取出
If maxsize is <= 0, the queue size is 无限大小.
"""

def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize) # 初始化queue为空

# 所有获取锁的方法必须在返回之前先释放,互斥锁在下面三个Condition条件共享
# 从而获取和释放的条件下也获得和释放互斥锁。
self.mutex = _threading.Lock() # Lock锁

# 当添加queue元素时通知`not_empty`,之后线程等待get
self.not_empty = _threading.Condition(self.mutex) # not_empty Condition实例

# 当移除queue元素时通知`not_full`,之后线程等待put.
self.not_full = _threading.Condition(self.mutex) # not_full Condition实例

# 当未完成的任务数为0时,通知`all_tasks_done`,线程等待join()
self.all_tasks_done = _threading.Condition(self.mutex) # all_tasks_done Condition实例
self.unfinished_tasks = 0

def task_done(self):
"""表明,以前排队的任务完成了

被消费者线程使用. 对于每个get(),随后调用task_done()告知queue这个task已经完成
"""

self.all_tasks_done.acquire()
try:
# unfinished_tasks 累减
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
# 调用多次task_done则触发异常
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all() # 释放所有等待该条件的线程
self.unfinished_tasks = unfinished
finally:
self.all_tasks_done.release()

def join(self):
"""阻塞直到所有任务都处理完成
未完成的task会在put()累加,在task_done()累减, 为0时,join()非阻塞.
"""

self.all_tasks_done.acquire()
try:
# 一直循环检查未完成数
while self.unfinished_tasks:
self.all_tasks_done.wait()
finally:
self.all_tasks_done.release()

def qsize(self):
"""返回队列的近似大小(不可靠!)"""
self.mutex.acquire()
n = self._qsize() # len(queue)
self.mutex.release()
return n

def empty(self):
"""队列是否为空(不可靠)."""
self.mutex.acquire()
n = not self._qsize()
self.mutex.release()
return n

def full(self):
"""队列是否已满(不可靠!)."""
self.mutex.acquire()
n = 0 < self.maxsize == self._qsize()
self.mutex.release()
return n

def put(self, item, block=True, timeout=None):
"""添加元素.

如果可选参数block为True并且timeout参数为None(默认), 为阻塞型put().
如果timeout是正数, 会阻塞timeout时间并引发Queue.Full异常.
如果block为False为非阻塞put
"""

self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)

self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()

def put_nowait(self, item):
"""
非阻塞put
其实就是将put第二个参数block设为False
"""

return self.put(item, False)

def get(self, block=True, timeout=None):
"""移除列队元素并将元素返回.

block = True为阻塞函数, block = False为非阻塞函数. 可能返回Queue.Empty异常
"""

self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()

def get_nowait(self):
"""
非阻塞get()
也即是get()第二个参数为False
"""

return self.get(False)

# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held

# 初始化队列表示
def _init(self, maxsize):
self.queue = deque() # 将queue初始化为一个空的deque对象

def _qsize(self, len=len): # 队列长度
return len(self.queue)

# Put a new item in the queue
def _put(self, item):
self.queue.append(item)

# Get an item from the queue
def _get(self):
return self.queue.popleft()


class PriorityQueue(Queue):
"""
继承Queue
构造一个优先级队列
maxsize设置队列大小的上界, 如果插入数据时, 达到上界会发生阻塞, 直到队列可以放入数据.
当maxsize小于或者等于0, 表示不限制队列的大小(默认).
优先级队列中, 最小值被最先取出
"""


def _init(self, maxsize):
self.queue = []

def _qsize(self, len=len):
return len(self.queue)

def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)

def _get(self, heappop=heapq.heappop):
return heappop(self.queue)


class LifoQueue(Queue):
"""
构造一LIFO(先进后出)队列
maxsize设置队列大小的上界, 如果插入数据时, 达到上界会发生阻塞, 直到队列可以放入数据.
当maxsize小于或者等于0, 表示不限制队列的大小(默认)
"""

def _init(self, maxsize):
self.queue = []

def _qsize(self, len=len):
return len(self.queue)

def _put(self, item):
self.queue.append(item)

def _get(self):
return self.queue.pop() # 与Queue相比,仅仅是 将popleft()改成了pop()

Python 并发编程系列--4、多进程编程方法

上一篇直通车 Python 并发编程系列–3、多线程编程方法

前言

上一篇介绍了多线程编程,这一篇介绍多进程编程,当然应用场景是CPU密集型任务。

分类

都是采用multiprocessing模块,只不过应用的类和方法不同

  • 使用Pool对象的map方法
  • 使用Pool对象的apply_async方法
  • 使用Process类

下面分别介绍

使用Pool对象的map方法

上一篇类似,multiprocessing模块的Pool对象具有map方法,应用map方法可以很便捷地实现多线程程序。下面给出两个例子,分别来自python多进程(三种方法)一行Python实现并行化
首先看第一个例子,使用多线程计算0到9的平方

1
2
3
4
5
6
7
8
9
10
11
12
13
#coding:utf-8

from multiprocessing import Pool
import time

def f(x):
time.sleep(1)
print x
return x*x

if __name__=='__main__':
p=Pool(5)
print (p.map(f,range(10)))

代码运行结果

1
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

可以看到列表是按照顺序打印的,这正是采用这种方式执行的好处:既实现了多进程,又保证了程序的执行顺序。

另一个例子,是生成上千张图片的缩略图,单线程版本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os 
import PIL

from multiprocessing import Pool
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)

def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)

if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

images = get_image_paths(folder)

for image in images:
create_thumbnail(Image)

在原文作者的机器上,处理6000章图片需要27.9秒。下面是使用map方法后的多进程版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import os 
import PIL

from multiprocessing import Pool
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)

def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)

if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

images = get_image_paths(folder)

pool = Pool()
pool.map(creat_thumbnail, images)
pool.close()
pool.join()

在原文作者的机器上,这次运行只用了5.6秒,可见速度大大提升!
注意:在使用这种方法时可能存在一些影响程序性能甚至影响程序正常执行的隐患所在,因此最好保证业务逻辑库的引用以及一些业务逻辑代码不放在Pool的map方法中,最好放在map里调用的函数中,详细信息见这个讨论

使用Pool的apply_async方法

下面是前文提到的博客中的代码,同样完成计算0到9的平方,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#coding:utf-8

from multiprocessing import Pool
import time

def f(x):
print x*x
time.sleep(1)
return x*x

if __name__=='__main__':
p=Pool(processes=5)
res_list=[]
for i in range(10):
res=p.apply_async(f,[i,])
res_list.append(res)

for item in res_list:
print item.get(),

运行结果同上,仍为有序序列。不过不同的是上一次是执行完成后返回列表,而这一次一边计算一边显示(这正是使用apply_async方法的特点,函数调用是异步的)。想要详细观察效果的话建议读者在本地机器上运行比较。

使用Process类

下面这个例子是使用multiprocessing模块中的Process类来实现的多进程,实例通过打印主进程和从进程的pid来验证属于两个进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#coding:utf-8

from multiprocessing import Process
import os

def info(title):
print title
print 'module_name:',__name__
if hasattr(os,'getppid'):#仅在Unix主机上才有此属性
print 'parent process:',os.getppid()
print 'process id:',os.getpid()

def f(name):
info('funcion f')
print 'hello',name

if __name__=='__main__':
info('main line')
p=Process(target=f,args=('bob',))
p.start()
p.join()

结果如下

1
2
3
4
5
6
7
main line
module_name: __main__
process id: 11688
funcion f
module_name: __main__
process id: 10520
hello bob

通过打印出的结果可以看到通过Process对象成功创建了一个子进程。

题外话

下面谈一些周边的内容,首先通过实例验证一下多进程数据区域的独立性。

1
2


第一次运行结果

1
2
3
4
5
6
7
8
9
10
[0[]1[
]2
[][5]
[3
4]]
[
6]
[7[8]
]
[9]

第二次运行结果

1
2
3
4
5
6
7
8
9
10
[[01[3]
][]
2[
[5]
]4
][
6[]7
[]8
][
9]

下一篇预告

下一篇会谈到协程,至于什么时候就不知道了2333

Python 并发编程系列--3、多线程编程方法

上一篇直通车 Python 并发编程系列–2、并发方式的选择

前言

根据前面的应用背景,这里介绍Python多线程编程时实例应用于网络爬虫–纯的I/O密集型任务。

多线程实现方式

多线程在Python中有不止一种实现方式,可以通过继承Threading类,也可以单纯使用Thread类的方法,不同之处是前者是后者的封装和改进,一般用前者,因为其更加方便,当然偶尔存在需要使用Thread类才能完成的任务(如对子线程生命周期的控制,设置Threading的setDaemon方法可能不够用)。今天看到了一篇译文:《一行 Python 实现并行化 – 日常多线程操作的新思路》,当然也可以参照原作者 Chris的文章,上面提到了使用map函数实现更为简洁的线程并发方式。本文对这几种分别进行介绍。

方法一:使用Thread类的方法

这种方式最为”原始”,最简单的使用方式是直接在需要的时候调用Thread类的start_new_thread方法,将需要并行执行的函数及其需要的参数传进去就可以了。

方法二:使用Threading结合Queue使用

Python中的Queue模块实现了线程安全的队列结构(Queue中使用了Threading中提供的锁实现了同步与互斥),因此使用Queue就不用担心饥饿或者死锁问题的发生了。下面是一个较为通用的I/O多线程书写模板,参照网上看到的一篇文章:Python爬虫(六)–多线程续(Queue)。关于Queue模块的介绍参见博客的另一篇文章Python Queue介绍,下面直接介绍多线程结合Queue的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#coding:utf-8
import threading
import Queue

SHARE_Q=Queue.Queue()
#线程个数
_WORKED_THERAD_NUM=3

class MyThread(threading.Thread):
'''
其中func为线程函数的逻辑部分
'''


def __init__(self,func):
super(MyThread,self).__init__()
self.func=func

def run(self):
'''
重写基类的run方法
'''

self.func()

def do_something(item):
'''
运行逻辑
'''

print item

def worker():
'''
用于写工作逻辑,只要队列不空就持续处理,为空时检查队列。
由于Queue中已经包含了wait notify 和响应的锁,所以不需要在取任务或者放任务的时候
加锁或解锁
'''


global SHARE_Q
while not SHARE_Q.empty():
item=SHARE_Q.get()
do_something(item)
SHARE_Q.task_done()

def main():
global SHARE_Q
threads=[]

for task in xrange(5):
SHARE_Q.put(task)

for i in xrange(_WORKED_THREAD_NUM):
thread=MyThread(worker)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()

SHARE_Q.join()

if __name__=='__main__':
main()

注意上面有一处和原文写的不一样,在worker函数内,原文书写方式为

1
2
3
4
5
while True:
if not SHARE_Q.empty():
item=SHARE_Q.get()
do_something(item)
SHARE_Q.task_done()

实际使用时采取这种的方式会导致程序无法结束,怀疑是程序无法退出while循环(这块没有仔细调试,所以不能确定)。改成上面代码处的方式程序可以正常中止。这种方式可以作为一个书写的模板,在do_something函数内实现自己的逻辑,然后添加必要的变量或者控制语句,就可以实现比较简单的多线程代码。

方法三:使用map函数和multiprocessing模块

前面谈到了这是在别的博客中见到的一个比较简洁的多线程实现,刚好利用了map函数的功能。关于map函数的功能,可以看我的另一篇博客 Python 函数,里面讲到了map函数的用法。下面直接看map函数的效果。
先看一下map的实现效果,下面是用map对一系列url进行访问,并将结果放到列表results中返回。

1
results=map(urllib2.urlopen,['https://www.baidu.com','http://cn.bing.com'])

相当于下面的操作

1
2
3
4
urls=['https://www.baidu.com','http://cn.bing.com']
results=[]
for url in urls:
results.append(urllib2.urlopen(url))

map函数一手包办了序列操作、参数传递和结果保存等一系列的操作。借鉴这一点,选择multiprocessing模块中的子模块就可以实现线程并发。
在Python中由两个库包含了map函数,multiprocessing和它的子模块multiprocessing.dummy,前者用在多进程,而后者用于多线程。两者都有pool对象(线程池或者进程池)。使用两个pool的对象方法如下:

1
2
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

Pool对象有一些参数,第一个参数用于设定线程池中的线程数,默认为CPU的核数。线程不够,优势体现得不明显;线程过多,导致线程切换花的时间太长,划不来。在实际使用中最好根据需求人为设定。下面是针对上一个问题的多线程版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls=[
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
'http://planet.python.org/',
'https://wiki.python.org/moin/LocalUserGroups',
'http://www.python.org/psf/',
'http://docs.python.org/devguide/',
'http://www.python.org/community/awards/'
]

#设定池子的大小
pool=ThreadPool(4)

results=pool.map(urllib2.urlopen,urls)
pool.close()
pool.join()

这里面核心只有4行代码,关键只有第23行这一行。而下面则是IBM经典教程中使用模型的解决方案,足足40多行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
'''
A more realistic thread pool example
'''


import time
import threading
import Queue
import urllib2

class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue

def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'


def Producer():
urls = [
'http://www.python.org', 'http://www.yahoo.com'
'http://www.scala.org', 'http://www.google.com'
# etc..
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()

# Add the urls to process
for url in urls:
queue.put(url)
# Add the poison pillv
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()

print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers

if __name__ == '__main__':
Producer()

相较之下,使用multiprocessing.dummpy中的pool对象的map方法要方便得多。

总结

以上三种方法只是众多多线程方法中的几种,除此之外我们当然可以使用Threading模块本身而不用Queue模块实现多线程,只是要自己实现一遍封装的过程。其实归根结底还是Thread和Threading模块,通此两者,再无难事:-)

下一篇直通车 Python 并发编程系列–4、多进程编程方法

,