2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > 推荐系统之协同过滤算法分布式实现(附代码实现)

推荐系统之协同过滤算法分布式实现(附代码实现)

时间:2018-10-25 08:17:07

相关推荐

推荐系统之协同过滤算法分布式实现(附代码实现)

图片来自网络

文章作者:Sunbow高级工程师

编辑整理:Hoh Xil

导读:本文主要介绍协同过滤基础知识,以及分布式实现设计,并最终在 Spark 平台上对同现相似度、Cosine相似度、欧几里得距离相似度、关联规则进行代码实现。

协同过滤推荐介绍

协同过滤推荐是最经典、最常用的推荐算法,该算法通过分析用户的兴趣,在用户群中找到指定用户的相似用户,综合这些相似用户对某一信息的评价,形成系统对该指定用户针对此信息的喜好程度的预测。常见的协同过滤有以下2种。

基于用户的协同过滤(UserCF),就是基于用户对物品的评分得到用户向量(以物品为维度,得到用户向量),计算相似用户,然后再进行推荐,如图1所示。

图1 基于用户的协同过滤示例

基于物品的协同过滤(ItemCF),就是基于物品所对应的用户评分得到物品向量(以用户为维度,得到物品向量),计算相似物品,然后再进行推荐,如图2所示。

图2 基于物品的协同过滤示例

相似度计算公式

1. 同现相似度

物品 A 和物品 B 的同现相似度公式定义如下:

其中分母 N(A) 是喜欢物品 A 的用户数、N(B) 是喜欢物品 B 的用户数,而分子则是同时喜欢物品 A 和物品 B 的用户数。

2.几里得距离

该距离最初用于计算欧几里得空间中两个点的距离,假设 x、y 是 n 维空间中的两个点,它们之间的欧几里得距离如下:

可以看出,当 n=2 时,欧几里得距离就是平面上两个点的距离。

当用欧几里得距离表示相似度时,一般采用以下公式进行转换:距离越小,相似度值越大。

3.皮尔逊相关系数

皮尔逊相关系数一般用于计算两个定距变量间联系的紧密程度,它的取值范围为 [-1,+1]。

是x 和 y 的样品标准偏差。

4.Cosine 相似度

Cosine 相似度被广泛应用于计算文档数据的相似度:

相似度分布式实现

1. 同现相似度分布式设计

对于同现相似度矩阵计算,首先以用户 id 为 key 进行 group by 操作,得到每个用户的所有物品集合,然后对每个用户的物品集合进行 flatMap 操作:对物品集合生成两两物品对(物品,物品),其中只生成上三角部分;之后对(物品,物品)对进行 group by 操作,得到物品与物品的总出现次数,随后再根据同现相似度公式:

w(i,j) = N(i)∩N(j)/sqrt(N(i)×N(j))

其中分子是 i 与 j 的同现频次,分母的 N(i) 是 i 频次、N(j) 是 j 频次。计算物品与物品的相似度,最终得到所有上三角部分的相似度。过程如图3所示。

图3分布式同现相似度矩阵计算过程

2. 欧几里得距离相似度分布式设计

对于欧几里得距离相似度的计算,采用离散计算公式:

d(x, y) = sqrt(∑((x(i)-y(i))×(x(i)-y(i))))

其中 i 只取 x、y 同现的点,未同现的点不参与相似度计算;

sim(x, y) = m / (1+d(x, y))

m 为 x、y 重叠数(同现次数)。

对于 Cosine 相似度的计算,采用离散计算公式,其中 i 只取 x、y 同现的点,未同现的点不参与相似度计算。

之后的欧几里得距离相似度计算如下:对(物品,物品)对进行 group by 操作,得到物品与物品的总出现次数,以及(评分 i-评分 j)平方累加值,并且对累加值开方,最后根据公式 m / (1 + d(x, y)) 计算相似度。过程如图4所示。

图4 分布式欧几里得距离相似度和 Cosine 相似度计算过程

3. Cosine相似度计算分布式设计

Cosine 相似度计算如下:对(物品,物品)对进行 group by 操作,得到 x×y=sum(评分i×评分j),|x|^2=sum(评分i^2),|y|^2=sum(评分j^2),最后根据公式计算相似度。过程如图4所示。

相似度计算代码实现

对同现相似度、Cosine 相似度、欧几里得距离相似度、关联规则进行代码实现,实现语言为 Scala,实现平台为 Spark,其实现代码如下:

1.同现相似度计算

