본문 바로가기

블로그/머신러닝

[추천 시스템] Model-based CF: Matrix Factorization(MF)

CF는 두 가지 방식이 존재한다. 

 

첫째, Memory-based: Rating 자료를 모두 메모리에 가지고 있으면서 추천을 위한 계산에 사용하는 방식

둘째, Model-based: Rating 자료는 모델을 학습하는 데 사용하고 추천을 위한 계산은 학습된 모델로 하는 방식

 

앞선 포스팅에서 진행한 3가지 협업필터링은 메모리에 적재된 Rating에 관련된 자료를 바탕으로 추천을 위한 계산을 했다.

 

앞으로 배울 Deep-learning based 모델이나 MF와 같은 모델은 자료를 바탕으로 모델을 학습하고 추천은 학습된 모델을 바탕으로 진행하는 Model-based 방식이다.

 

Model-based CF는 Model learning에는 많은 자원이 소요되지만, 예측에 들어가는 비용은 비교적 적은 편이며, 모델을 주기적으로 Re-training 하는 방식으로 update 한다.  추천 시스템의 유명한 Singular value decomposition(SVD) 나 Factorization Machines 또한 Model-based에 속한다.

 

과연 영화를 추천하는 TASK에서 성능이 어느 정도 나올까?

추천 모델 RMSE
CF - 사용자 유사도 1.017
CF - 사용자 유사도 + 유사도 순위 1.010
CF - 사용자 유사도 + 유사도 순위 + 사용자 평가 경향 0.943
Matrix Factorization(MF) 맨 아래에 공개

이전 포스팅 - CF - 사용자 유사도 + 유사도 순위 + 사용자 평가 경향

 

https://fenzhan.tistory.com/33

 

[추천 시스템] 협업 필터링 Collaborative Filtering - 유사도 순위 반영

이전 포스팅에서는 단순히 영화를 시청한 유저가 영화를 보고 평가한 Rating 성향을 코사인 유사도를 바탕으로 구하고, 유사도가 높은 사람을 추천하는 CF를 구현해보았다. 이번에는 유저의 영화

fenzhan.tistory.com

본 포스팅에서 활용한 데이터셋 : Movie lens 100K

https://www.kaggle.com/datasets/imkushwaha/movielens-100k-dataset

 

MovieLens 100K Dataset

 

www.kaggle.com

본 포스팅에서 활용한 자료 :

연세대학교 인공지능대학원 개인화추천시스템 강의 - 임일 교수님

Python을 이용한 개인화 추천시스템 - 임일 교수님

 

 

Matrix factorization(MF)에 대해서 간략히 알아보고 구현해보자.

 

MF를 간단하게 말하면, 사용자가 상품에 대하여 남긴 평점에 대한 매트릭스(User-Item Matrix)를 dimensions을 줄인 두 개의 Matrix로 분리하는 것이다. 아래 그림을 보자

 

그림을 통해서 보면, Rating Matrix를 User Matrix와 itme Matrix로 나눠서, 각 Matrix에 대한 최적값을 구하고 두 Matrix를 내적 하면 유저에 대한 상품 선호도를 구할 수 있을 것이다라는 가정이다. 여기서 K는 각 매트릭스의 차원수이며, 임의로 설정할 수 있다.

 

*어찌 보면 행렬을 분해하는 SVD와 같다고 생각할 수 있으며, 실제로 MF는 SVD의 아이디어에서 착안하였다고 한다.

 

정리하면 다음과 같다.

 

K개의 잠재 요인(latent factor)

  • R : rating matrix – M명의 사용자가 N개 아이템에 대해 평가한 데이터를 포함한 2차원 행렬
    MF : 이 R 행렬을 사용자 행렬(P)과 아이템행렬(Q)로 분해하여 분석하는 방식
  • P : User latent matrix – 사용자 잠재요인 행렬(M*K)
    각 사용자의 특성을 나타내는 K개 요인의 값으로 이루어진 행렬
  • Q : Item latent matirx – 아이템 잠재요인 행렬(N*K)
    각 아이템의 특성을 나타내는 K개의 요인으로 이루어진 행렬

 

이제 대략적으로 MF가 어떻게 작동하는지 그림을 그려보자.

 

1) 각 User, item Matrix에 대한 값을 초기화함

 

- User와 item의 매트릭스에 대한 파라미터는 학습이 시작되기 전에 정규분포 기반의 랜덤 값으로 초기화

 

2) User Matrix, Itme Matrix를 내적해 구한 값과 실제 Rating matrix의 값과의 차이인 에러를 구함

 

User i의 item j에 대한 예측값 R^_ij과 오차 e_ij

3) Error를 줄이는 방법으로 Matrix에 대한 값을 최적화함

 

MF에서는 오차 e_ij의 제곱인 Squred Error를 줄이는 방식으로, P와 Q^T의 매트릭스에 해당하는 값을 최적화한다.

 

