- Published on
Python応用
- Authors
- Name
- Kikusan
- Refs
- データベース
- pickle
- sqlalchemy
- test
- doctest
- unittest
- pytest
- テストカバレッジ
- setuptoolsでテスト
- mock
- 並列化
- Thread
- multiprocessing
- 別サーバプロセス
- 高水準の並列化ライブラリ
- シングルスレッドの非同期処理
- socket通信
- tcpの場合
- udpの場合
- 簡単なwebサーバ
- contextlib
- io
- collections
- 正規表現re
Refs
酒井さんのpython講座の内容を自分なりにまとめたものです。
データベース
pymysqlやpsycopg2, pymongoのようなDB接続ライブラリをつかって、基本操作や生のSQLを取り扱うことができる。
NoSQL MEMO
キーバリュー型 : DBM, memcach, redis
ドキュメント型 : MongoDB
ワイドカラム型 : Hbase, Cassandra
グラフ型 : Neo4j
pickle
pickleは日本語でつけもの。データ構造を保存できる。
import pickle
class T(object):
def __init__(self, name):
self.namej = name
data = {
'a': [1,2,3],
'b': {'1': 2},
'c': T('test')
}
with open('data.pickle', 'wb') as f:
pickle.dump(data, f)
with open('data.pickle', 'rb') as f:
data_loaded = pickle.load(f)
print(data_loaded)
# {'a': [1, 2, 3], 'b': {'1': 2}, 'c': <__main__.T object at 0x00000277CDB90BB0>}
sqlalchemy
sqlalchemyはいろいろなRDBにアクセスできるラッパーライブラリ 途中でデータベースを切り替えてみたり、ORM的に使えたりする。
pip install SQLAlchemy
import sqlalchemy
import sqlalchemy.ext.declarative
import sqlalchemy.orm
# sqlalchemyのsqliteのインメモリデータベース echo=TrueでSQLが見れる
# 接続文字列はhttp://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls
engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=False)
"""
# sqliteファイルを利用する場合
engine = sqlalchemy.create_engine('sqlite:///test_sqlite')
"""
"""
# mysqlを利用する場合
engine = sqlalchemy.create_engine('mysql+pymysql:///test_mysql_database')
"""
# ORMの基底クラス
Base = sqlalchemy.ext.declarative.declarative_base()
# テーブルオブジェクト
class Person(Base):
__tablename__ = 'persons'
id = sqlalchemy.Column(
sqlalchemy.Integer, primary_key=True, autoincrement=True)
name = sqlalchemy.Column(sqlalchemy.String(14))
# テーブルを作成
Base.metadata.create_all(engine)
# トランザクション開始
Session = sqlalchemy.orm.sessionmaker(bind=engine)
session = Session()
# テーブルデータ
p1 = Person(name='Mike')
p2 = Person(name='Nancy')
p3 = Person(name='Jun')
# トランザクションに追加
session.add(p1)
session.add(p2)
session.add(p3)
# commit
session.commit()
# where, update
p4 = session.query(Person).filter_by(name='Mike').first()
p4.name = 'Michel'
session.add(p4)
session.commit()
# delete
p5 = session.query(Person).filter_by(name='Nancy').first()
session.delete(p5)
session.commit()
# 読み込み
persons = session.query(Person).all()
for person in persons:
print(person.id, person.name)
# 1 Michel
# 3 Jun
memcachedを使うとメモリ上にテーブルを展開した状態を維持して、DBアクセスを高速化できる。
test
doctest
class Cal(object):
def add_num_and_double(self, x, y):
"""Add and double
>>> c = Cal()
>>> c.add_num_and_double(1, 1)
5
>>> c = Cal()
>>> c.add_num_and_double("2", 1)
9
"""
result = x + y
result *= 2
return result
def add_num(x, y):
"""Add
:param x:
:param y:
:return: x + y
>>> add_num(1,1)
3
"""
return x + y
if __name__ == '__main__':
import doctest
doctest.testmod()
**********************************************************************
File "testtest.py", line 6, in __main__.Cal.add_num_and_double
Failed example:
c.add_num_and_double(1, 1)
Expected:
5
Got:
4
**********************************************************************
File "testtest.py", line 10, in __main__.Cal.add_num_and_double
Failed example:
c.add_num_and_double("2", 1)
Exception raised:
Traceback (most recent call last):
File "doctest.py", line 1329, in __run
compileflags, 1), test.globs)
File "<doctest __main__.Cal.add_num_and_double[3]>", line 1, in <module>
c.add_num_and_double("2", 1)
File "testtest.py", line 13, in add_num_and_double
result = x + y
TypeError: can only concatenate str (not "int") to str
**********************************************************************
File "testtest.py", line 24, in __main__.add_num
Failed example:
add_num(1,1)
Expected:
3
Got:
2
**********************************************************************
2 items had failures:
2 of 4 in __main__.Cal.add_num_and_double
1 of 1 in __main__.add_num
***Test Failed*** 3 failures.
unittest
import unittest
import calculation
release_name = 'test'
class CalTest(unittest.TestCase):
"""unittest.TestCaseを継承"""
def setUp(self):
"""testのセットアップ情報を登録"""
self.cal = calculation.Cal()
def tearDown(self):
"""test終了後処理"""
del self.cal
# @unittest.skipでテストを行わない
# @unittest.skip('skip!')
@unittest.skipIf(release_name=='test', 'skip!')
def test_add_num_and_double(self):
"""test_ 名前に必須"""
self.assertEqual(
self.cal.add_num_and_double(1,1), 4
)
with self.assertRaises(TypeError):
"""例外処理テストではwithを使う"""
self.cal.add_num_and_double('1', 1)
def test_add_num(self):
self.assertNotEqual(
calculation.add_num(1,1), 4
)
if __name__ == '__main__':
unittest.main()
Ran 1 test in 0.008s
OK (skipped=1)
Skipped: skip!
pytest
# まずはじめに
pip install pytest
# 実行
pytest # -s でprint表示
# collected 2 items
# test_aaa.py F.
pytestは実行dir以下のファイル名がtest_で始まるファイルを対象にテストする。
unittestのコードもpytestには含まれる。
- test_calc.py
import calculation
import pytest
is_release = True
def test_add_num_and_double():
"""test_ は必要"""
cal = calculation.Cal()
# pythonの演算子で判定できる。
assert cal.add_num_and_double(1,1) == 4
class TestCal(object):
"""クラスの先頭はTest"""
@classmethod
def setup_class(cls):
"""クラスのセットアップをしたいときはsetup_class@classmethod"""
print('start')
cls.cal = calculation.Cal()
@classmethod
def teardown_class(cls):
"""クラスの終了時処理をしたいときはteardown_class@classmethod"""
print('end')
del cls.cal
# def setup_method(self, method):
# print('setup_method={}'.format(method.__name__))
# """testmethodのセットアップ情報を登録, 引数は実行関数"""
# self.cal = calculation.Cal()
# def teardown_method(self, method):
# """testmethod終了後処理"""
# print('method={}'.format(method.__name__))
# del self.cal
# @pytest.mark.skip(reason='skip!') # skip時 pytest -rs
@pytest.mark.skipif(is_release, reason='skip!')
def test_add_num_and_double(self):
"""test_ は必要"""
assert self.cal.add_num_and_double(1,1) == 4
def test_add_num_and_double_raise(self):
with pytest.raises(TypeError):
"""例外処理テストではwithでpytest.raisesを使う"""
self.cal.add_num_and_double('1', 1)
# 引数で受け取れるものは引数名で決まっている(fixture)
# https://docs.pytest.org/en/stable/builtin.html
# requestや、tmpdirなどがある。自分で作成することもできる。
def test_config(self, request):
"""conftest.pyに書き込んだオプションを
request.config.getoptionで読み込める。"""
os_name = request.config.getoption('--os-name')
assert os_name == 'linux'
def test_fixture(self, csv_file):
"""自作fixture"""
print(csv_file)
assert True
- conftest.py
"""
conftest.pyは各ディレクトリ配下に影響する。
> pytest --os-name=windows
"""
import os
import pytest
def pytest_addoption(parser):
"""option追加関数
parser.addoptionでオプションが追加できる。
"""
parser.addoption('--os-name', default='linux', help='os name')
@pytest.fixture
def csv_file(tmpdir):
"""独自fixture
このfixtureにも↑fixtureを入れられる
"""
# yieldにすると開いたファイルをfixtureで閉じてくれる
# fixtureとしてはreturnでもいい
with open(os.path.join(tmpdir, 'test.csv'), 'w+') as c:
print('before test')
yield c
print('after test')
テストカバレッジ
どれだけテストでカバーされているかをみることができる。
pip install pytest-cov pytest-xdist
# --cov=importモジュールかフォルダパス
pytest test_calc.py --cov=calculation --cov-report term-missing
# windowsではsqlite3.dllにpathを通さないと動かなかった
# ----------- coverage: platform win32, python 3.8.5-final-0 -----------
# Name Stmts Miss Cover Missing
# ----------------------------------------------
# calculation.py 10 3 70% 27, 30-31
# ----------------------------------------------
# TOTAL 10 3 70%
missingはテストできていない行(分岐などで実行がされていない)
特に見られていなくてもいい行もあるので、Cover率は合格率をあらかじめ決める。
setuptoolsでテスト
- setup.py
from setuptools import setup
setup(
# ...
# tests_require=['pytest'], # pytestでテストするとき
test_suits='test_folder_name'
)
- setup.cfg
# pytestを使うとき
[alieas]
test=pytest
[tool:pytest]
python_files = tests/*
python setup.py test
とすれば、そのフォルダ以下のtest_XXX.py ファイルでテストしてくれる。
mock
- モック : まだできてない箇所をダミーのオブジェクトで補ってテストする仕組み
- スタブ : テスト対象オブジェクトから利用するオブジェクトがまだ完成していないとき、代替えする仕組み
- ドライバ : テスト対象を利用しようとするオブジェクトがまだ完成していないとき、代替えする仕組み
pythonのmockでは、すべてできる。
どこまでmockするかは、プロジェクトで決める。(response.json()をモック?get_response()をモック?)
- salary.py
import requests
class ThirdPartyBonusRestApi(object):
def bonus_price(self, year):
r = requests.get('http://localhost/bonus', params={'year': year})
return r.json()['price']
class Salary(object):
def __init__(self, base=100, year=2017):
self.bonus_api = ThirdPartyBonusRestApi()
self.base = base
self.year = year
def calculation_salary(self):
try:
bonus = self.bonus_api.bonus_price(year=self.year)
except ConnectionRefusedError:
bonus = 0
return self.base + bonus
- test_salary.py
import unittest
from unittest.mock import MagicMock
from unittest import mock
import salary
class TestSalary(unittest.TestCase):
def test_calculation_salary(self):
s = salary.Salary(year=2017)
# MagicMockを使うことで、関数の戻り値がreturn_valueの値に置き換わる
s.bonus_api.bonus_price = MagicMock(return_value=1)
self.assertEqual(s.calculation_salary(), 101)
# mockを使った関数が呼ばれなかったらassertError
s.bonus_api.bonus_price.assert_called()
# 1以外の回数呼ばれたらassertError
s.bonus_api.bonus_price.assert_called_once()
# 指定の引数と一緒に呼ばれていなければassertError
s.bonus_api.bonus_price.assert_called_with(year=2017)
# 呼ばれた回数
self.assertEqual(s.bonus_api.bonus_price.call_count, 1)
@mock.patch('salary.ThirdPartyBonusRestApi.bonus_price', return_value=1)
def test_calculation_salary_patch(self, mock_bonus):
"""@mock.patchによって引数にそのオブジェクトが入る"""
# やっていることはmock_bonus.return_value = 1と一緒
s = salary.Salary(year=2017)
self.assertEqual(s.calculation_salary(), 101)
mock_bonus.assert_called()
def test_calculation_salary_patch_with(self):
"""with句でもmock.patchを使える"""
with mock.patch('salary.ThirdPartyBonusRestApi.bonus_price'
, return_value=1) as mock_bonus:
"""
patcher = mock.patch(..)
mock_bonus = patcher.start()
patcher.stop() と同義
start->stop は setUp->tearDownで使える
"""
s = salary.Salary(year=2017)
self.assertEqual(s.calculation_salary(), 101)
mock_bonus.assert_called()
def test_calculation_salary_patch_side_effect(self):
"""mock_patch.side_effectでmockを別の関数にできる"""
with mock.patch('salary.ThirdPartyBonusRestApi.bonus_price'
) as mock_bonus:
def f(year):
return year + 1000
# 関数ではなく[1, ValueError]にすれば、一度目のcallで1, 二度目のcallでValueErrorが返る。
mock_bonus.side_effect = f
s = salary.Salary(year=2017)
self.assertEqual(s.calculation_salary(), 3117)
mock_bonus.assert_called()
def test_calculation_salary_patch_side_effect_error(self):
"""mock_patch.side_effectでmock関数でerrorをraiseできる"""
with mock.patch('salary.ThirdPartyBonusRestApi.bonus_price'
) as mock_bonus:
mock_bonus.side_effect = ConnectionRefusedError
s = salary.Salary(year=2017)
self.assertEqual(s.calculation_salary(), 100)
mock_bonus.assert_called()
@mock.patch('salary.ThirdPartyBonusRestApi', spec=True)
def test_calculation_salary_class(self, MockRest):
"""
@mock.patchによって引数にクラスも入れられる
spec=Trueでプロパティやメソッド全てmockにしてくれる
"""
# mock_rest = MockRest.return_valueと同義
# 本来は引数をmock_restと命名して mock_rest = mock_rest.return_valueとする
mock_rest = MockRest()
mock_rest.bonus_price.return_value = 1
s = salary.Salary(year=2017)
self.assertEqual(s.calculation_salary(), 101)
mock_rest.bonus_price.assert_called()
if __name__ == '__main__':
unittest.main()
並列化
マルチプロセス,スレッド
スレッド:
- CPUコアとメモリは共有, 平行, PythonGILというものの上で動くので実際は交互に動く
- I/Oバウンド(ネットワーク、ファイル読み書き)には効果的, CPUバウンド(並列計算)には不向き
- スレッド数がマルチプロセスよりも増やせる
- GIL待ちがある(GILロックを取るのにかかる時間)
プロセス:
- CPUコアとメモリが別々, 並列
- CPUのコア数を超えて並列化はできない
- プロセス間通信しないとデータを受け渡せない
- I/OバウンドにもCPUバウンドにも効果的
Thread
import threading
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')
logger = logging.getLogger(__name__)
def worker1():
"""threadに渡す関数"""
logger.debug('start')
time.sleep(3)
logger.debug('end')
def worker2(x, y=1):
"""引数ありスレッド例"""
logger.debug('start')
logger.debug(x)
logger.debug(y)
time.sleep(2)
logger.debug('end')
if __name__ == '__main__':
t1 = threading.Thread(target=worker1, name='rename1')
t1.setDaemon(True) # 他のプロセスが終了する時同時にThreadを強制終了する。
t2 = threading.Thread(target=worker2, args=(100, ), kwargs={'y': 200})
t1.start() # threadスタート
t2.start()
print('started')
t1.join() # スレッドが終わるのを待つ setDaemonがTrueでも待つ。
print('ended')
# t2.join() daemon化していないものはjoin書いていなくても同じ
# rename1: start
# Thread-1: start
# started
# Thread-1: 100
# Thread-1: 200
# Thread-1: end
# rename1: end
# ended
print("""
**********
生存中のThreadはenumerateでとれる
**********
""")
# threads = []
for _ in range(5):
t = threading.Thread(target=worker1)
t.setDaemon(True)
t.start()
# threads.append(t)
# for thread in threads: # リストをつくってjoinしてもいいが
for thread in threading.enumerate():
if thread is threading.currentThread(): # mainthreadは省く
print(thread)
continue
thread.join()
# Thread-2: start
# Thread-4: start
# Thread-5: start
# Thread-6: start
# <_MainThread(MainThread, started 18628)>
# Thread-5: end
# Thread-2: end
# Thread-4: end
# Thread-6: end
# Thread-3: end
print("""
*****タイマー*****
""")
# 始まるのを遅らせる
t = threading.Timer(3, worker1)
t.start()
t.join()
# ******************************
# *****Lock RLock Semaphore*****
# ******************************
def share1(d, lock):
"""lock例"""
with lock:
i = d['x']
time.sleep(2)
d['x'] = i + 2
logger.debug(d['x'])
def share2(d, lock):
"""lock例"""
lock.acquire()
d['x'] += 5
logger.debug(d['x'])
lock.release()
if __name__ == '__main__':
# 同じLockを関数に渡すことで占有ロックができる
lock = threading.Lock()
# lock = threading.RLock() RLockにするとロックの中でロックできるようになる。
# semaphore = threading.Semaphore(2) Semaphoreにすると並列処理するスレッドの数を狭められる
d = {'x': 0}
t1 = threading.Thread(name="share1", target=share1, args=(d, lock))
t2 = threading.Thread(name="share2", target=share2, args=(d, lock))
for t in (t1, t2):
t.start()
# share1: 2
# share2: 7
# ***************
# *****queue*****
# ***************
def queue_put(queue):
"""queue put関数"""
queue.put(100)
time.sleep(2)
queue.put(200)
logger.debug("end")
def queue_get(queue):
"""queue get関数"""
logger.debug(queue.get())
logger.debug(queue.get())
logger.debug("end")
def queue_loop(queue):
"""queueの用例"""
logger.debug('start')
while True:
item = queue.get()
# Noneが入ってきたタイミングでループを終了する
if item is None:
break
logger.debug(item)
queue.task_done() # queueが空であることをjoinに知らせる
logger.debug('end')
if __name__ == '__main__':
import queue
queue = queue.Queue()
# queueは取り出すものが来るまで待つので、並列化されていれば入れられるのも待ってくれる
t1 = threading.Thread(name="queue1", target=queue_put, args=(queue,))
t2 = threading.Thread(name="queue2", target=queue_get, args=(queue,))
for t in (t1, t2):
t.start()
# queue2: 100
# queue1: end
# queue2: 200
# queue2: end
for i in range(10):
queue.put(i)
t = threading.Thread(name="queue_loop", target=queue_loop, args=(queue,))
t.start()
logger.debug('tasks are not done')
queue.join() # queue.joinはtask_doneが判定されるまで処理をまつ
logger.debug('tasks are done')
queue.put(None)
# queue_loop: start
# MainThread: tasks are not done
# queue_loop: 0
# queue_loop: 1
# queue_loop: 2
# queue_loop: 3
# queue_loop: 4
# queue_loop: 5
# queue_loop: 6
# queue_loop: 7
# queue_loop: 8
# queue_loop: 9
# MainThread: tasks are done
# queue_loop: end
# ***************
# *****event*****
# ***************
def event_func1(event):
event.wait() # event.wait()でイベントが発火するのを待つ
logger.debug('start')
time.sleep(3)
logger.debug('end')
def event_func2(event):
event.wait() # event.wait()でイベントが発火するのを待つ
logger.debug('start')
time.sleep(3)
logger.debug('end')
def event_func3(event):
logger.debug('start')
time.sleep(3)
logger.debug('end')
event.set() # event.set()でイベントを発火
if __name__ == '__main__':
event = threading.Event()
t1 = threading.Thread(target=event_func1, args=(event,))
t2 = threading.Thread(target=event_func2, args=(event,))
t3 = threading.Thread(target=event_func3, args=(event,))
t1.start()
t2.start()
t3.start()
# Thread-3: start
# Thread-3: end
# Thread-1: start
# Thread-2: start
# Thread-2: end
# Thread-1: end
# *******************
# *****condition*****
# *******************
def condition_func1(condition):
with condition: # lockみたいにロックできる
condition.wait() # 発火を待つ
logger.debug('start')
time.sleep(3)
logger.debug('end')
def condition_func2(condition):
with condition: # lockみたいにロックできる
condition.wait() # 発火を待つ
logger.debug('start')
time.sleep(3)
logger.debug('end')
def condition_func3(condition):
with condition:
logger.debug('start')
time.sleep(3)
logger.debug('end')
condition.notifyAll() # eventを発火
if __name__ == '__main__':
condition = threading.Condition()
t1 = threading.Thread(name="t1", target=condition_func1, args=(condition,))
t2 = threading.Thread(name="t2", target=condition_func2, args=(condition,))
t3 = threading.Thread(name="t3", target=condition_func3, args=(condition,))
t1.start()
t2.start()
t3.start()
# t3: start
# t3: end
# t2: start
# t2: end
# t1: start
# t1: end
# *****************
# *****barrier*****
# *****************
def barrier_func1(barrier):
r = barrier.wait() # スレッドがそろうまで待つ
logging.debug('num={}'.format(r))
while True:
logger.debug('start')
time.sleep(3)
logger.debug('end')
def barrier_func2(barrier):
r = barrier.wait() # スレッドがそろうまで待つ
logging.debug('num={}'.format(r))
while True:
logger.debug('start')
time.sleep(3)
logger.debug('end')
if __name__ == '__main__':
barrier = threading.Barrier(2) # barrier.wait()をしているThreadが2つになったら開始
t1 = threading.Thread(name="t1", target=barrier_func1, args=(barrier,))
t2 = threading.Thread(name="t2", target=barrier_func2, args=(barrier,))
t1.start()
t2.start()
multiprocessing
import time
import logging
from multiprocessing import Process
# from multiprocessing import RLock, Semaphore, Queue, Event, Barrier
from multiprocessing import Value, Array, Pipe, Manager
# **********************************
# 基本的にはThreadingと同じように扱える
# **********************************
logging.basicConfig(level=logging.DEBUG, format='%(processName)s: %(message)s')
logger = logging.getLogger(__name__)
def worker1(i):
logger.debug('start')
time.sleep(i)
logger.debug('end')
def worker2(i):
logger.debug('start')
time.sleep(i)
logger.debug('end')
if __name__ == '__main__':
t1 = Process(target=worker1, args=(3,))
t2 = Process(target=worker1, args=(2,))
t1.daemon = True
t1.start()
t2.start()
t1.join()
# Process-1: start
# Process-2: start
# Process-2: end
# Process-1: end
def worker1(i):
logger.debug('start')
time.sleep(i)
logger.debug('end')
return i
if __name__ == '__main__':
# Poolは並列処理するプロセスの数
with Pool(5) as p:
# 並列化しない場合はapply
logger.debug(p.apply(worker1, (1,)))
# Pool.apply_asyncで関数を並列処理
p1 = p.apply_async(worker1, (3,))
p2 = p.apply_async(worker1, (2,))
logger.debug('exec')
# logger.debug(p2.get(timeout=1)) # TimeoutError をあげる
logger.debug(p1.get()) # 値が返ってくるまで待つ。
# MainProcess: 1
# MainProcess: exec
# SpawnPoolWorker-2: start
# SpawnPoolWorker-3: start
# SpawnPoolWorker-3: end
# SpawnPoolWorker-2: end
# MainProcess: 3
if __name__ == '__main__':
with Pool(5) as p:
# Pool.mapを使うと指定関数をargsの数だけ並列実行
r = p.map(worker1, (3,2,1))
print(r, type(r))
# l = list(map(worker1, (3,2,1))) # builtinのmapも並列じゃないけど同じような動作をする
# print(l)
# SpawnPoolWorker-1: start
# SpawnPoolWorker-2: start
# SpawnPoolWorker-3: start
# SpawnPoolWorker-3: end
# SpawnPoolWorker-2: end
# SpawnPoolWorker-1: end
# [3, 2, 1] <class 'list'>
r = p.map_async(worker1, (3,2,1))
print(r.get(), type(r)) # map_asyncの戻り値のget()は並列処理が完了するまで待つ。
# SpawnPoolWorker-4: start
# SpawnPoolWorker-3: start
# SpawnPoolWorker-5: start
# SpawnPoolWorker-3: end
# SpawnPoolWorker-5: end
# SpawnPoolWorker-4: end
# [3, 2, 1] <class 'multiprocessing.pool.MapResult'>
r = p.imap(worker1, (3,2,1)) # imapの戻り値はイテレータ
logger.debug([i for i in r]) # 回された瞬間に実行される
# SpawnPoolWorker-2: start
# SpawnPoolWorker-1: start
# SpawnPoolWorker-3: start
# SpawnPoolWorker-3: end
# SpawnPoolWorker-1: end
# SpawnPoolWorker-2: end
# MainProcess: [3, 2, 1]
# ******************
# mapやimapで複数引数使いたい時は, objectかラップ関数を使う
# ******************
def multiply(a, b):
logger.debug(f'a:{a} b:{b}')
return a * b
def wrap_multiply(args):
logger.debug('wrap')
return multiply(*args)
if __name__ == '__main__':
param1 = [1, 2, 3, 4]
param2 = [10, 20, 30, 40]
with Pool(5) as p:
# zip関数はイテレータをタプルにそれぞれまとめて返す
params = zip(param1, param2)
results = p.imap(wrap_multiply, params)
print(list(results)) # listに変換しても実行される
# SpawnPoolWorker-1: wrap
# SpawnPoolWorker-1: a:1 b:10
# SpawnPoolWorker-1: wrap
# SpawnPoolWorker-1: a:2 b:20
# SpawnPoolWorker-2: wrap
# SpawnPoolWorker-2: a:3 b:30
# SpawnPoolWorker-1: wrap
# SpawnPoolWorker-1: a:4 b:40
# [10, 40, 90, 160]
# multiprocessでは実は一つのプロセス内で使っているメモリはすべて独立している
# 引数なんかはコピーされている
def connect(conn):
conn.send('test_start') # 親に送る
time.sleep(3)
conn.send('test_end')
conn.close()
if __name__ == '__main__':
# Pipeは現在のプロセスと子プロセスの通信路を作る
parent_conn, child_conn = Pipe()
p = Process(target=connect, args=(parent_conn,))
p.start()
logger.debug(child_conn.recv()) # 子から受け取る
logger.debug(child_conn.recv())
# ************
# 共有メモリ
# ************
def share1(num, arr):
num.value += 1.0
logger.debug(num)
for i in range(len(arr)):
arr[i] *= 2
logger.debug(arr)
if __name__ == '__main__':
# Valueは値を共有 浮動小数点はf
num = Value('f', 0.0)
# Arrayはリストを共有 integerはi
arr = Array('i', [1, 2, 3, 4, 5])
p = Process(target=share1, args=(num, arr))
p.start()
p.join()
logger.debug(num.value)
logger.debug(arr[:])
# Process-1: <Synchronized wrapper for c_float(1.0)>
# Process-1: <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_long_Array_5 object at 0x00000222EC9EA640>>
# MainProcess: 1.0
# MainProcess: [2, 4, 6, 8, 10]
def share2(l, d, n):
l.reverse()
d['x'] += 1
n.y += 1
if __name__ == '__main__':
# Managerは直観的に共有オブジェクトを操作できる。ただ、すこーし遅い
with Manager() as manager:
l = manager.list()
d = manager.dict()
n = manager.Namespace()
l.append(1)
l.append(2)
l.append(3)
d['x'] = 0
n.y = 0
p1 = Process(target=share2, args=(l, d, n))
p1.start()
p1.join()
logger.debug(l)
logger.debug(d)
logger.debug(n)
# MainProcess: [3, 2, 1]
# MainProcess: {'x': 1}
# MainProcess: Namespace(y=1)
別サーバプロセス
- server.py
import queue
from multiprocessing.managers import BaseManager
queue = queue.Queue()
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue', callable=lambda: queue)
manager = QueueManager(
address=('127.0.0.1', 50000),
authkey=b'abcde'
)
server = manager.get_server()
server.serve_forever()
- client.py
import queue
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
manager = QueueManager(
address=('127.0.0.1', 50000),
authkey=b'abcde'
)
manager.connect()
queue = manager.get_queue()
# server.pyサーバにqueueをセット
queue.put('hello')
# server.pyサーバからqueueをゲット
print(queue.get())
高水準の並列化ライブラリ
import concurrent.futures
import logging
import time
logging.basicConfig(level=logging.DEBUG, format='%(processName)s: %(message)s')
logger = logging.getLogger(__name__)
def worker(x, y):
logger.debug('start')
time.sleep(3)
r = x * y
logger.debug(r)
logger.debug('end')
return r
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
f1 = executor.submit(worker, 2, 5)
f2 = executor.submit(worker, 3, 10)
logger.debug(f1.result())
logger.debug(f2.result())
if __name__ == '__main__':
main()
# MainProcess: start
# MainProcess: start
# MainProcess: 10
# MainProcess: 30
# MainProcess: end
# MainProcess: end
# MainProcess: 10
# MainProcess: 30
シングルスレッドの非同期処理
I/Oの時間を別の処理で有効に使う。
- 非同期処理:タスクの処理が終了したタイミングで通知される。 Callback/Future/Promiseと表現
- ノンブロッキング:処理の終了、処理のエラーを即座に返す。 Pollingと表現
はじめにgeneratorベースのコルーチンの例
# yieldで処理が中断する コルーチン
def g2():
print('first')
r = yield 'hello'
print('second')
yield r
g2 = g2()
print(next(g2)) # ここではyield 'hello'が帰ってきている
# print(next(g2)) # None
print(g2.send('plus')) # ここではyield r が帰ってきている
# first
# hello
# second
# plus
# yield from 文
def g3():
yield from [1,2,3]
yield from g1()
g3 = g3()
print(next(g3)) # 1
print(next(g3)) # 2
print(next(g3)) # 3
print(next(g3)) # hello
def sub_sub_generator():
yield "Sub Sub yield"
return "Sub Sub return"
def sub_generator():
yield "Sub yield"
res = yield from sub_sub_generator()
print("Sub res = {}".format(res))
return "Sub return"
def generator():
yield "Gen yield"
res = yield from sub_generator()
print("Gen res = {}".format(res))
yield res # yield from句の中でないと、return句は機能しない
gen = generator()
# yieldはそこで処理が中断する。
# yield from 句ではreturn句のときに戻り値が返ってきて、続行する。
print(next(gen)) # Gen yield
print(next(gen)) # Sub yield
print(next(gen)) # Sub Sub yield
print(next(gen)) # Sub res = Sub Sub return, Gen res = Sub return, Sub return
ジェネレータベースのコルーチンの動作がわかったところで、asyncioの非同期操作を見ていく。
import asyncio
# threadと同じようなライブラリもある
# from asyncio import Lock, Event, Condition, Semaphore, Queue, Future
# lock = Lock()
# loop.run_until_complete(asyncio.wait([waorker1(lock)]))
# async def worker1(lock):
# with await lock:
# await asyncio.sleep(2)
import threading
import time
def worker():
print('start')
time.sleep(2)
print('end')
# @asyncio.coroutine python 3.5まで
async def worker_async():
print('start')
# yield from asyncio.sleep(2) python3.5まで
await asyncio.sleep(2)
print('end')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# 全ての処理が完了したらクローズする
# async関数の中の、awaitの部分が非同期となり、await中に他の処理ができるようになる
loop.run_until_complete(asyncio.wait([worker_async(), worker_async()]))
loop.close()
# マルチスレッドの場合
# t1 = threading.Thread(target=worker)
# t2 = threading.Thread(target=worker)
# t1.start()
# t2.start()
# start
# start
# end
# end
# loop.run_forever, loop.call_later, loop.stopの例
import asyncio
loop = asyncio.get_event_loop()
def hello(name, loop):
print(name)
loop.stop()
loop.call_later(2, hello, 'Mike', loop)
# loop.call_soon(hello, 'Nancy', loop)
loop.run_forever()
loop.close()
# asyncio chain
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
# Compute 1 + 2 ...
# 1 + 2 = 3
通信I/Oで非同期処理する例を見ていく。
pip install aiohttp
import asyncio
import time
import aiohttp
import requests
loop = asyncio.get_event_loop()
# async def hello(url):
# print(requests.get(url).status_code)
# 200
# 200
# 1.5945141315460205
async def hello(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
resp = await response.read()
print(response.status)
# 200
# 200
# 0.8088996410369873
start = time.time()
loop.run_until_complete(asyncio.wait([
hello("https://sehippocampus.work/"),
hello("https://sehippocampus.work/")
]))
end = time.time()
print(end - start)
socket通信
pythonの通信モジュール 公式Doc
これをつかってwebサーバとかを作っている。
tcpの場合
- socket_server.py
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: # ipv4, tcpで通信
s.bind(('127.0.0.1', 55000)) # ポートを使う
s.listen(1) # 1接続
while True:
conn, addr = s.accept() # 誰かが接続するのを待つ
with conn:
data = conn.recv(1024) # chunk=1024で受け取る
if not data:
break
print('data: {}, addr: {}'.format(data, addr))
conn.sendall(b'Received: ' + data) # clientにデータを返す
- socket_client.py
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 55000)) # 通信を試みる
s.sendall(b'Hello') # 送る
data = s.recv(1024) # 返信を受け取る
print(repr(data)) # reprはオブジェクトそのものの形を表示
udpの場合
受け取られたかを確認しないので簡単
- socket_server.py
import socket
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: # ipv4, udpで通信
s.bind(('127.0.0.1', 55000)) # ポートを使う
while True:
data, addr = s.recvfrom(1024) # chunk=1024で受け取る
print('data: {}, addr: {}'.format(data, addr))
- socket_client.py
import socket
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.sendto(b'Hello', ('127.0.0.1', 55000)) # 送る
簡単なwebサーバ
import http.server
import socketserver
with socketserver.TCPServer(('127.0.0.1', 8000),
http.server.SimpleHTTPRequestHandler) as httpd:
httpd.serve_forever()
contextlib
import contextlib
# デコレータを簡単に書く。引数も与えられる。
@contextlib.contextmanager
def tag(name):
print('<{}>'.format(name))
# yield内で実関数が動く
yield
print('</{}>'.format(name))
@tag('h3')
def f(content):
print(content)
f('test')
# <h3>
# test
# </h3>
# 関数をつくらなくてもwithでデコレータみたいに使える
with tag('h2'):
print('test')
with tag('a'):
print('test2')
# <h2>
# test
# <a>
# test2
# </a>
# </h2>
import contextlib
# ContextDecoratorを継承することでデコレータライクにクラスを扱える
class tag(contextlib.ContextDecorator):
def __init__(self, name):
self.name = name
self.start_tag = '<{}>'.format(name)
self.end_tag = '</{}>'.format(name)
def __enter__(self):
"""クラスが呼び出される時の関数"""
print(self.start_tag)
def __exit__(self, exc_type, exc_val, exc_tb):
"""クラスが最後に呼び出される時の関数"""
print(exc_type) # exceptiontype
print(exc_val) # exception内容
print(exc_tb) # exception traceback
print(self.end_tag)
with tag('h2'):
print('test')
# <h2>
# test
# None
# None
# None
# </h2>
import logging
import contextlib
# Errorを無視したいとき、suppressを使える
with contextlib.suppress(FileNotFoundError):
os.remove('somefile.tmp')
with
# 標準出力をファイルに書き込むredirect_std
with open('stdout.log', 'w') as f:
with contextlib.redirect_stdout(f):
print('hello')
with open('stderr.log', 'w') as f:
with contextlib.redirect_stderr(f):
logging.error('Error')
import contextlib
def is_ok_job():
try:
print('do something')
raise Exception('error')
return True
except Exception:
return False
def cleanup():
print('clean up')
def cleanup2():
print('clean up2')
# finallyの処理を取り回しやすくするstack
with contextlib.ExitStack() as stack:
stack.callback(cleanup) # finallyみたいにやりたい関数を入れておく
stack.callback(cleanup2)
@stack.callback # デコレータでもcallbackを追加できる
def cleanup3():
print('clean up3')
is_ok = is_ok_job()
print('more task')
if is_ok:
stack.pop_all() # もし処理が成功したらstackに入れてある関数を取り除く
# is_okが失敗していれば、cleanupが行われる
# do something
# more task
# clean up3
# clean up2
# clean up
io
インメモリストリームを作成できる。
# tarファイルを保存せずに取り扱う
import io
import requests
import tarfile
url = ('https://files.pythonhosted.org/packages/12/68/95515eaff788370246dac534830ea9ccb0758e921ac9e9041996026ecaf2/setuptools-53.0.0.tar.gz')
r = requests.get(url)
f = io.BytesIO(r.content)
with tarfile.open(fileobj=f) as t:
print(t.getmembers())
collections
# ChainMapは辞書をそのままの形で結合できる。
a = {'a': 'a', 'num': 0}
b = {'b': 'b', 'c': 'cc'}
c = {'b': 'bbb', 'c': 'ccc'}
m = collections.ChainMap(a, b, c)
print(m)
# ChainMap({'a': 'a', 'num': 0}, {'b': 'b', 'c': 'cc'}, {'b': 'bbb', 'c': 'ccc'})
print(m.maps) # list
# [{'a': 'a', 'num': 0}, {'b': 'b', 'c': 'cc'}, {'b': 'bbb', 'c': 'ccc'}]
# defaultdictはキーが存在しないときに値を返す
d = collections.defaultdict(int)
print(d['1']) # 0
d = collections.defaultdict(lambda:'str')
print(d['1']) # str
# Counterはイテレータをカウントしてくれる
c = collections.Counter([1,2,3,1,2, 'str'])
print(c) # Counter({1: 2, 2: 2, 3: 1, 'str': 1})
print(c.most_common(1)) # [(1, 2)]
# dequeは双方向から操作できるキュー。threadにもqueueと同様に使える
d = collections.deque([1,2,3,4])
print(d.pop()) # 4
print(d.popleft()) # 1
print(d) # deque([2, 3])
d.appendleft(4)
d.append(1)
print(d) # deque([4, 2, 3, 1])
# クラスメンバライクにアクセスできるタプル
# 第一引数はtupleの名前, 第二引数がタプルの中身
Point = collections.namedtuple('Point', ['x', 'y'])
p = Point(10, y=20) # *[10, 20]
print(p) # Point(x=10, y=20)
print(p.x) # 10
# p.x = 100 # AttributeError: can't set attribute
正規表現re
import re
s = r'abc de fg abc'
# . は任意の一文字
# match() 文字列の先頭で正規表現とマッチするか判定
m = re.match('a.c', s)
print(m.group()) # abc
# search() 文字列を操作して正規表現がどこにマッチするか調べる(最初の一致)
m = re.search('a.c', s)
print(m.span()) # (0, 3) :0から2までが一致
print(m.group()) # abc
# findall() 正規い表現にマッチする部分文字列を全て探し出しリストとして返す
m = re.findall('a.c', s)
print(m) # ['abc', 'abc']
# finditer() 重複しないマッチオブジェクトのイテレータを返す
m = re.finditer('a.c', s)
print([w.group() for w in m]) # ['abc', 'abc']
s1 = ('/abc-def-ghi/xxxxxxxxxxxx/123456789/yyy-yyy-yy')
s2 = ('/abc-def-ghi/XXXXXXXXXXXX/123456789/YYY-YYY-YY')
# (?P<name> re) で、groupnameをつけることができる。
m = re.match(r'/[\w-]+/(?P<xxx>[\w]+)/[\d]+/(?P<yyy>[\w-]+)', s1)
print(m.group('xxx')) # xxxxxxxxxxxx
print(m.group('yyy')) # yyy-yyy-yy
# compileを使うと高速化できる
RE_STACK_ID = re.compile(r'/[\w-]+/(?P<xxx>[\w]+)/[\d]+/(?P<yyy>[\w-]+)')
for s in (s1, s2):
m = RE_STACK_ID.match(s)
if m:
print(m.group('xxx'))
print(m.group('yyy'))
s = 'My name is ... Mike'
print(s.split()) # ['My', 'name', 'is', '...', 'Mike']
# 正規表現でsplit
p = re.compile(r'\W+')
print(p.split(s)) # ['My', 'name', 'is', 'Mike']
# 正規表現で置換
p = re.compile('name|names')
print(p.sub('firstname', s)) # My firstname is ... Mike
print(p.sub('firstname', s + ' names', count=1)) # My firstname is ... Mike names
print(p.subn('firstname', s + ' names')) # ('My firstname is ... Mike firstnames', 2)
def hexrepl(match):
value = int(match.group())
return hex(value)
# matchしたら関数を適用することもできる
p = re.compile(r'\d')
print(p.sub(hexrepl, '12345 01 test')) # 0x10x20x30x40x5 0x00x1 test
# greedyである正規表現の直し方
s = '<html><head><title>Title</title></head>'
print(re.match('<.*>', s).group()) # <html><head><title>Title</title></head>
print(re.match('<.*?>', s).group()) # <html>