/*** 同现相似度计算* w(i,j) = N(i)∩N(j)/sqrt(N(i)*N(j))* @param user_rdd 用户评分* @param RDD[ItemSimi] 返回物品相似度**/def CooccurrenceSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {import user_ds.sparkSession.implicits._// 1 (用户:物品) => (用户:(物品集合))val user_ds1 = user_ds.groupBy("userid").agg(collect_set("itemid")). withColumnRenamed ("collect_set(itemid)", "itemid_set")// 2 物品:物品,上三角数据val user_ds2 = user_ds1.flatMap { row =>val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]](1).toArray. sortedval result = new ArrayBuffer[(String, String, Double)]()for (i <- 0 to itemlist.length - 2) {for (j <- i + 1 to itemlist.length - 1) {result += ((itemlist(i), itemlist(j), 1.0))}}result}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ"). withColumnRenamed("_3", "score")// 3 计算物品与物品,上三角,同现频次val user_ds3 = user_ds2.groupBy("itemidI", "itemidJ").agg(sum("score").as("sumIJ"))// 4 计算物品总共出现的频次val user_ds0 = user_ds.withColumn("score", lit(1)).groupBy("itemid").agg(sum ("score").as("score"))// 5 计算同现相似度val user_ds4 = user_ds3.join(user_ds0.withColumnRenamed("itemid", "itemidJ"). withColumnRenamed("score", "sumJ").select("itemidJ", "sumJ"), "itemidJ")val user_ds5 = user_ds4.join(user_ds0.withColumnRenamed("itemid", "itemidI").withColumnRenamed("score", "sumI").select("itemidI", "sumI"), "itemidI")// 根据公式N(i)∩N(j)/sqrt(N(i)*N(j)) 计算val user_ds6 = user_ds5.withColumn("result", col("sumIJ") / sqrt(col("sumI") * col("sumJ")))// 6 上、下三角合并println(s"user_ds6.count(): ${user_ds6.count()}")val user_ds8 = user_ds6. select("itemidI", "itemidJ", "result").union(user_ds6.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))println(s"user_ds8.count(): ${user_ds8.count()}")// 7 结果返回val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>val itemidI = row.getString(0)val itemidJ = row.getString(1)val similar = row.getDouble(2)ItemSimi(itemidI, itemidJ, similar)}out}

2. Cosine相似度计算

/*** Cosine相似度计算* T(x,y) = ∑x(i)y(i) / sqrt(∑(x(i)*x(i))) * sqrt(∑(y(i)*y(i)))* @param user_rdd 用户评分* @param RDD[ItemSimi] 返回物品相似度**/def CosineSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {import user_ds.sparkSession.implicits._// 1 数据准备val user_ds1 = user_ds.withColumn("iv", concat_ws(":", $"itemid", $"pref")).groupBy("userid").agg(collect_set("iv")).withColumnRenamed("collect_set(iv)", "itemid_set").select("userid", "itemid_set")// 2 物品:物品,上三角数据val user_ds2 = user_ds1.flatMap { row =>val itemlist = row.getAs[scala.collection.mutable.WrappedArray [String]](1).toArray. sortedval result = new ArrayBuffer[(String, String, Double, Double)]()for (i <- 0 to itemlist.length - 2) {for (j <- i + 1 to itemlist.length - 1) {result += ((itemlist(i).split(":")(0), itemlist(j).split(":")(0), itemlist(i). split(":")(1).toDouble, itemlist(j).split(":")(1).toDouble))}}result}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "scoreI").withColumnRenamed("_4", "scoreJ")// 3 按照公式计算相似度// x*y = ∑x(i)y(i)// |x|^2 = ∑(x(i)*x(i))// |y|^2 = ∑(y(i)*y(i))// result = x*y / sqrt(|x|^2) * sqrt(|y|^2)val user_ds3 = user_ds2.withColumn("cnt", lit(1)).groupBy("itemidI", "itemidJ").agg(sum(($"scoreI" * $"scoreJ")).as("sum_xy"),sum(($"scoreI" * $"scoreI")).as("sum_x"),sum(($"scoreJ" * $"scoreJ")).as("sum_y")).withColumn("result", $"sum_xy" / (sqrt($"sum_x") * sqrt($"sum_y")))// 4 上、下三角合并val user_ds8 = user_ds3.select("itemidI", "itemidJ", "result").union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))// 5 结果返回val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>val itemidI = row.getString(0)val itemidJ = row.getString(1)val similar = row.getDouble(2)ItemSimi(itemidI, itemidJ, similar)}out}

3. 欧几里得距离相似度计算

/*** 欧几里得距离相似度计算* d(x, y) = sqrt(∑((x(i)-y(i)) * (x(i)-y(i))))* sim(x, y) = n / (1 + d(x, y))* @param user_rdd 用户评分* @param RDD[ItemSimi] 返回物品相似度**/def EuclideanDistanceSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {import user_ds.sparkSession.implicits._// 1 数据准备val user_ds1 = user_ds.withColumn("iv", concat_ws(":", $"itemid", $"pref")).groupBy("userid").agg(collect_set("iv")).withColumnRenamed("collect_set(iv)", "itemid_set").select("userid", "itemid_set")// 2 物品:物品,上三角数据val user_ds2 = user_ds1.flatMap { row =>val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]] (1).toArray.sortedval result = new ArrayBuffer[(String, String, Double, Double)]()for (i <- 0 to itemlist.length - 2) {for (j <- i + 1 to itemlist.length - 1) {result += ((itemlist(i).split(":")(0), itemlist(j).split(":")(0), itemlist (i).split(":")(1).toDouble, itemlist(j).split(":")(1).toDouble))}}result}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "scoreI").withColumnRenamed("_4", "scoreJ")// 3 按照公式计算相似度// dist = sqrt(∑((x(i)-y(i)) * (x(i)-y(i))))// cntSum = sum(1)// result = cntSum / (1 + dist)val user_ds3 = user_ds2.withColumn("cnt", lit(1)).groupBy("itemidI", "itemidJ").agg(sqrt(sum(($"scoreI" - $"scoreJ") * ($"scoreI" - $"scoreJ"))).as("dist"), sum($"cnt").as("cntSum")).withColumn("result", $"cntSum" / (lit(1.0) + $"dist"))// 4 上、下三角合并val user_ds8 = user_ds3.select("itemidI", "itemidJ", "result").union(user_ds3.select ($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))// 5 结果返回val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>val itemidI = row.getString(0)val itemidJ = row.getString(1)val similar = row.getDouble(2)ItemSimi(itemidI, itemidJ, similar)}out}

4. 关联规则计算

/*** 关联规则计算* 支持度(Support):在所有项集中{X, Y}出现的可能性,即项集中同时含有X和Y的概率P(X U Y)/P(I),I表* 示全部事务* 置信度(Confidence):在先决条件X发生的条件下,关联结果Y发生的概率,即P(X U Y)/P(X)* 提升度(lift):在含有X的条件下同时含有Y的可能性与在没有X的条件下项集中含有Y的可能性之比,即* confidence(X => Y)/P(Y)* @param user_rdd 用户评分* @param RDD[ItemAssociation] 返回物品相似度**/def AssociationRules(user_ds: Dataset[ItemPref]): Dataset[ItemAssociation] = {import user_ds.sparkSession.implicits._// 1 (用户:物品) => (用户:(物品集合))val user_ds1 = user_ds.groupBy("userid").agg(collect_set("itemid")).withColumnRenamed ("collect_set(itemid)", "itemid_set")// 2 物品:物品,上三角数据val user_ds2 = user_ds1.flatMap { row =>val itemlist = row.getAs[WrappedArray[String]](1).toArray.sortedval result = new ArrayBuffer[(String, String, Double)]()for (i <- 0 to itemlist.length - 2) {for (j <- i + 1 to itemlist.length - 1) {result += ((itemlist(i), itemlist(j), 1.0))}}result}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ"). withColumnRenamed("_3","score")// 3 计算物品与物品,上三角,同现频次val user_ds3 = user_ds2.groupBy("itemidI", "itemidJ").agg(sum("score").as("sumIJ"))// 4 计算物品总共出现的频次val user_ds0 = user_ds.withColumn("score", lit(1)).groupBy("itemid").agg(sum ("score").as("score"))val user_all = user_ds1.count// 5 计算支持度(Support)val user_ds4 = user_ds3.select("itemidI", "itemidJ", "sumIJ").union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"sumIJ")).withColumn("support", $"sumIJ" / user_all.toDouble)// user_ds4.orderBy($"support".desc).show// 6 置信度(Confidence)val user_ds5 = user_ds4.join(user_ds0.withColumnRenamed("itemid", "itemidI").withColumnRenamed("score", "sumI"), "itemidI").withColumn("confidence", $"sumIJ" / $"sumI")// user_ds5.orderBy($"confidence".desc).show// 7 提升度(lift)val user_ds6 = user_ds5.join(user_ds0.withColumnRenamed("itemid", "itemidJ").withColumnRenamed("score", "sumJ"), "itemidJ").withColumn("lift", $"confidence" / ($"sumJ" / user_all.toDouble))// user_ds6.orderBy($"lift".desc).show// 计算同现相似度val user_ds8 = user_ds6.withColumn("similar", col("sumIJ") / sqrt(col("sumI") * col("sumJ")))// user_ds8.orderBy($"similar".desc).show// 8 结果返回val out = user_ds8.select("itemidI", "itemidJ", "support", "confidence", "lift", "similar").map { row =>val itemidI = row.getString(0)val itemidJ = row.getString(1)val support = row.getDouble(2)val confidence = row.getDouble(3)val lift = row.getDouble(4)val similar = row.getDouble(5)ItemAssociation(itemidI, itemidJ, support, confidence, lift, similar)}out}

总结

在大规模分布式工程实践中,当样本量级比较大的时候,会导致物品向量或者用户向量维度很高(比如1亿用户,那物品的向量维度会有1亿维),会导致计算性能问题。这里在工程实践中,如何解决这个问题呢,最简单粗暴的方案就是考虑采样方法,进行降维,其中采样包括:对用户进行采样策略和对物品采样策略,最终目的使得计算性能满足所需要的性能要求。当然还有一些高级方案,比如可以借鉴 Facebook 的 Faiss 原理,这里就不具体展开讲解。

「 更多干货,更多收获」

【推荐实践】微博在线机器学习及深度学习实践(附PPT下载链接)

推荐系统系列教程之十二:Facebook是怎么为十亿人互相推荐好友的?

【干货】史上最全个性化推荐技术资料包(附下载链接)

【报告分享】企业数据中台整体介绍及建设方案(附52页pdf下载链接)

【推荐算法】基于用户和产品的协同过滤算法

关注我们

您的「在看」,我的动力????

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。