Problem description

  • Recommendation systems can be implemented in many ways
  • Two main types:
    1. content-based
    2. collaborative filtering
      • model-based
      • memory-based
  • This project implements each of these types
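
To make the memory-based branch of the list above concrete, here is a minimal sketch of user-to-user collaborative filtering in plain Python. The users, movies, and ratings are hypothetical toy values, and cosine similarity with a similarity-weighted average is just one common choice, not necessarily the one used later in this project.

```python
from math import sqrt

# Hypothetical toy ratings: user -> {movie: rating}. Not taken from the dataset.
toy_ratings = {
    "alice": {"A": 5.0, "B": 3.0, "C": 4.0},
    "bob":   {"A": 4.0, "B": 2.0, "D": 5.0},
    "carol": {"A": 1.0, "B": 5.0, "D": 2.0},
}


def cosine_similarity(u, v):
    """Cosine similarity computed over the movies both users rated."""
    common = set(u) & set(v)
    if not common:
        return 0.0
    dot = sum(u[m] * v[m] for m in common)
    norm_u = sqrt(sum(u[m] ** 2 for m in common))
    norm_v = sqrt(sum(v[m] ** 2 for m in common))
    return dot / (norm_u * norm_v)


def predict(user, movie, ratings=toy_ratings):
    """Similarity-weighted average of other users' ratings for `movie`.

    Returns None when no other user has rated the movie.
    """
    num = den = 0.0
    for other, their in ratings.items():
        if other == user or movie not in their:
            continue
        sim = cosine_similarity(ratings[user], their)
        num += sim * their[movie]
        den += abs(sim)
    return num / den if den else None


# alice has not rated "D"; her prediction leans toward bob, who rates like her.
prediction = predict("alice", "D")
```

A memory-based system keeps the raw ratings and computes similarities at query time, whereas the model-based approach below learns compact user and item factors up front.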

Model-based, using Spark and PySpark

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Recommendations').getOrCreate()

# Load both CSV files, letting Spark infer the column types
movies = spark.read.csv('movies.csv', inferSchema=True, header=True)
ratings = spark.read.csv('ratings.csv', inferSchema=True, header=True)

Understanding Data

  • Movies dataset

    • movieId: non-null id
    • title: string data
    • genres: categorical data
  • The genres column should probably be its own dataset with a genre id

movieId  title             genres
1        Toy Story (1995)  Adventure, Animati..
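
The idea of moving genres into their own dataset can be sketched in plain Python as follows. This is only an illustration: the helper name `normalize_genres`, the sample rows, and the `|` separator are assumptions, so pass whatever separator the file actually uses.

```python
def normalize_genres(movies, sep="|"):
    """Split each movie's genre string into a genre table and a link table.

    `movies` is an iterable of (movieId, genre_string) pairs.
    Returns (genre_ids, movie_genres) where genre_ids maps a genre name to
    a generated id and movie_genres is a list of (movieId, genreId) pairs.
    """
    genre_ids = {}      # genre name -> genre id
    movie_genres = []   # (movieId, genreId) link rows
    for movie_id, genre_string in movies:
        for name in genre_string.split(sep):
            name = name.strip()
            if not name:
                continue
            # Assign the next id the first time a genre is seen.
            genre_id = genre_ids.setdefault(name, len(genre_ids) + 1)
            movie_genres.append((movie_id, genre_id))
    return genre_ids, movie_genres


# Hypothetical rows, not taken from movies.csv.
sample = [(1, "Adventure|Comedy"), (2, "Comedy|Drama")]
genre_ids, movie_genres = normalize_genres(sample)
```

With a link table like `movie_genres`, each genre is stored once and a movie with several genres becomes several short rows instead of one packed string.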
  • Ratings dataset

    • userId
    • movieId
    • rating: explicit ratings given by user
    • timestamp

userId  movieId  rating  timestamp
1       1        4.0     964982703
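
from pyspark.sql.functions import col, count, isnan, when

# Assumption: movie_ratings is ratings joined to movies on movieId.
# The original does not show this step; the join type is a guess.
movie_ratings = ratings.join(movies, on='movieId', how='left')

# Per column, count values that are empty, null, NaN, or that contain
# the placeholder strings 'None' or 'NULL'.
movie_ratings.select(
    [count(when(col(c).contains('None') |
                col(c).contains('NULL') |
                (col(c) == '') |
                col(c).isNull() |
                isnan(c), c)).alias(c)
     for c in movie_ratings.columns]
).show()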

movieId  userId  rating  timestamp  title  genres
0        0       0       0          4      0
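  • The data is very well populated: only 4 title values are flagged, and every other column has none
  • There are essentially no missing field values (this says nothing about how sparse the user-item rating matrix is)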
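
The 4 flagged title values are not necessarily missing: `contains('None')` is a substring match, so any title that includes the word "None" would be counted. The plain-Python function below approximates the predicate to show the effect. It is a sketch, not Spark's exact semantics, and the example title is illustrative only; it is not confirmed to be one of the 4 flagged rows.

```python
import math


def looks_missing(value):
    """Plain-Python approximation of the per-column check used above."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    text = str(value)
    # Substring matches, like Column.contains in the Spark version.
    return text == "" or "None" in text or "NULL" in text


# Illustrative values only.
flags = {
    "Toy Story (1995)": looks_missing("Toy Story (1995)"),
    "And Then There Were None (1945)": looks_missing("And Then There Were None (1945)"),
    "": looks_missing(""),
}
```

If false positives like this matter, comparing against the exact strings (`== 'None'`) instead of using a substring match would be one way to tighten the check.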

Model Selection

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop"
)

# Hold out 20% of the ratings for the final evaluation
(train, test) = ratings.randomSplit([0.8, 0.2], seed=2020)

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()


evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction")
print("Num models to be tested: ", len(param_grid))


cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)

# Fit the cross validator to the 'train' dataset
model = cv.fit(train)

# Extract the best model from the fitted cross validator
best_model = model.bestModel

# Predict on the held-out 'test' dataset and compute its RMSE
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)
  • RMSE: 0.871
  • Num models to be tested: 16
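
The count of 16 follows directly from the grid: 4 rank values times 4 regParam values. A small plain-Python sketch of that arithmetic, which also estimates how many fits 5-fold cross-validation performs (the variable names are illustrative):

```python
from itertools import product

# Values copied from the parameter grid and CrossValidator settings above.
ranks = [10, 50, 100, 150]
reg_params = [0.01, 0.05, 0.1, 0.15]
num_folds = 5

# Every (rank, regParam) pair is one candidate model.
grid = list(product(ranks, reg_params))
num_models = len(grid)

# Each candidate is fitted once per fold; the best combination is then
# refitted once more on the full training set.
total_fits = num_models * num_folds
```

Here `num_models` is 16 and `total_fits` is 80, so widening either list makes the search noticeably more expensive.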

  • The lower the RMSE, the closer the predicted ratings are to the actual ratings
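
For reference, RMSE is the square root of the mean squared difference between actual and predicted ratings, so it is expressed in the same units as the ratings. A minimal plain-Python sketch with made-up numbers (the `rmse` helper and the sample values are illustrative, not the evaluator's internals):

```python
from math import sqrt


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared_errors) / len(squared_errors))


# Made-up ratings: errors of 0.5, 0.0 and 1.0 stars.
example = rmse([4.0, 3.0, 5.0], [3.5, 3.0, 4.0])
perfect = rmse([4.0, 3.0], [4.0, 3.0])
```

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones.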