Composing generators in Python

先日のエントリでは、射影(Projection)を用いた領域検索について解説し、簡単な実装を行いました。今回はその実装をリファクタリングしてみます。


では、早速始めましょう。先日のコードをジェネレータとしてsequential_search()関数にまとめたコードから開始します。

#!/usr/local/bin/python3.0 -t

def sequential_search(r, ps):
    for p in (p
              for p in (p
                        for p in ps
                        if min(r[0][0], r[1][0]) <= p[0] <= max(r[0][0], r[1][0]))
              if min(r[0][1], r[1][1]) <= p[1] <= max(r[0][1], r[1][1])):
        yield p

if __name__ == '__main__':
    ps = (( 84, 211, 'A'),
          (253,  49, 'B'),
          (147, 189, 'C'),
          (105,  84, 'D'),
          (126, 330, 'E'),
          (189, 253, 'F'),
          ( 49, 147, 'G'),
          (168, 105, 'H'),
          (211, 168, 'I'),
          (309, 126, 'J'),
          (232, 281, 'K'),
          (351, 316, 'L'),
          (330,  70, 'M'),
          (288, 351, 'N'),
          ( 70, 267, 'O'),
          (274, 232, 'P'),
          )

    r = ((126, 232),
         (211, 351))

    for p in sequential_search(r, ps):
        print(p[-1])


まず、内包表記の中で用いられているifの条件を関数として掃き出してみます。新たにwithin()関数を作成しましょう。引数dには射影する次元を指定します。例えばx軸に射影する場合は0を与えます。また、within()関数を導入したことに合わせ、内包表記内のifの条件も書き換えています。

def within(r, d, p):
    return min(r[0][d], r[1][d]) <= p[d] <= max(r[0][d], r[1][d])
    
def sequential_search(r, ps):
    for p in (p
              for p in (p
                        for p in ps
                        if within(r, 0, p))
              if within(r, 1, p)):
        yield p

Pythonの内包表記は大変便利なのですが、入れ子にすると少々見通しが悪くなる場合もあります。Python3.0からジェネレータとして実装されている、filter()関数を使用してみましょう。

def sequential_search(r, ps):
    for p in filter(lambda p: within(r, 1, p),
                    filter(lambda p: within(r, 0, p), ps)):
        yield p

ちょっとだけ見通しがよくなりました。次に、within()関数をカリー化して、lambdaを消してしまいましょう。Pythonでは、functools.partial()関数を用いて関数をカリー化することが出来ますが、within()関数に関しては後ほど最適化する余地を残すため、自前でカリー化を施してみます。

def within(r, d):
    def _(p):
        return min(r[0][d], r[1][d]) <= p[d] <= max(r[0][d], r[1][d])
    return _
    
def sequential_search(r, ps):
    for p in filter(within(r, 1),
                    filter(within(r, 0), ps)):
        yield p

within()関数をカリー化したことに伴い、lambdaが消えてしまいました。

within()関数内の包有判定では、呼び出されるたびにminおよびmaxによる範囲の正規化が行われてしまいます。カリー化したのに合わせ、これを最適化しておきましょう。予め興味のある次元の範囲を整列し、prmおよびprp変数に代入しておきます。

from operator import itemgetter

def within(r, d):
    prm, prp = tuple(x[d]
                     for x in sorted(r, key=itemgetter(d)))
    def _(p):
        return prm <= p[d] <= prp
    return _

以上でwithin()関数の準備は終了です。


さて、今一度sequential_search()関数を眺めてみましょう。

def sequential_search(r, ps):
    for p in filter(within(r, 1),
                    filter(within(r, 0), ps)):
        yield p

sequential_search()関数をリファクタリングする目的を2つ設定します。

  • 確立に基づいた優先順位による射影および包有判定を行うようにする
  • 多次元へ対応する

まず、多次元への対応を済ませてしまいましょう。例えば、この関数を単純に三次元に拡張したい場合は、以下のようになるかと思います。

def sequential_search(r, ps):
    for p in filter(within(r, 2),
                    filter(within(r, 1),
                           filter(within(r, 0), ps))):
        yield p

しかし、このような拡張はあまり得策ではなさそうです。

目的は点群をそれぞれの次元において射影および包有判定を行い、一点づつ反復することでした。入力はn次元の点群であり、出力はそれぞれの次元における包有判定を全てパスした点を返すジェネレータです。そのようなジェネレータを作ってしまえばよいのです。

簡単のため、まずは二つのジェネレータを合成するcompose()関数を作成してみます。引数には、カリー化されたジェネレータを渡します。カリー化をするために、functools.partial()関数を使用しています。また、ジェネレータを作用させる順序は、初めの引数からとします。

from functools import partial

def compose(function1, function2):
    def _(iterable):
        for p in function2(function1(iterable)):
            yield p
    return _
    
def sequential_search(r, ps):
    composed = compose(partial(filter, within(r, 0)),
                       partial(filter, within(r, 1)))

    for p in composed(ps):
        yield p

