PySpark
Exercise Review
1. Download the MovieLens dataset and PySpark
! wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
! unzip ml-latest-small.zip
! ls
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
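Before findspark can locate the install on Colab, JAVA_HOME and SPARK_HOME need to be set; a minimal sketch, assuming the default Colab Java path and that the tarball unpacked into /content:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"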
2. Start a Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
# SparkSession: creates a Spark instance
# builder: builder object, used to attach additional settings
# appName("..."): the name of this application instance
# master("local[*]"): connect Spark to a cluster; "local" means run locally
# and [*] means use all available cores (local[4] would use 4 local cores)
# getOrCreate(): obtain the existing instance, or create a new one if none exists
spark = SparkSession \
.builder \
.master("local[*]") \
.appName("moive analysis") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# Test the spark
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3, False)
3. Load data
root = "ml-latest-small/"
# if using Databricks
databrick_root = "/FileStore/tables/movielen_small/"
movies_df = spark.read.load(root+"movies.csv", format='csv', header = True)
ratings_df = spark.read.load(root+ "ratings.csv", format='csv', header = True)
links_df = spark.read.load(root+"links.csv", format='csv', header = True)
tags_df = spark.read.load(root+"tags.csv", format='csv', header = True)
# Or
movies_df = spark.read.csv(root+"movies.csv", header = True)
# spark.read.options(...).json(...) or .jdbc(...)
# options: settings for loading, e.g. timezone or date format
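A minimal sketch of the options() style; header and inferSchema are standard CSV reader options, and inferSchema avoids most of the manual casting done in section 4:
ratings_typed_df = spark.read.options(header=True, inferSchema=True).csv(root + "ratings.csv")
ratings_typed_df.printSchema()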
Notes on creating DataFrames:
When converting a pandas DataFrame to a PySpark DataFrame, pass the pandas DataFrame together with a StructType schema to createDataFrame.
When creating a PySpark DataFrame directly, pass createDataFrame a list of dictionaries; each element of the list becomes one row of the table.
from pyspark.sql.types import *
import pandas as pd

# create a PySpark DataFrame directly from a list of dictionaries
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show()

# convert a pandas DataFrame using an explicit schema
mySchema = StructType([StructField("col name1", IntegerType(), True),
                       StructField("col name2", StringType(), True),
                       StructField("col name3", IntegerType(), True)])
dic = {"col name1": [1, 2, 3], "col name2": ['a', 'b', 'c'], "col name3": [4, 5, 6]}
df = pd.DataFrame(dic)
spark_df = spark.createDataFrame(df, mySchema)
spark_df.show()
4. Change data type:
from pyspark.sql.types import IntegerType, FloatType
t_df = tags_df.withColumn('movieId-new', tags_df.movieId.astype('int'))
t_df= t_df.withColumn('userId',t_df['userId'].astype('int'))
t_df= t_df.withColumn('movieId',t_df['movieId'].astype('int'))
t_df= t_df.withColumn('timestamp',t_df['timestamp'].astype('int'))
t_df.collect()[1]  # inspect one row (collect() pulls the whole DataFrame to the driver)
# Or
t_df = t_df.withColumn('movieId', t_df['movieId'].astype(FloatType()))  # FloatType must be instantiated
#...
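Note: astype is an alias for Column.cast, so the canonical form uses cast with an instantiated type:
t_df = t_df.withColumn('movieId', t_df['movieId'].cast(IntegerType()))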
5. Query:
5.1 DataFrame method
from pyspark.sql.functions import col, explode, split
# find the amount of unique movieId
movies_df.select('movieId').distinct().count()
# use the agg() aggregation function to find the mean rating of each
# movie: agg({column_name: aggregation_function_name})
# pick the top-k highest-rated elements with .limit(k)
ratings_df.groupBy("movieId").agg({'rating':"mean"}) \
.sort("avg(rating)",ascending = False) \
.limit(10) \
.show()
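The explode and split imported above can unpack the pipe-separated genres column (e.g. "Adventure|Animation") into one row per genre; a minimal sketch:
genres_df = movies_df.withColumn("genre", explode(split(col("genres"), "\\|")))
genres_df.groupBy("genre").count().sort("count", ascending=False).show()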
5.2 SQL method
First, register SQL tables from the Spark DataFrames
# Register the DataFrames as SQL temporary views
# (registerTempTable is deprecated; createOrReplaceTempView is its modern equivalent)
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")
# Or use a global temporary view. With a global temporary view, SQL
# queries must prefix table names with global_temp., e.g.
#   select * from global_temp.movies
# Note: difference between a global temp view and a (session-scoped) temp view:
# if you want a temporary view that is shared among all sessions and kept
# alive until the Spark application terminates, create a global temporary view.
movies_df.createOrReplaceGlobalTempView("movies")
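Queries against the global view must then use the global_temp prefix:
spark.sql("select count(*) from global_temp.movies").show()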
Then use spark.sql to run SQL directly
# Use Spark SQL
# spark.sql(...): spark is the SparkSession created earlier
user_count = spark.sql("select count(distinct userId) as user_count from ratings")
# (in a Databricks notebook, the %sql cell magic could be used instead)
# Show movies that are not rated
movie_not_rated = spark.sql("select distinct movieId, title from movies where movieId not in (select distinct movieId from ratings)")
movie_not_rated.show()
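The top-10 average-rating query from 5.1 can be written in SQL as well:
top10 = spark.sql("""
    select movieId, avg(rating) as avg_rating
    from ratings
    group by movieId
    order by avg_rating desc
    limit 10
""")
top10.show()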
# select distinct movieId
# distinct de-duplicates rows; with multiple columns, each row's
# combination of values counts as one distinct value
movies_df.select(['movieId']).distinct().show()
# Search for rows with missing values
from pyspark.sql.functions import col, explode, split
movie_missing_value = movies_df.where(col('title').isNull() | col('genres').isNull())
movie_missing_value.show()
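A similar where filter works for substring search; a small sketch using Column.contains:
movies_df.where(col('title').contains('Toy Story')).show()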
6. User-Defined Functions (UDFs) in PySpark
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
from pyspark.sql import SparkSession
# Create a PySpark application instance
spark = SparkSession.builder.appName("Spark-Practice").master('local').getOrCreate()
# Load the data (anscombe.json is one of Colab's built-in sample files)
json_df = spark.read.json("sample_data/anscombe.json")
# Define the user-defined functions
def square_udf(s):
    # return 1.0 for missing input (a float, to match the FloatType declared below)
    if s is None:
        return 1.0
    return s * s

def diff_square_udf(s1, s2):
    if s1 is None or s2 is None:
        return None
    return (s1 - s2) ** 2
# register the UDFs, declaring their return types
square = udf(square_udf, FloatType())
diff_square = udf(diff_square_udf, FloatType())
# apply udf to data
json_df = json_df.withColumn("Z", square(col("X") + col("Y")))
json_df = json_df.withColumn("Diff", diff_square(col("X"), col("Y")))
json_df.show(10)
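For larger data, a vectorized pandas UDF avoids per-row Python overhead; a minimal sketch of the same square logic, assuming pyarrow is installed:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(FloatType())
def square_vec(s: pd.Series) -> pd.Series:
    # operates on a whole pandas Series per batch
    return s * s

json_df.withColumn("Z_vec", square_vec(col("X"))).show(3)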