MF의 Objective Function은 아래식의 합을 최소화하는 것이며,

 

$e_{i j}^2=\left(r_{i j}-\sum_{k=1}^K p_{i k} q_{k j}\right)^2+\frac{\beta}{2} \sum_{k=1}^K\left(\|P\|^2+\|Q\|^2\right)$

 

위 그림의 앞 텀(term)은 오차 제곱이며 뒤에 붙은 식은 Overfitting을 방지하기 위해 정규화 텀을 붙여준다.

정규화 텀은 파라미터인 P와 Q의 값이 커지지 않도록 정규화를 해주는 기능을 수행한다. 여기서 beta/2를 바탕으로 정규화 수준을 결정하게 된다.

 

MF에서는 오차함수를 최소화시키기 위해 오차함수를 p와 q로 편미분 한 SGD(Stochastic Gradient Descent)가 적용된다.

 

$\frac{\partial}{\partial p_{i k}} e_{i j}^2=-2\left(r_{i j}-\hat{r}_{i j}\right)\left(q_{k j}\right)=-2 e_{i j} q_{k j}$
$\frac{\partial}{\partial q_{i k}} e_{i j}^2=-2\left(r_{i j}-\hat{r}_{i j}\right)\left(p_{i k}\right)=-2 e_{i j} p_{i k}$

 

위의 수식으로 구한 Gradient를 바탕으로 각 아이템과 유저의 매트릭스를 업데이트한다. 아래 수식의 alpha는 learning rate로 파라미터 업데이트를 어느 정도 크기로 변화시킬지 정하는 하이퍼 파라미터이다. 

 

$p_{i k}^{\prime}=p_{i k}+\alpha \frac{\partial}{\partial p_{i k}} e_{i j}^2=p_{i k}+2 \alpha e_{i j} q_{k j}$
$q_{k j}^{\prime}=q_{k j}+\alpha \frac{\partial}{\partial q_{k j}} e_{i j}^2=q_{k j}+2 \alpha e_{i j} p_{i k}$

 

4) 평가 경향 추가

 

앞 포스팅에서 유저의 평가 경향을 추가하였을 때, CF의 성능이 개선된 것을 확인할 수 있었다.

 

사람 1은 영화를 볼 때 기본적으로 평점을 높게 주며, 사람 2는 평점에 대한 기준이 높아 굉장히 낮게 준다.  따라서, 이러한 평가 경향을 반영하기 위해 평점에 대한 정규화 처리를 해주며, 이를 통해 성능을 높일 수 있었다.

 

이를 위해, 예측 평점 R^_ij에 대한 식을 아래와 같이 변형을 한다. 여기서 "b"는 학습하는 데이터 전체 평점의 평균이며, bu_i는 사용자가 가진 편향, bdj는 아이템이 가진 편향이다.

 

$\hat{r}_{i j}=b+b u_i+b d_j+\sum_{k=1}^k p_{i k} q_{k j}$

 

주의 할 점은 bias 또한 학습 대상인 파라미터이며, Bias에 대한 update rule은 아래와 같다. 

 

$b u_i^{\prime}=b u_i+\alpha \times\left(e_{i j}-\beta b u_i\right)$
$b d_j^{\prime}=b d_j+\alpha \times\left(e_{i j}-\beta b d_j\right)$

 

Python 코드로 위의 식을 확인해보자.

import numpy as np
import pandas as pd
from sklearn.utils import shuffle

r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv('archive/u.data', names=r_cols,  sep='\t',encoding='latin-1')
ratings = ratings[['user_id', 'movie_id', 'rating']].astype(int)            # timestamp 제거

# train test 분리
TRAIN_SIZE = 0.75
ratings = shuffle(ratings, random_state=12)
cutoff = int(TRAIN_SIZE * len(ratings))
ratings_train = ratings.iloc[:cutoff]
ratings_test = ratings.iloc[cutoff:]

