본문 바로가기

Computer/Python

제너레이터와 yield (3) - 지연 계산

반응형

제너레이터와 yield (1)

제너레이터와 yield (2) - 피보나치, 무한급수


현재값만 필요한 경우에 제너레이터가 속도/메모리 사용 측면에서 유리하나 제너레이터를 사용하게 되면 수열의 다른 값을 참조할 수 없기에 (현재값만 사용하여 처리하는 알고리즘을 단일 패스 (single pass) 혹은 온라인 (online) 이라 합니다) 사용하기 까다로울 수 있습니다. 이때 활용할 수 있는 모듈이 바로 itertools 입니다.

 

itertools 모듈

이번 포스트에서는 파이썬의 itertools 내장 모듈에 대해 알아보려고 합니다. itertools는 효율적인 반복을 위한 반복기 빌딩 블록을 generator를 이용한 iterator 형태로 구성하여 빠르고 효율적으로 메

hongl.tistory.com


대용량 데이터 분석에 제너레이터를 사용하는 예제를 만들어 보겠습니다. 초 단위로 기록한 데이터를 20년 치 분석한다면 처리해야할 개수는 20*365*24*60*60 개가 되어 아마 전체 데이터셋을 메모리에 올릴 수 없을 겁니다. 만약 문제가 "타임스탬프, 값" 형태로 저장된 데이터 파일에서 값이 정규 분포를 벗어나는 날짜, 간단한 anomaly 를 찾는다는 것이라면 리스트 할당 없이 제너레이터 만으로 해결할 수 있습니다. 먼저 파일을 읽는 함수를 제너레이터로 구현하되, 알고리즘을 테스트할 수 있도록 가짜 데이터를 생성할 제너레이터도 만듭니다. 이 두 함수는 제너레이터의 next() 함수가 호출될 때마다 데이터의 한 줄씩 리턴하는 지연 계산을 구현합니다.

from random import normalvariate, randint
from datetime import datetime
from itertools import count

def read_data(filename):
    with open(filename) as fd:
        for line in fd:
            data = line.strip().split(',')
            timestamp, value = map(int, data)
            yield datetime.fromtimestamp(timestamp), value
            
def read_fake_data(filename):
    for timestamp in count():
        # 일주일에 한 번씩 특이한 데이터 삽입
        if randint(0, 7*60*60*24-1) == 1:
            value = normalvariate(0, 1)
        else:
            value = 100
        yield datetime.fromtimestamp(timestamp), value
  • itertools.count 는 start/step 을 매개변수로 받아 start 로부터 step 만큼 떨어진 수들을 무한히 생성하는 무한 반복자를 리턴합니다.

이제 같은 날 발생한 데이터를 묶어서 반환하는 함수를 만듭니다. itertools 모듈의 groupby 함수를 사용하면 되는데, groupby 함수는 iterable 객체와 그룹으로 묶어줄 키를 인자로 받아 튜플을 생성하는 제너레이터를 반환합니다. 이 튜플에는 그룹의 키와 그룹의 항목을 생성하는 제너레이터가 담겨 있어 데이터가 기록된 시점의 날짜를 키 함수로 사용하면 같은 날 발생한 데이터를 그루핑할 수 있습니다. 예를 들어 입력이 "A A A A B B A A" 이고 groupby를 사용해 글자별로 묶는다면 (A, [A,A,A,A]), (B, [B,B]), (A, [A,A]) 세 그룹이 생성됩니다. 따라서 그룹을 시간 혹은 연도별로 묶어도 됩니다.

from itertools import groupby

def groupby_day(iterable):
    key = lambda row: row[0].day
    for day, data_group in groupby(iterable, key):
        yield list(data_group)
  • 여기서 data_group은 이터레이터인데 처리를 위해 list로 형변환 한후에 yield 합니다.

이제 날짜별 데이터가 정규 분포를 따르는지를 돌려주는 함수를 만듭니다. 이 함수와 itertools.filterfalse 함수를 사용하면 정규 분포를 따르지 않는 날짜별 데이터를 얻을 수 있습니다.

from scipy.stats import normaltest
from itertools import filterfalse

def is_normal(data, threshold=1e-3):
    _, values = *zip(data)
    k2, p_value = normaltest(values)
    if p_value < threshold:
        return False
    return True
    
def filter_anomalous_group(data):
    yield from filterfalse(is_normal, data)
  • yield from 명령어는 직접적으로 yield 하지 않지만 yield 를 실제 실행할 제너레이터를 받습니다. 

이제 anomaly 를 다음과 같이 찾을 수 있습니다. 이 방법을 사용하면 전체 데이터를 한 번에 다 읽지 않고도 이상치를 간단하게 찾을 수 있고 5개 보다 이상치를 더 찾아야 한다면 anomaly_generator를 계속 사용하면 됩니다. 이렇게 명시적으로 요청된 데이터에 대한 계산만 이루어지는 것을 지연 계산 (lazy evaluation) 이라 하며 이전 상태를 저장하지 않으므로 메모리를 아낄 수 있습니다.

from itertools import islice

def filter_anomaly_data(data):
    data_group = groupby_day(data)
    yield from filter_anomalous_group(data_group)

data = read_fake_data('dd')
anomaly_generator = filter_anomaly_data(data)
first_five_anomalies = islice(anomaly_generator, 5)

for data_anomaly in first_five_anomalies:
    start_date = data_anomaly[0][0]
    end_date = data_anomaly[-1][0]
    print(f'Anomaly from {start_date} - {end_date}')

 

또한, 다른 방식의 분석, 예를 들어 하루 단위가 아니라 한 시간 크기의 moving window에 대해 계산하려면 코드를 전체 고칠 필요 없이 groupby_day 함수를 groupby_window 함수로 대체하면 됩니다.

def groupby_window(data, window_size=3600):
    window = tuple(islice(data, window_size))
    for item in data:
        yield window
        window = window[1:] + (item,)
  • 여기서 주의할 점은 yield window 이후에 window 를 업데이트 한다는 점입니다. read_fake_data 로부터 전달된 data 제너레이터는 islice 함수에서 window_size 만큼 데이터를 소비합니다. 
  • 따라서 이후 for item in data 구문에서 data 제너레이터는 window_size 이후의 데이터를 리턴하게 됩니다. 따라서 for 루프에서 가져오는 첫 번째 항목은 window_size 번째 값이 되겠죠.
반응형