Published on

Python応用

Authors
  • avatar
    Name
    Kikusan
    Twitter

Refs

酒井さんのpython講座の内容を自分なりにまとめたものです。

データベース

pymysqlやpsycopg2, pymongoのようなDB接続ライブラリをつかって、基本操作や生のSQLを取り扱うことができる。

NoSQL MEMO
キーバリュー型 : DBM, memcach, redis
ドキュメント型 : MongoDB
ワイドカラム型 : Hbase, Cassandra
グラフ型 : Neo4j

pickle

pickleは日本語でつけもの。データ構造を保存できる。

import pickle

class T(object):
    def __init__(self, name):
        self.namej = name

data = {
  'a': [1,2,3],
  'b': {'1': 2},
  'c': T('test')
}

with open('data.pickle', 'wb') as f:
    pickle.dump(data, f)

with open('data.pickle', 'rb') as f:
    data_loaded = pickle.load(f)
    print(data_loaded)

# {'a': [1, 2, 3], 'b': {'1': 2}, 'c': <__main__.T object at 0x00000277CDB90BB0>}

sqlalchemy

sqlalchemyはいろいろなRDBにアクセスできるラッパーライブラリ 途中でデータベースを切り替えてみたり、ORM的に使えたりする。

公式Doc

pip install SQLAlchemy
import sqlalchemy
import sqlalchemy.ext.declarative
import sqlalchemy.orm

# sqlalchemyのsqliteのインメモリデータベース echo=TrueでSQLが見れる
# 接続文字列はhttp://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls
engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=False) 
"""
    # sqliteファイルを利用する場合
    engine = sqlalchemy.create_engine('sqlite:///test_sqlite')
"""
"""
    # mysqlを利用する場合
    engine = sqlalchemy.create_engine('mysql+pymysql:///test_mysql_database')
"""
# ORMの基底クラス
Base = sqlalchemy.ext.declarative.declarative_base()
# テーブルオブジェクト
class Person(Base):
    __tablename__ = 'persons'
    id = sqlalchemy.Column(
        sqlalchemy.Integer, primary_key=True, autoincrement=True)
    name = sqlalchemy.Column(sqlalchemy.String(14))

# テーブルを作成
Base.metadata.create_all(engine)
# トランザクション開始
Session = sqlalchemy.orm.sessionmaker(bind=engine)
session = Session()
# テーブルデータ
p1 = Person(name='Mike')
p2 = Person(name='Nancy')
p3 = Person(name='Jun')
# トランザクションに追加
session.add(p1)
session.add(p2)
session.add(p3)
# commit
session.commit()

# where, update
p4 = session.query(Person).filter_by(name='Mike').first()
p4.name = 'Michel'
session.add(p4)
session.commit()

# delete
p5 = session.query(Person).filter_by(name='Nancy').first()
session.delete(p5)
session.commit()

# 読み込み
persons = session.query(Person).all()
for person in persons:
    print(person.id, person.name)

# 1 Michel
# 3 Jun

memcachedを使うとメモリ上にテーブルを展開した状態を維持して、DBアクセスを高速化できる。

test

doctest

class Cal(object):
    def add_num_and_double(self, x, y):
        """Add and double

        >>> c = Cal()
        >>> c.add_num_and_double(1, 1)
        5

        >>> c = Cal()
        >>> c.add_num_and_double("2", 1)
        9
        """
        result = x + y
        result *= 2
        return result

def add_num(x, y):
    """Add

    :param x:
    :param y:
    :return: x + y

    >>> add_num(1,1)
    3
    """
    return x + y

if __name__ == '__main__':
    import doctest
    doctest.testmod()
**********************************************************************
File "testtest.py", line 6, in __main__.Cal.add_num_and_double
Failed example:
    c.add_num_and_double(1, 1)
Expected:
    5
Got:
    4
**********************************************************************
File "testtest.py", line 10, in __main__.Cal.add_num_and_double
Failed example:
    c.add_num_and_double("2", 1)
Exception raised:
    Traceback (most recent call last):
      File "doctest.py", line 1329, in __run
        compileflags, 1), test.globs)
      File "<doctest __main__.Cal.add_num_and_double[3]>", line 1, in <module>
        c.add_num_and_double("2", 1)
      File "testtest.py", line 13, in add_num_and_double
        result = x + y
    TypeError: can only concatenate str (not "int") to str
**********************************************************************
File "testtest.py", line 24, in __main__.add_num
Failed example:
    add_num(1,1)
Expected:
    3
Got:
    2
**********************************************************************
2 items had failures:
   2 of   4 in __main__.Cal.add_num_and_double
   1 of   1 in __main__.add_num
***Test Failed*** 3 failures.

unittest

import unittest

import calculation

release_name = 'test'

class CalTest(unittest.TestCase):
    """unittest.TestCaseを継承"""
    def setUp(self):
        """testのセットアップ情報を登録"""
        self.cal = calculation.Cal()

    def tearDown(self):
        """test終了後処理"""
        del self.cal

    # @unittest.skipでテストを行わない
    # @unittest.skip('skip!')
    @unittest.skipIf(release_name=='test', 'skip!')
    def test_add_num_and_double(self):
        """test_ 名前に必須"""
        self.assertEqual(
            self.cal.add_num_and_double(1,1), 4
        )

        with self.assertRaises(TypeError):
            """例外処理テストではwithを使う"""
            self.cal.add_num_and_double('1', 1)

    def test_add_num(self):
        self.assertNotEqual(
            calculation.add_num(1,1), 4
        )