反復を行うコードが大変すっきりしました。そのため、入力と出力の関係もよりはっきりしました。

さらにcompose()関数をリファクタリングして、三つ以上のジェネレータを合成出来るようにします。複数のジェネレータを渡せるよう、compose()関数に指定する引数を反復可能なオブジェクトとしています。

from operator import itemgetter
from functools import partial

def compose(functions):
    functions = iter(functions)
    try:
        function = next(functions)
    except StopIteration:
        return lambda x: x
    else:
        composed = compose(functions)
        def _(iterable):
            for x in composed(function(iterable)):
                yield x
        return _
    
def sequential_search(r, ps):
    composed = compose((partial(filter, within(r, 0)),
                        partial(filter, within(r, 1)),
                        ))

    for p in composed(ps):
        yield p


さて最後に、確立に基づいた優先順位による射影と包有判定を行うようにリファクタリングします。検索範囲の差分の絶対値を整列し、それぞれのジェネレータに変換します。

from operator import itemgetter
from functools import partial

def sequential_search(r, ps):
    composed = compose(partial(filter, within(r, d))
                       for d, pd in (sorted(enumerate(map(lambda a, b: abs(a - b), *r)),
                                            key=itemgetter(1))))

    for p in composed(ps):
        yield p

さらに、operator.sub()関数を使ってlambdaを消してしまいましょう。

from operator import itemgetter, sub
from functools import partial

def sequential_search(r, ps):
    composed = compose(partial(filter, within(r, d))
                       for d, pd in (sorted(enumerate(map(abs, map(sub, *r))),
                                            key=itemgetter(1))))

    for p in composed(ps):
        yield p


以上でリファクタリングは終了です。コードは以下のようになりました。確立に基づく射影の順序を選択させるだけではなく、多次元の領域検索に耐えうるものとなりました。

#!/usr/local/bin/python3.0 -t

from operator import itemgetter, sub
from functools import partial

def within(r, d):
    prm, prp = tuple(x[d]
                     for x in sorted(r, key=itemgetter(d)))
    def _(p):
        return prm <= p[d] <= prp
    return _

def compose(functions):
    functions = iter(functions)
    try:
        function = next(functions)
    except StopIteration:
        return lambda x: x
    else:
        composed = compose(functions)
        def _(iterable):
            for x in composed(function(iterable)):
                yield x
        return _
    
def sequential_search(r, ps):
    composed = compose(partial(filter, within(r, d))
                       for d, pd in (sorted(enumerate(map(abs, map(sub, *r))),
                                            key=itemgetter(1))))

    for p in composed(ps):
        yield p

if __name__ == '__main__':
    ps = (( 84, 211, 'A'),
          (253,  49, 'B'),
          (147, 189, 'C'),
          (105,  84, 'D'),
          (126, 330, 'E'),
          (189, 253, 'F'),
          ( 49, 147, 'G'),
          (168, 105, 'H'),
          (211, 168, 'I'),
          (309, 126, 'J'),
          (232, 281, 'K'),
          (351, 316, 'L'),
          (330,  70, 'M'),
          (288, 351, 'N'),
          ( 70, 267, 'O'),
          (274, 232, 'P'),
          )

    r = ((126, 232),
         (211, 351))

    for p in sequential_search(r, ps):
        print(p[-1])


さて、今回作成したcompose()関数のようなジェネレータを合成するアイデアは他の用途にも応用出来そうです。以下のコードは、0から始まる50未満の偶数群を列挙します。

from functools import partial
from itertools import count, takewhile

if __name__ == '__main__':
    composed = compose((partial(takewhile, lambda x: x < 50),
                        partial(filter,    lambda x: x % 2 == 0),
                        ))

    print(tuple(composed(count())))
(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48)

また、上述の数値群の二乗を取りたい場合は、以下のように予め、もう一つジェネレータを合成すればよいでしょう。

from functools import partial
from itertools import count, takewhile

if __name__ == '__main__':
    composed = compose((partial(takewhile, lambda x: x < 50),
                        partial(filter,    lambda x: x % 2 == 0),
                        partial(map,       lambda x: x ** 2),
                        ))

    print(tuple(composed(count())))
(0, 4, 16, 36, 64, 100, 144, 196, 256, 324, 400, 484, 576, 676, 784, 900, 1024, 1156, 1296, 1444, 1600, 1764, 1936, 2116, 2304)

ジェネレータの合成を行わないコードは以下となります。動的にジェネレータを選択する必要がない場合、以下のコードのほうが好みです。

#!/usr/local/bin/python3.0 -t

from itertools import count, takewhile

if __name__ == '__main__':
    print(tuple(map(lambda x: x ** 2,
                    filter(lambda x: x % 2 == 0,
                           takewhile(lambda x: x < 50,
                                     count())))))

まとめ

今回は先日のエントリで紹介した確立に基づいた射影および包有判定を実装しました。そのため、ジェネレータを合成する関数について考えてみました。

次回のエントリでは先日のエントリの話題に立ち戻り、射影を使った矩形の交差判定を紹介したいと思っています。