# New MF class for training & testing
class NEW_MF():
    # Initializing the object
    def __init__(self, ratings, K, alpha, beta, iterations, tolerance=0.005, verbose=True):
        self.R = np.array(ratings)
        # user_id, movie_id를 R의 index와 매칭하기 위한 dictionary 생성
        item_id_index = []
        index_item_id = []
        for i, one_id in enumerate(ratings):
            item_id_index.append([one_id, i])
            index_item_id.append([i, one_id])
        self.item_id_index = dict(item_id_index)
        self.index_item_id = dict(index_item_id)        
        user_id_index = []
        index_user_id = []
        for i, one_id in enumerate(ratings.T):
            user_id_index.append([one_id, i])
            index_user_id.append([i, one_id])
        self.user_id_index = dict(user_id_index)
        self.index_user_id = dict(index_user_id)
        # 다른 변수 초기화
        self.num_users, self.num_items = np.shape(self.R)
        self.K = K
        self.alpha = alpha
        self.beta = beta
        self.iterations = iterations
        self.tolerance = tolerance
        self.verbose = verbose

    # 테스트 셋을 선정하는 메소드 
    def set_test(self, ratings_test):                           # Setting test set
        test_set = []
        for i in range(len(ratings_test)):                      # Selected ratings
            x = self.user_id_index[ratings_test.iloc[i,0]] #test셋에 해당하는 데이터의 인덱스를 받고 0으로 바꿔버림(25000번)     # Getting R indice for the given user_id and movie_id
            y = self.item_id_index[ratings_test.iloc[i,1]]
            z = ratings_test.iloc[i,2]
            test_set.append([x, y, z])
            self.R[x, y] = 0                    # Setting test set ratings to 0
        self.test_set = test_set
        return test_set                         # Return test set

    def test(self):                             # Training 하면서 test set의 정확도를 계산하는 메소드 
        # Initializing user-feature and movie-feature matrix
        self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
        self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))

        # Initializing the bias terms
        self.b_u = np.zeros(self.num_users)
        self.b_d = np.zeros(self.num_items)
        self.b = np.mean(self.R[self.R.nonzero()])

        # List of training samples
        rows, columns = self.R.nonzero()
        self.samples = [(i,j, self.R[i,j]) for i, j in zip(rows, columns)]

        # Stochastic gradient descent for given number of iterations
        best_RMSE = 10000
        best_iteration = 0
        training_process = []
        for i in range(self.iterations):
            np.random.shuffle(self.samples)
            self.sgd()
            rmse1 = self.rmse()
            rmse2 = self.test_rmse()
            training_process.append((i, rmse1, rmse2))
            if self.verbose:
                if (i+1) % 10 == 0:
                    print("Iteration: %d ; Train RMSE = %.6f ; Test RMSE = %.6f" % (i+1, rmse1, rmse2))
            if best_RMSE > rmse2:  #제일 좋은 BEST 값을 가지고 있다가, 최저값보다 얼마나 차이나면 끝낼것인가?         # New best record
                best_RMSE = rmse2
                best_iteration = i # 최적의 값이 몇 번째 iter에 나왔는지, 기록하는 것
            elif (rmse2 - best_RMSE) > self.tolerance: # RMSE is increasing over tolerance
                break
        print(best_iteration, best_RMSE)
        return training_process

    # Stochastic gradient descent to get optimized P and Q matrix
    def sgd(self):
        for i, j, r in self.samples:
            prediction = self.get_prediction(i, j)
            e = (r - prediction)

            self.b_u[i] += self.alpha * (e - self.beta * self.b_u[i])
            self.b_d[j] += self.alpha * (e - self.beta * self.b_d[j])

            self.Q[j, :] += self.alpha * (e * self.P[i, :] - self.beta * self.Q[j,:])
            self.P[i, :] += self.alpha * (e * self.Q[j, :] - self.beta * self.P[i,:])

    # Computing mean squared error
    def rmse(self):
        xs, ys = self.R.nonzero()
        self.predictions = []
        self.errors = []
        for x, y in zip(xs, ys):
            prediction = self.get_prediction(x, y)
            self.predictions.append(prediction)
            self.errors.append(self.R[x, y] - prediction)
        self.predictions = np.array(self.predictions)
        self.errors = np.array(self.errors)
        return np.sqrt(np.mean(self.errors**2))

    # Test RMSE 계산하는 method 
    def test_rmse(self):
        error = 0
        for one_set in self.test_set:
            predicted = self.get_prediction(one_set[0], one_set[1])
            error += pow(one_set[2] - predicted, 2)
        return np.sqrt(error/len(self.test_set))

    # Ratings for user i and moive j
    def get_prediction(self, i, j):
        prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :].T)
        return prediction

    # Ratings for user_id and moive_id
    def get_one_prediction(self, user_id, movie_id):
        return self.get_prediction(self.user_id_index[user_id], self.item_id_index[movie_id])

# Testing MF RMSE
R_temp = ratings.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)
mf = NEW_MF(R_temp, K=220, alpha=0.0014, beta=0.075, iterations=350, tolerance=0.0001, verbose=True)
test_set = mf.set_test(ratings_test)
result = mf.test()
print(mf.get_one_prediction(1,2),R_temp.loc[1][2])

K = 220, alpha =0.0014, beta =0.075, iter = 350, tolerance =0.0001 으로 하이퍼 파라미터를 설정하였을 때,

TEST에 대한 RMSE 성능은 약 0.904이 나왔다.

 

이를 통해 MF 모델이 앞선 포스팅의 CF 모델보다 우수한 성능을 보여주는 것을 확인했다 !

추천 모델 RMSE
CF - 사용자 유사도 1.017
CF - 사용자 유사도 + 유사도 순위 1.010
CF - 사용자 유사도 + 유사도 순위 + 사용자 평가 경향 0.943
Matrix Factorization(MF) 0.904