if __name__ == '__main__':
    unittest.main()
Ran 1 test in 0.008s

OK (skipped=1)

Skipped: skip!

pytest

公式Doc

# まずはじめに
pip install pytest
# 実行
pytest # -s でprint表示

# collected 2 items
# test_aaa.py F.

pytestは実行dir以下のファイル名がtest_で始まるファイルを対象にテストする。
unittestのコードもpytestには含まれる。

  • test_calc.py
import calculation
import pytest

is_release = True

def test_add_num_and_double():
    """test_ は必要"""
    cal = calculation.Cal()
    # pythonの演算子で判定できる。
    assert cal.add_num_and_double(1,1) == 4

class TestCal(object):
    """クラスの先頭はTest"""
    @classmethod
    def setup_class(cls):
        """クラスのセットアップをしたいときはsetup_class@classmethod"""
        print('start')
        cls.cal = calculation.Cal()
    
    @classmethod
    def teardown_class(cls):
        """クラスの終了時処理をしたいときはteardown_class@classmethod"""
        print('end')
        del cls.cal

    # def setup_method(self, method):
    #     print('setup_method={}'.format(method.__name__))
    #     """testmethodのセットアップ情報を登録, 引数は実行関数"""
    #     self.cal = calculation.Cal()

    # def teardown_method(self, method):
    #     """testmethod終了後処理"""
    #     print('method={}'.format(method.__name__))
    #     del self.cal

    # @pytest.mark.skip(reason='skip!') # skip時 pytest -rs
    @pytest.mark.skipif(is_release, reason='skip!')
    def test_add_num_and_double(self):
        """test_ は必要"""
        assert self.cal.add_num_and_double(1,1) == 4
    
    def test_add_num_and_double_raise(self):
        with pytest.raises(TypeError):
            """例外処理テストではwithでpytest.raisesを使う"""
            self.cal.add_num_and_double('1', 1)

    # 引数で受け取れるものは引数名で決まっている(fixture)
    # https://docs.pytest.org/en/stable/builtin.html
    # requestや、tmpdirなどがある。自分で作成することもできる。

    def test_config(self, request):
        """conftest.pyに書き込んだオプションを
        request.config.getoptionで読み込める。"""
        os_name = request.config.getoption('--os-name')
        assert os_name == 'linux'

    def test_fixture(self, csv_file):
        """自作fixture"""
        print(csv_file)
        assert True
  • conftest.py
"""
    conftest.pyは各ディレクトリ配下に影響する。
    > pytest --os-name=windows
"""

import os
import pytest

def pytest_addoption(parser):
    """option追加関数
    parser.addoptionでオプションが追加できる。
    """
    parser.addoption('--os-name', default='linux', help='os name')


@pytest.fixture
def csv_file(tmpdir):
    """独自fixture
    このfixtureにも↑fixtureを入れられる
    """
    # yieldにすると開いたファイルをfixtureで閉じてくれる
    # fixtureとしてはreturnでもいい
    with open(os.path.join(tmpdir, 'test.csv'), 'w+') as c:
        print('before test')
        yield c
        print('after test')

テストカバレッジ

どれだけテストでカバーされているかをみることができる。

pip install pytest-cov pytest-xdist
# --cov=importモジュールかフォルダパス
pytest test_calc.py --cov=calculation --cov-report term-missing 
# windowsではsqlite3.dllにpathを通さないと動かなかった

# ----------- coverage: platform win32, python 3.8.5-final-0 -----------
# Name             Stmts   Miss  Cover   Missing
# ----------------------------------------------
# calculation.py      10      3    70%   27, 30-31
# ----------------------------------------------
# TOTAL               10      3    70%

missingはテストできていない行(分岐などで実行がされていない)

特に見られていなくてもいい行もあるので、Cover率は合格率をあらかじめ決める。

setuptoolsでテスト

  • setup.py
from setuptools import setup

setup(
    # ...
    # tests_require=['pytest'],  # pytestでテストするとき
    test_suits='test_folder_name'
)
  • setup.cfg
# pytestを使うとき
[alieas]
test=pytest

