본문 바로가기

Theory/Algorithms

Sparse Matrix (희소행렬) 구현하기

반응형

행렬 곱하기는 행렬 크기가 클수록 많은 연산량이 소요됩니다. 만약 행렬에 0이 대부분이고 0이 아닌 요소가 (non-zero elements) 매우 적은 경우 0은 곱연산에 의미가 없으므로 일반적인 행렬 곱연산은 비효율적이겠죠. 이럴때 효율적으로 행렬 곱을 수행할 수 있는 방법은 sparse matrix (희소행렬) 형태로 행렬을 재구성하는 것입니다. 희소행렬은 0이 아닌 요소에 대해서만 (행 인덱스, 열 인덱스, 값)을 가진 형태로 0이 아닌 요소만 저장하고 불필요한 0인 요소는 따로 저장하지 않습니다. Scipy 라이브러리의 sparse matrix 모듈에 관련 함수들이 구현되어 있고 이번 포스트에서는 raw한 파이썬으로 구현해보도록 하겠습니다.

Data structure

먼저 희소행렬을 구성하기 위한 자료구조를 정의합니다. 생성자에 행렬의 크기 (행, 열)을 정의하고 "append" 라는 메소드로 구현하여 0이 아닌 요소를 추가하게끔 구현합니다.

class SparseMatrix(object):
    def __init__(self, rows, cols):
        '''
        Arguments:
        - rows: Number of rows of matrix
        - cols: Number of columns of matrix
        '''

        self.rows = rows
        self.cols = cols

        self.matrix = list()

    def append(self, i, j, value):
        '''
        Argumnets:
        - i: Index of row
        - j: Index of column
        - value: Value of Element
        '''

        ## Add [i, j, value]
        if self.rows <= i:
            raise ValueError(f'Invalid row index... Should be lower than {self.rows}')
        if self.cols <= j:
            raise ValueError(f'Invalid col index... Should be lower than {self.cols}')

        self.matrix.append([i, j, value])
  • "append" 메소드는 (행 인덱스, 열 인덱스, 값)을 입력으로 받아 0이 아닌 요소를 추가합니다. 내부적으로는 "self.matrix" 라는 리스트에 0이 아닌 요소를 담습니다.

Transpose

이후 행렬 곱연산을 수행하기 위해서 필요한 전치행렬을 희소행렬 형태로 구현합니다. 위에서 정의한 "SparseMatrix" 클래스를 새로 선언하고 전치행렬이므로 (열 인덱스, 행 인덱스, 값) 요소를 추가합니다.

    def transpose(self):
        transpose_matrix = SparseMatrix(rows=self.cols, cols=self.rows)
        for element in self.matrix:
            transpose_matrix.append(i=element[1], j=element[0], value=element[2])

        return transpose_matrix

 

기타 함수들

1. (행 인덱스, 열 인덱스)의 오름차순으로 희소행렬 요소를 재정렬하는 함수를 구현합니다.

    def sort(self):
        self.matrix = sorted(self.matrix, key=lambda x: (x[0], x[1]))

2. (행 인덱스, 열 인덱스)에 따른 값을 가져오는 함수를 구현합니다. 해당하는 (행, 열)이 없을 경우 0을 리턴합니다.

    def get_value(self, i, j):
        '''
        Arguments:
        - i: Row index
        - j: Col index
        Return:
        - Corresponding value of i, j
        '''

        if self.rows <= i or self.cols <= j:
            raise ValueError(f'Wrong index')

        for elem in self.matrix:
            if elem[0] == i and elem[1] == j:
                return elem[2]
        ## If (i, j) not exist, return 0
        return 0

3. 행렬의 크기, 요소 프린트 함수 등을 구현합니다.

    @property
    def shape(self):
        return (self.rows, self.cols)

    def __len__(self):
        return len(self.matrix)

    def __repr__(self):
        s = ''
        for elem in self.matrix:
            s += ' '.join(str(elem))
            s += '\n'

        return s

 

Sparse matrix multiplication

2개의 희소행렬 곱연산을 구현합니다. 참고로 희소행렬 곱하기 결과가 꼭 희소행렬일 필요는 없습니다만 여기서는 편의상 곱하기 결과 행렬도 희소행렬 형태로 출력합니다.

from collections import defaultdict

def SparseMatrixMul(A, B):
    '''
    Arguments:
    - A, B: Sparse Matrices
    Return:
    - Matrix Multiplication of A, B in Sparse Matrix structure
    '''

    assert isinstance(A, SparseMatrix) and isinstance(B, SparseMatrix)

    if A.cols != B.rows:
        raise ValueError(f'Size Mismatch... \n Size of A: {A.shape} and Size of B: {B.shape}')

    ## Sort left matrix
    A.sort()
    ## Transpose right matrix
    B_T = B.transpose()
    ## Sort Transposed right matrix
    B_T.sort()
    ## Declare new Sparse Matrix for storing multiplication result
    C = SparseMatrix(rows=A.rows, cols=B.cols)

    ## Temp dictionary for summation
    temp = defaultdict(float)

    curr_row = A.matrix[0][0]
    for a_elem in A.matrix:
        a_row, a_col, a_value = a_elem
        for b_elem in B_T.matrix:
            b_row, b_col, b_value = b_elem
            if a_col == b_col:
                new_row = a_row
                new_col = b_row
                new_value = a_value * b_value
                temp[(new_row, new_col)] += new_value

    ## Add elements to result matrix
    for k, v in temp.items():
        row_index, col_index = k
        C.append(i=row_index, j=col_index, value=v)

    return C
  • $A, B$ 두 행렬이 있을 때 $A$의 열과 $B$의 행 수가 일치해야 합니다.
  • $B$ 행렬을 전치하고 $A, B$ 모든 요소에 대해 반복문을 수행하여 $A$의 열과 $B$의 행이 일치하면 두 요소를 곱하고 결과 행렬에 위치할 (행, 열)에 대해 결과를 해쉬테이블에 저장합니다. (여기서는 defaultdict를 사용합니다.)
  • 마지막으로 결과 희소행렬에 대해 요소를 추가해줍니다.
  • 시간복잡도는 $n_A$=Number of non-zeros in A, $n_B$=Number of non-zeros in B 라 하면 $O(n_A\cdot n_B)$가 소요되어 일반 dense 행렬 곱연산에 비해 non-zero 요소에 대해서만 곱연산이 수행되므로 더 효율적입니다.

 

결과 확인

if __name__ == '__main__':
    '''
    Test Case 1              Test Case 2
    A: (3,4)                 A: (3,5)           
    [[0,0,0,4],              [[1,0,0,1,0],
     [1,2,0,0],               [0,0,0,0,2],
     [0,0,3,0]]               [0,3,0,0,0]]
     
    B: (4,3)                 B: (5,2)
    [[1,0,0],                [[2,0], [0,0], [0,1], [0,0], [0,3]]
     [0,0,2],
     [2,3,0],                C: (3,2)
     [4,0,0]]                [[2,0],[0,6],[0,0]]
     
    C: (3,3)
    [[16, 0, 0],
     [1, 0, 4],
     [6, 9, 0]]      
    '''

    a = SparseMatrix(rows=3, cols=4)
    a.append(0,3,4)
    a.append(1,0,1)
    a.append(1,1,2)
    a.append(2,2,3)
    print(a)
    b = SparseMatrix(rows=4, cols=3)
    b.append(0,0,1)
    b.append(1,2,2)
    b.append(2,0,2)
    b.append(2,1,3)
    b.append(3,0,4)
    print(b)
    c = SparseMatrixMul(a, b)
    print(c)

    a = SparseMatrix(rows=3, cols=5)
    a.append(0,0,1)
    a.append(0,3,1)
    a.append(1,4,2)
    a.append(2,1,3)
    b = SparseMatrix(rows=5, cols=2)
    b.append(0,0,2)
    b.append(2,1,1)
    b.append(4,1,3)
    c = SparseMatrixMul(a, b)
    print(c)

 

홍머스 정리

  • 그래프를 구성하는 모든 노드가 서로 연결되어 있지 않으므로 affinity matrix 구성시 대부분의 요소가 0이 되는 희소행렬 형태가 되어 SNS 데이터 같은 사회관계망 데이터에 주로 사용됨.
  • Numpy 행렬에 대해서는 scipy.sparse 모듈을 사용

 

참조

반응형

'Theory > Algorithms' 카테고리의 다른 글

Cholesky Decomposition (촐레스키 분해) - 파이썬 구현  (0) 2022.02.20
Trie  (0) 2021.07.15
위상 정렬 (Topological Sort)  (0) 2021.03.05
자료구조 - 서로소 집합  (0) 2021.03.04
유니코드와 UTF-8  (0) 2021.03.02