[tool:pytest]
python_files = tests/*
python setup.py test

とすれば、そのフォルダ以下のtest_XXX.py ファイルでテストしてくれる。

mock

公式Doc

  • モック : まだできてない箇所をダミーのオブジェクトで補ってテストする仕組み
  • スタブ : テスト対象オブジェクトから利用するオブジェクトがまだ完成していないとき、代替えする仕組み
  • ドライバ : テスト対象を利用しようとするオブジェクトがまだ完成していないとき、代替えする仕組み

pythonのmockでは、すべてできる。

どこまでmockするかは、プロジェクトで決める。(response.json()をモック?get_response()をモック?)

  • salary.py
import requests

class ThirdPartyBonusRestApi(object):
    def bonus_price(self, year):
        r = requests.get('http://localhost/bonus', params={'year': year})
        return r.json()['price']

class Salary(object):
    def __init__(self, base=100, year=2017):
        self.bonus_api = ThirdPartyBonusRestApi()
        self.base = base
        self.year = year
    
    def calculation_salary(self):
        try:
            bonus = self.bonus_api.bonus_price(year=self.year)
        except ConnectionRefusedError:
            bonus = 0
        return self.base + bonus
  • test_salary.py
import unittest
from unittest.mock import MagicMock
from unittest import mock
import salary

class TestSalary(unittest.TestCase):
    def test_calculation_salary(self):
        s = salary.Salary(year=2017)
        # MagicMockを使うことで、関数の戻り値がreturn_valueの値に置き換わる
        s.bonus_api.bonus_price = MagicMock(return_value=1)
        self.assertEqual(s.calculation_salary(), 101)
        # mockを使った関数が呼ばれなかったらassertError
        s.bonus_api.bonus_price.assert_called()
        # 1以外の回数呼ばれたらassertError
        s.bonus_api.bonus_price.assert_called_once()
        # 指定の引数と一緒に呼ばれていなければassertError
        s.bonus_api.bonus_price.assert_called_with(year=2017)
        # 呼ばれた回数
        self.assertEqual(s.bonus_api.bonus_price.call_count, 1)

    @mock.patch('salary.ThirdPartyBonusRestApi.bonus_price', return_value=1)
    def test_calculation_salary_patch(self, mock_bonus):
        """@mock.patchによって引数にそのオブジェクトが入る"""
        # やっていることはmock_bonus.return_value = 1と一緒
        
        s = salary.Salary(year=2017)
        self.assertEqual(s.calculation_salary(), 101)
        mock_bonus.assert_called()

    def test_calculation_salary_patch_with(self):
        """with句でもmock.patchを使える"""
        with mock.patch('salary.ThirdPartyBonusRestApi.bonus_price'
                        , return_value=1) as mock_bonus:
            """
            patcher = mock.patch(..)
            mock_bonus = patcher.start()
            patcher.stop() と同義
            start->stop は setUp->tearDownで使える
            """
            s = salary.Salary(year=2017)
            self.assertEqual(s.calculation_salary(), 101)
            mock_bonus.assert_called()

    def test_calculation_salary_patch_side_effect(self):
        """mock_patch.side_effectでmockを別の関数にできる"""
        with mock.patch('salary.ThirdPartyBonusRestApi.bonus_price'
                        ) as mock_bonus:
            def f(year):
                return year + 1000
            # 関数ではなく[1, ValueError]にすれば、一度目のcallで1, 二度目のcallでValueErrorが返る。
            mock_bonus.side_effect = f
            s = salary.Salary(year=2017)
            self.assertEqual(s.calculation_salary(), 3117)
            mock_bonus.assert_called()

    def test_calculation_salary_patch_side_effect_error(self):
        """mock_patch.side_effectでmock関数でerrorをraiseできる"""
        with mock.patch('salary.ThirdPartyBonusRestApi.bonus_price'
                        ) as mock_bonus:
            mock_bonus.side_effect = ConnectionRefusedError
            s = salary.Salary(year=2017)
            self.assertEqual(s.calculation_salary(), 100)
            mock_bonus.assert_called()

    @mock.patch('salary.ThirdPartyBonusRestApi', spec=True)
    def test_calculation_salary_class(self, MockRest):
        """
        @mock.patchによって引数にクラスも入れられる
        spec=Trueでプロパティやメソッド全てmockにしてくれる
        """
        # mock_rest = MockRest.return_valueと同義
        # 本来は引数をmock_restと命名して mock_rest = mock_rest.return_valueとする
        mock_rest = MockRest()
        mock_rest.bonus_price.return_value = 1
        s = salary.Salary(year=2017)
        self.assertEqual(s.calculation_salary(), 101)
        mock_rest.bonus_price.assert_called()

if __name__ == '__main__':
    unittest.main()

並列化

マルチプロセス,スレッド
スレッド:

  • CPUコアとメモリは共有, 平行, PythonGILというものの上で動くので実際は交互に動く
  • I/Oバウンド(ネットワーク、ファイル読み書き)には効果的, CPUバウンド(並列計算)には不向き
  • スレッド数がマルチプロセスよりも増やせる
  • GIL待ちがある(GILロックを取るのにかかる時間)

プロセス:  

  • CPUコアとメモリが別々, 並列
  • CPUのコア数を超えて並列化はできない
  • プロセス間通信しないとデータを受け渡せない
  • I/OバウンドにもCPUバウンドにも効果的

Thread

公式Doc

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')
logger = logging.getLogger(__name__)

def worker1():
    """threadに渡す関数"""
    logger.debug('start')
    time.sleep(3)
    logger.debug('end')


def worker2(x, y=1):
    """引数ありスレッド例"""
    logger.debug('start')
    logger.debug(x)
    logger.debug(y)
    time.sleep(2)
    logger.debug('end')

if __name__ == '__main__':
    t1 = threading.Thread(target=worker1, name='rename1')
    t1.setDaemon(True) # 他のプロセスが終了する時同時にThreadを強制終了する。
    t2 = threading.Thread(target=worker2, args=(100, ), kwargs={'y': 200})
    t1.start() # threadスタート
    t2.start()
    print('started')
    t1.join() # スレッドが終わるのを待つ setDaemonがTrueでも待つ。
    print('ended')
    # t2.join() daemon化していないものはjoin書いていなくても同じ

# rename1: start
# Thread-1: start
# started
# Thread-1: 100
# Thread-1: 200
# Thread-1: end
# rename1: end
# ended

    print("""
    **********
    生存中のThreadはenumerateでとれる
    **********
    """)
    
    # threads = []
    for _ in range(5):
        t = threading.Thread(target=worker1)
        t.setDaemon(True)
        t.start()
    #     threads.append(t)
    # for thread in threads: # リストをつくってjoinしてもいいが
    for thread in threading.enumerate():
        if thread is threading.currentThread(): # mainthreadは省く
            print(thread)
            continue
        thread.join()

# Thread-2: start
# Thread-4: start
# Thread-5: start
# Thread-6: start
# <_MainThread(MainThread, started 18628)>
# Thread-5: end
# Thread-2: end
# Thread-4: end
# Thread-6: end
# Thread-3: end

    print("""
    *****タイマー*****
    """)
    # 始まるのを遅らせる
    t = threading.Timer(3, worker1)
    t.start()
    t.join()
# ******************************
# *****Lock RLock Semaphore*****
# ******************************

def share1(d, lock):
    """lock例"""
    with lock:
        i = d['x']
        time.sleep(2)
        d['x'] = i + 2
        logger.debug(d['x'])

def share2(d, lock):
    """lock例"""
    lock.acquire()
    d['x'] += 5
    logger.debug(d['x'])
    lock.release()
    
if __name__ == '__main__':
    # 同じLockを関数に渡すことで占有ロックができる
    lock = threading.Lock()
    # lock = threading.RLock() RLockにするとロックの中でロックできるようになる。
    # semaphore = threading.Semaphore(2) Semaphoreにすると並列処理するスレッドの数を狭められる
    d = {'x': 0}
    t1 = threading.Thread(name="share1", target=share1, args=(d, lock))
    t2 = threading.Thread(name="share2", target=share2, args=(d, lock))
    for t in (t1, t2):
        t.start()

# share1: 2
# share2: 7

# ***************
# *****queue*****
# ***************

def queue_put(queue):
    """queue put関数"""
    queue.put(100)
    time.sleep(2)
    queue.put(200)
    logger.debug("end")

def queue_get(queue):
    """queue get関数"""
    logger.debug(queue.get())
    logger.debug(queue.get())
    logger.debug("end")

def queue_loop(queue):
    """queueの用例"""
    logger.debug('start')
    while True:
        item = queue.get()
        # Noneが入ってきたタイミングでループを終了する
        if item is None:
            break
        logger.debug(item)
        queue.task_done() # queueが空であることをjoinに知らせる
    logger.debug('end')

if __name__ == '__main__':
    import queue
    queue = queue.Queue()
    # queueは取り出すものが来るまで待つので、並列化されていれば入れられるのも待ってくれる
    t1 = threading.Thread(name="queue1", target=queue_put, args=(queue,))
    t2 = threading.Thread(name="queue2", target=queue_get, args=(queue,))
    for t in (t1, t2):
        t.start()

# queue2: 100
# queue1: end
# queue2: 200
# queue2: end

    for i in range(10):
        queue.put(i)
    t = threading.Thread(name="queue_loop", target=queue_loop, args=(queue,))
    t.start()
    logger.debug('tasks are not done')
    queue.join() # queue.joinはtask_doneが判定されるまで処理をまつ
    logger.debug('tasks are done')
    queue.put(None)

# queue_loop: start
# MainThread: tasks are not done
# queue_loop: 0
# queue_loop: 1
# queue_loop: 2
# queue_loop: 3
# queue_loop: 4
# queue_loop: 5
# queue_loop: 6
# queue_loop: 7
# queue_loop: 8
# queue_loop: 9
# MainThread: tasks are done
# queue_loop: end
# ***************
# *****event*****
# ***************

def event_func1(event):
    event.wait() # event.wait()でイベントが発火するのを待つ
    logger.debug('start')
    time.sleep(3)
    logger.debug('end')

def event_func2(event):
    event.wait() # event.wait()でイベントが発火するのを待つ
    logger.debug('start')
    time.sleep(3)
    logger.debug('end')

def event_func3(event):
    logger.debug('start')
    time.sleep(3)
    logger.debug('end')
    event.set() # event.set()でイベントを発火

if __name__ == '__main__':
    event = threading.Event()
    t1 = threading.Thread(target=event_func1, args=(event,))
    t2 = threading.Thread(target=event_func2, args=(event,))
    t3 = threading.Thread(target=event_func3, args=(event,))
    t1.start()
    t2.start()
    t3.start()

# Thread-3: start
# Thread-3: end
# Thread-1: start
# Thread-2: start
# Thread-2: end
# Thread-1: end
# *******************
# *****condition*****
# *******************

def condition_func1(condition):
    with condition: # lockみたいにロックできる
        condition.wait() # 発火を待つ
        logger.debug('start')
        time.sleep(3)
        logger.debug('end')

def condition_func2(condition):
    with condition: # lockみたいにロックできる
        condition.wait() # 発火を待つ
        logger.debug('start')
        time.sleep(3)
        logger.debug('end')

def condition_func3(condition):
    with condition:
        logger.debug('start')
        time.sleep(3)
        logger.debug('end')
        condition.notifyAll() # eventを発火

if __name__ == '__main__':
    condition = threading.Condition()
    t1 = threading.Thread(name="t1", target=condition_func1, args=(condition,))
    t2 = threading.Thread(name="t2", target=condition_func2, args=(condition,))
    t3 = threading.Thread(name="t3", target=condition_func3, args=(condition,))
    t1.start()
    t2.start()
    t3.start()

# t3: start
# t3: end
# t2: start
# t2: end
# t1: start
# t1: end
# *****************
# *****barrier*****
# *****************

def barrier_func1(barrier):
    r = barrier.wait() # スレッドがそろうまで待つ
    logging.debug('num={}'.format(r))
    while True:
        logger.debug('start')
        time.sleep(3)
        logger.debug('end')

def barrier_func2(barrier):
    r = barrier.wait() # スレッドがそろうまで待つ
    logging.debug('num={}'.format(r))
    while True:
        logger.debug('start')
        time.sleep(3)
        logger.debug('end')

if __name__ == '__main__':
    barrier = threading.Barrier(2) # barrier.wait()をしているThreadが2つになったら開始
    t1 = threading.Thread(name="t1", target=barrier_func1, args=(barrier,))
    t2 = threading.Thread(name="t2", target=barrier_func2, args=(barrier,))
    t1.start()
    t2.start()

multiprocessing

import time
import logging

from multiprocessing import Process
# from multiprocessing import RLock, Semaphore, Queue, Event, Barrier
from multiprocessing import Value, Array, Pipe, Manager

# **********************************
# 基本的にはThreadingと同じように扱える
# **********************************

logging.basicConfig(level=logging.DEBUG, format='%(processName)s: %(message)s')
logger = logging.getLogger(__name__)

def worker1(i):
    logger.debug('start')
    time.sleep(i)
    logger.debug('end')

def worker2(i):
    logger.debug('start')
    time.sleep(i)
    logger.debug('end')

if __name__ == '__main__':
    t1 = Process(target=worker1, args=(3,))
    t2 = Process(target=worker1, args=(2,))
    t1.daemon = True
    t1.start()
    t2.start()
    t1.join()

# Process-1: start
# Process-2: start
# Process-2: end
# Process-1: end
def worker1(i):
    logger.debug('start')
    time.sleep(i)
    logger.debug('end')
    return i

if __name__ == '__main__':
    # Poolは並列処理するプロセスの数
    with Pool(5) as p:
        # 並列化しない場合はapply
        logger.debug(p.apply(worker1, (1,)))
        # Pool.apply_asyncで関数を並列処理
        p1 = p.apply_async(worker1, (3,))
        p2 = p.apply_async(worker1, (2,))
        logger.debug('exec')
        # logger.debug(p2.get(timeout=1)) # TimeoutError をあげる 
        logger.debug(p1.get()) # 値が返ってくるまで待つ。

# MainProcess: 1
# MainProcess: exec
# SpawnPoolWorker-2: start
# SpawnPoolWorker-3: start
# SpawnPoolWorker-3: end
# SpawnPoolWorker-2: end
# MainProcess: 3
if __name__ == '__main__':
    with Pool(5) as p:
        # Pool.mapを使うと指定関数をargsの数だけ並列実行
        r = p.map(worker1, (3,2,1))
        print(r, type(r))
        # l = list(map(worker1, (3,2,1))) # builtinのmapも並列じゃないけど同じような動作をする
        # print(l)

# SpawnPoolWorker-1: start
# SpawnPoolWorker-2: start
# SpawnPoolWorker-3: start
# SpawnPoolWorker-3: end
# SpawnPoolWorker-2: end
# SpawnPoolWorker-1: end
# [3, 2, 1] <class 'list'>

        r = p.map_async(worker1, (3,2,1))
        print(r.get(), type(r)) # map_asyncの戻り値のget()は並列処理が完了するまで待つ。

# SpawnPoolWorker-4: start
# SpawnPoolWorker-3: start
# SpawnPoolWorker-5: start
# SpawnPoolWorker-3: end
# SpawnPoolWorker-5: end
# SpawnPoolWorker-4: end
# [3, 2, 1] <class 'multiprocessing.pool.MapResult'>

        r = p.imap(worker1, (3,2,1)) # imapの戻り値はイテレータ
        logger.debug([i for i in r]) # 回された瞬間に実行される

# SpawnPoolWorker-2: start
# SpawnPoolWorker-1: start
# SpawnPoolWorker-3: start
# SpawnPoolWorker-3: end
# SpawnPoolWorker-1: end
# SpawnPoolWorker-2: end
# MainProcess: [3, 2, 1]
# ******************
# mapやimapで複数引数使いたい時は, objectかラップ関数を使う
# ******************

def multiply(a, b):
    logger.debug(f'a:{a} b:{b}')
    return a * b

def wrap_multiply(args):
    logger.debug('wrap')
    return multiply(*args)

if __name__ == '__main__':
    param1 = [1, 2, 3, 4]
    param2 = [10, 20, 30, 40]

    with Pool(5) as p:
        # zip関数はイテレータをタプルにそれぞれまとめて返す
        params = zip(param1, param2)
        results = p.imap(wrap_multiply, params)
        print(list(results)) # listに変換しても実行される

# SpawnPoolWorker-1: wrap
# SpawnPoolWorker-1: a:1 b:10
# SpawnPoolWorker-1: wrap
# SpawnPoolWorker-1: a:2 b:20
# SpawnPoolWorker-2: wrap
# SpawnPoolWorker-2: a:3 b:30
# SpawnPoolWorker-1: wrap
# SpawnPoolWorker-1: a:4 b:40
# [10, 40, 90, 160]
# multiprocessでは実は一つのプロセス内で使っているメモリはすべて独立している
# 引数なんかはコピーされている

def connect(conn):
    conn.send('test_start') # 親に送る
    time.sleep(3)
    conn.send('test_end')
    conn.close()

if __name__ == '__main__':
    # Pipeは現在のプロセスと子プロセスの通信路を作る
    parent_conn, child_conn = Pipe()
    p = Process(target=connect, args=(parent_conn,))
    p.start()
    logger.debug(child_conn.recv()) # 子から受け取る
    logger.debug(child_conn.recv())
# ************
# 共有メモリ
# ************

def share1(num, arr):
    num.value += 1.0
    logger.debug(num)
    for i in range(len(arr)):
        arr[i] *= 2
    logger.debug(arr)

if __name__ == '__main__':
    # Valueは値を共有 浮動小数点はf
    num = Value('f', 0.0)
    # Arrayはリストを共有 integerはi
    arr = Array('i', [1, 2, 3, 4, 5])

    p = Process(target=share1, args=(num, arr))
    p.start()
    p.join()
    logger.debug(num.value)
    logger.debug(arr[:])

# Process-1: <Synchronized wrapper for c_float(1.0)>
# Process-1: <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_long_Array_5 object at 0x00000222EC9EA640>>
# MainProcess: 1.0
# MainProcess: [2, 4, 6, 8, 10]

def share2(l, d, n):
    l.reverse()
    d['x'] += 1
    n.y += 1

if __name__ == '__main__':
    # Managerは直観的に共有オブジェクトを操作できる。ただ、すこーし遅い
    with Manager() as manager:
        l = manager.list()
        d = manager.dict()
        n = manager.Namespace()

        l.append(1)
        l.append(2)
        l.append(3)
        d['x'] = 0
        n.y = 0

        p1 = Process(target=share2, args=(l, d, n))
        p1.start()
        p1.join()

        logger.debug(l)
        logger.debug(d)
        logger.debug(n)

# MainProcess: [3, 2, 1]
# MainProcess: {'x': 1}
# MainProcess: Namespace(y=1)

別サーバプロセス

  • server.py
import queue
from multiprocessing.managers import BaseManager

queue = queue.Queue()

class QueueManager(BaseManager):
    pass
    
QueueManager.register('get_queue', callable=lambda: queue)

manager = QueueManager(
    address=('127.0.0.1', 50000),
    authkey=b'abcde'
)
server = manager.get_server()
server.serve_forever()
  • client.py
import queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass
    
QueueManager.register('get_queue')

manager = QueueManager(
    address=('127.0.0.1', 50000),
    authkey=b'abcde'
)
manager.connect()
queue = manager.get_queue()
# server.pyサーバにqueueをセット
queue.put('hello')
# server.pyサーバからqueueをゲット
print(queue.get())

高水準の並列化ライブラリ

import concurrent.futures
import logging
import time

logging.basicConfig(level=logging.DEBUG, format='%(processName)s: %(message)s')
logger = logging.getLogger(__name__)

def worker(x, y):
    logger.debug('start')
    time.sleep(3)
    r = x * y
    logger.debug(r)
    logger.debug('end')
    return r

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        f1 = executor.submit(worker, 2, 5)
        f2 = executor.submit(worker, 3, 10)
        logger.debug(f1.result())
        logger.debug(f2.result())

if __name__ == '__main__':
    main()

# MainProcess: start
# MainProcess: start
# MainProcess: 10
# MainProcess: 30
# MainProcess: end
# MainProcess: end
# MainProcess: 10
# MainProcess: 30

シングルスレッドの非同期処理

I/Oの時間を別の処理で有効に使う。

  • 非同期処理:タスクの処理が終了したタイミングで通知される。 Callback/Future/Promiseと表現
  • ノンブロッキング:処理の終了、処理のエラーを即座に返す。 Pollingと表現

はじめにgeneratorベースのコルーチンの例

# yieldで処理が中断する コルーチン
def g2():
    print('first')
    r = yield 'hello'
    print('second')
    yield r
g2 = g2()
print(next(g2)) # ここではyield 'hello'が帰ってきている
# print(next(g2)) # None
print(g2.send('plus')) # ここではyield r が帰ってきている

# first
# hello
# second
# plus

# yield from 文
def g3():
    yield from [1,2,3]
    yield from g1()

g3 = g3()
print(next(g3)) # 1
print(next(g3)) # 2
print(next(g3)) # 3
print(next(g3)) # hello

def sub_sub_generator():
    yield "Sub Sub yield"
    return "Sub Sub return"

def sub_generator():
    yield "Sub yield"
    res = yield from sub_sub_generator()
    print("Sub res = {}".format(res))
    return "Sub return"

def generator():
    yield "Gen yield"
    res = yield from sub_generator()
    print("Gen res = {}".format(res))
    yield res # yield from句の中でないと、return句は機能しない

gen = generator()
# yieldはそこで処理が中断する。
# yield from 句ではreturn句のときに戻り値が返ってきて、続行する。
print(next(gen)) # Gen yield
print(next(gen)) # Sub yield
print(next(gen)) # Sub Sub yield
print(next(gen)) # Sub res = Sub Sub return, Gen res = Sub return, Sub return  

ジェネレータベースのコルーチンの動作がわかったところで、asyncioの非同期操作を見ていく。

import asyncio
# threadと同じようなライブラリもある
# from asyncio import Lock, Event, Condition, Semaphore, Queue, Future
# lock = Lock()
# loop.run_until_complete(asyncio.wait([waorker1(lock)]))
# async def worker1(lock):
#     with await lock:
#         await asyncio.sleep(2)
import threading
import time

def worker():
    print('start')
    time.sleep(2)
    print('end')

# @asyncio.coroutine python 3.5まで
async def worker_async():
    print('start')
    # yield from asyncio.sleep(2) python3.5まで
    await asyncio.sleep(2) 
    print('end')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # 全ての処理が完了したらクローズする
    # async関数の中の、awaitの部分が非同期となり、await中に他の処理ができるようになる
    loop.run_until_complete(asyncio.wait([worker_async(), worker_async()]))
    loop.close()
    # マルチスレッドの場合
    # t1 = threading.Thread(target=worker)
    # t2 = threading.Thread(target=worker)
    # t1.start()
    # t2.start()

# start
# start
# end
# end
# loop.run_forever, loop.call_later, loop.stopの例
import asyncio

loop = asyncio.get_event_loop()

def hello(name, loop):
    print(name)
    loop.stop()

loop.call_later(2, hello, 'Mike', loop)
# loop.call_soon(hello, 'Nancy', loop)

loop.run_forever()
loop.close()
# asyncio chain
import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

# Compute 1 + 2 ...
# 1 + 2 = 3

通信I/Oで非同期処理する例を見ていく。

pip install aiohttp
import asyncio
import time

import aiohttp
import requests

loop = asyncio.get_event_loop()

# async def hello(url):
#     print(requests.get(url).status_code)

# 200
# 200
# 1.5945141315460205

async def hello(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            resp = await response.read()
            print(response.status)

# 200
# 200
# 0.8088996410369873

start = time.time()
loop.run_until_complete(asyncio.wait([
  hello("https://sehippocampus.work/"),
  hello("https://sehippocampus.work/")
]))
end = time.time()
print(end - start)

socket通信

pythonの通信モジュール 公式Doc

これをつかってwebサーバとかを作っている。

tcpの場合

  • socket_server.py
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: # ipv4, tcpで通信
    s.bind(('127.0.0.1', 55000)) # ポートを使う
    s.listen(1) # 1接続
    while True:
        conn, addr = s.accept() # 誰かが接続するのを待つ
        with conn:
            data = conn.recv(1024) # chunk=1024で受け取る
            if not data:
                break
            print('data: {}, addr: {}'.format(data, addr))
            conn.sendall(b'Received: ' + data) # clientにデータを返す
  • socket_client.py
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(('127.0.0.1', 55000)) # 通信を試みる
    s.sendall(b'Hello') # 送る
    data = s.recv(1024) # 返信を受け取る
    print(repr(data)) # reprはオブジェクトそのものの形を表示

udpの場合

受け取られたかを確認しないので簡単

  • socket_server.py
import socket

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: # ipv4, udpで通信
    s.bind(('127.0.0.1', 55000)) # ポートを使う
    while True:
        data, addr = s.recvfrom(1024) # chunk=1024で受け取る
        print('data: {}, addr: {}'.format(data, addr))
  • socket_client.py
import socket

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    s.sendto(b'Hello', ('127.0.0.1', 55000)) # 送る

簡単なwebサーバ

import http.server
import socketserver

with socketserver.TCPServer(('127.0.0.1', 8000),
                            http.server.SimpleHTTPRequestHandler) as httpd:
    httpd.serve_forever()

contextlib

import contextlib

# デコレータを簡単に書く。引数も与えられる。
@contextlib.contextmanager
def tag(name):
    print('<{}>'.format(name))
    # yield内で実関数が動く
    yield
    print('</{}>'.format(name))

@tag('h3')
def f(content):
    print(content)

f('test')

# <h3>
# test
# </h3>

# 関数をつくらなくてもwithでデコレータみたいに使える
with tag('h2'):
    print('test')
    with tag('a'):
        print('test2')

# <h2>
# test
# <a>
# test2
# </a>
# </h2>
import contextlib

# ContextDecoratorを継承することでデコレータライクにクラスを扱える
class tag(contextlib.ContextDecorator):
    def __init__(self, name):
        self.name = name
        self.start_tag = '<{}>'.format(name)
        self.end_tag = '</{}>'.format(name)
    
    def __enter__(self):
        """クラスが呼び出される時の関数"""
        print(self.start_tag)
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """クラスが最後に呼び出される時の関数"""
        print(exc_type) # exceptiontype
        print(exc_val) # exception内容
        print(exc_tb) # exception traceback
        print(self.end_tag)

with tag('h2'):
    print('test')

# <h2>
# test
# None
# None
# None
# </h2>
import logging
import contextlib

# Errorを無視したいとき、suppressを使える
with contextlib.suppress(FileNotFoundError):
    os.remove('somefile.tmp')

with 


# 標準出力をファイルに書き込むredirect_std
with open('stdout.log', 'w') as f:
    with contextlib.redirect_stdout(f):
        print('hello')
with open('stderr.log', 'w') as f:
    with contextlib.redirect_stderr(f):
        logging.error('Error')

import contextlib
def is_ok_job():
    try:
        print('do something')
        raise Exception('error')
        return True
    except Exception:
        return False

def cleanup():
    print('clean up')
def cleanup2():
    print('clean up2')

# finallyの処理を取り回しやすくするstack
with contextlib.ExitStack() as stack:
    stack.callback(cleanup) # finallyみたいにやりたい関数を入れておく
    stack.callback(cleanup2)

    @stack.callback # デコレータでもcallbackを追加できる
    def cleanup3():
        print('clean up3')

    is_ok = is_ok_job()
    print('more task')

    if is_ok:
        stack.pop_all() # もし処理が成功したらstackに入れてある関数を取り除く
    
    # is_okが失敗していれば、cleanupが行われる

# do something
# more task
# clean up3
# clean up2
# clean up

io

インメモリストリームを作成できる。

# tarファイルを保存せずに取り扱う
import io
import requests
import tarfile

url = ('https://files.pythonhosted.org/packages/12/68/95515eaff788370246dac534830ea9ccb0758e921ac9e9041996026ecaf2/setuptools-53.0.0.tar.gz')

r = requests.get(url)
f = io.BytesIO(r.content)

with tarfile.open(fileobj=f) as t:
    print(t.getmembers())    

collections

# ChainMapは辞書をそのままの形で結合できる。
a = {'a': 'a', 'num': 0}
b = {'b': 'b', 'c': 'cc'}
c = {'b': 'bbb', 'c': 'ccc'}

m = collections.ChainMap(a, b, c)
print(m)
# ChainMap({'a': 'a', 'num': 0}, {'b': 'b', 'c': 'cc'}, {'b': 'bbb', 'c': 'ccc'})
print(m.maps) # list
# [{'a': 'a', 'num': 0}, {'b': 'b', 'c': 'cc'}, {'b': 'bbb', 'c': 'ccc'}]

# defaultdictはキーが存在しないときに値を返す
d = collections.defaultdict(int)
print(d['1']) # 0
d = collections.defaultdict(lambda:'str')
print(d['1']) # str

# Counterはイテレータをカウントしてくれる
c = collections.Counter([1,2,3,1,2, 'str'])
print(c) # Counter({1: 2, 2: 2, 3: 1, 'str': 1})
print(c.most_common(1)) # [(1, 2)]

# dequeは双方向から操作できるキュー。threadにもqueueと同様に使える
d = collections.deque([1,2,3,4])
print(d.pop()) # 4
print(d.popleft()) # 1
print(d) # deque([2, 3])
d.appendleft(4)
d.append(1)
print(d) # deque([4, 2, 3, 1])

# クラスメンバライクにアクセスできるタプル
# 第一引数はtupleの名前, 第二引数がタプルの中身
Point = collections.namedtuple('Point', ['x', 'y'])
p = Point(10, y=20) # *[10, 20]
print(p) # Point(x=10, y=20)
print(p.x) # 10
# p.x = 100 # AttributeError: can't set attribute

正規表現re

import re

s = r'abc de fg abc'
# . は任意の一文字

# match() 文字列の先頭で正規表現とマッチするか判定
m = re.match('a.c', s)
print(m.group()) # abc

# search() 文字列を操作して正規表現がどこにマッチするか調べる(最初の一致)
m = re.search('a.c', s)
print(m.span()) # (0, 3)  :0から2までが一致
print(m.group()) # abc

# findall() 正規い表現にマッチする部分文字列を全て探し出しリストとして返す
m = re.findall('a.c', s)
print(m) # ['abc', 'abc']

# finditer() 重複しないマッチオブジェクトのイテレータを返す
m = re.finditer('a.c', s)
print([w.group() for w in m]) # ['abc', 'abc']

s1 = ('/abc-def-ghi/xxxxxxxxxxxx/123456789/yyy-yyy-yy')
s2 = ('/abc-def-ghi/XXXXXXXXXXXX/123456789/YYY-YYY-YY')
# (?P<name> re) で、groupnameをつけることができる。
m = re.match(r'/[\w-]+/(?P<xxx>[\w]+)/[\d]+/(?P<yyy>[\w-]+)', s1)
print(m.group('xxx')) # xxxxxxxxxxxx
print(m.group('yyy')) # yyy-yyy-yy
# compileを使うと高速化できる
RE_STACK_ID = re.compile(r'/[\w-]+/(?P<xxx>[\w]+)/[\d]+/(?P<yyy>[\w-]+)')
for s in (s1, s2):
    m = RE_STACK_ID.match(s)
    if m:
        print(m.group('xxx'))
        print(m.group('yyy'))

s = 'My name is ... Mike'
print(s.split()) # ['My', 'name', 'is', '...', 'Mike']

# 正規表現でsplit
p = re.compile(r'\W+')
print(p.split(s)) # ['My', 'name', 'is', 'Mike']

# 正規表現で置換
p = re.compile('name|names')
print(p.sub('firstname', s)) # My firstname is ... Mike
print(p.sub('firstname', s + ' names', count=1)) # My firstname is ... Mike names
print(p.subn('firstname', s + ' names')) # ('My firstname is ... Mike firstnames', 2)

def hexrepl(match):
    value = int(match.group())
    return hex(value)

# matchしたら関数を適用することもできる
p = re.compile(r'\d')
print(p.sub(hexrepl, '12345 01 test')) # 0x10x20x30x40x5 0x00x1 test

# greedyである正規表現の直し方
s = '<html><head><title>Title</title></head>'
print(re.match('<.*>', s).group()) # <html><head><title>Title</title></head>
print(re.match('<.*?>', s).group()) # <html>