PySpark

Practice Recap

1. Download the MovieLens dataset and Spark

# Download and unpack the MovieLens small dataset
! wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
! unzip ml-latest-small.zip
! ls
# Install Java 8, download Spark 3.1.1 with Hadoop 3.2, and install findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
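On a fresh Colab-style environment, findspark usually needs JAVA_HOME and SPARK_HOME set before init() is called. A minimal sketch, assuming the install locations produced by the commands above (the /content path is Colab-specific and may differ elsewhere):

import os
# Assumed install locations from the download/install commands above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"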

2. Start a Spark session

import findspark
findspark.init()
from pyspark.sql import SparkSession
# SparkSession: the entry point for creating a Spark application instance
# builder: builder object used to attach additional settings
# appName("..."): the application name
# master("local[*]"): where to connect; "local" runs Spark locally and [*]
# uses all available cores ("local[4]" would use 4 local cores)
# getOrCreate(): return the existing session, or create one if none exists
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# Smoke-test the session
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3, False)

3. Load data

root = "ml-latest-small/"
# if using Databricks, the files would live under a path like this instead
databrick_root = "/FileStore/tables/movielen_small/"
movies_df = spark.read.load(root+"movies.csv", format='csv', header=True)
ratings_df = spark.read.load(root+"ratings.csv", format='csv', header=True)
links_df = spark.read.load(root+"links.csv", format='csv', header=True)
tags_df = spark.read.load(root+"tags.csv", format='csv', header=True)


# Or
movies_df = spark.read.csv(root+"movies.csv", header=True)
# spark.read.options(...).json(...) or .jdbc(...)
# options(...): extra settings for loading, e.g. timezone or date formats

Notes on creating DataFrames:

  1. To convert a Pandas DataFrame to a PySpark DataFrame, define a StructType schema and pass the Pandas DataFrame together with the schema to createDataFrame.

  2. To create a PySpark DataFrame directly, pass createDataFrame a list of dictionaries; each element of the list becomes one row of the table.

import pandas as pd
from pyspark.sql.types import *

df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show()

mySchema = StructType([StructField("col name1", IntegerType(), True),
                       StructField("col name2", StringType(), True),
                       StructField("col name3", IntegerType(), True)])
dic = {"col name1": [1, 2, 3], "col name2": ['a', 'b', 'c'], "col name3": [4, 5, 6]}
df = pd.DataFrame(dic)
spark_df = spark.createDataFrame(df, mySchema)
spark_df.show()
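A third option (not in the original notes) is to build rows explicitly with pyspark.sql.Row, which keeps column names next to their values:

from pyspark.sql import Row

rows = [Row(name="a", score=1), Row(name="b", score=2)]
row_df = spark.createDataFrame(rows)  # schema is inferred from the Row fields
row_df.show()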

4. Change data type:

from pyspark.sql.types import IntegerType, FloatType
# astype() is an alias of cast(); it accepts a type-name string or a DataType instance
t_df = tags_df.withColumn('movieId-new', tags_df.movieId.astype('int'))
t_df = t_df.withColumn('userId', t_df['userId'].astype('int'))
t_df = t_df.withColumn('movieId', t_df['movieId'].astype('int'))
t_df = t_df.withColumn('timestamp', t_df['timestamp'].astype('int'))
t_df.collect()[1]

# Or pass a DataType instance (note the parentheses: FloatType(), not FloatType)
t_df = t_df.withColumn('movieId', t_df['movieId'].astype(FloatType()))
#...
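To confirm the casts took effect, printSchema() shows each column's type. A sketch (not in the original notes) that casts several columns in one select, using the MovieLens column names from above:

from pyspark.sql.functions import col

int_cols = ["userId", "movieId", "timestamp"]
t_df2 = tags_df.select(
    *[col(c).cast("int").alias(c) if c in int_cols else col(c) for c in tags_df.columns]
)
t_df2.printSchema()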

5. Query:

5.1 DataFrame method

from pyspark.sql.functions import col, explode, split
# find the number of unique movieIds
movies_df.select('movieId').distinct().count()

# use agg() to compute the mean rating of each movie:
# agg({column_name: aggregation_function_name})
# .limit(k) keeps the top-k movies by average rating
ratings_df.groupBy("movieId").agg({'rating': "mean"}) \
    .sort("avg(rating)", ascending=False) \
    .limit(10) \
    .show()
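The dict form auto-names the output column "avg(rating)". An equivalent sketch with the functions API, where alias() gives the aggregate a stable name to sort on:

from pyspark.sql import functions as F

ratings_df.groupBy("movieId") \
    .agg(F.avg("rating").alias("avg_rating")) \
    .orderBy(F.desc("avg_rating")) \
    .limit(10) \
    .show()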

5.2 SQL method

First, register SQL tables from the Spark DataFrames

# Register SQL tables from the DataFrames
# (registerTempTable is the older API; createOrReplaceTempView replaces it)
movies_df.registerTempTable("movies")
ratings_df.registerTempTable("ratings")
links_df.registerTempTable("links")
tags_df.registerTempTable("tags")

# Or use a global temporary view. Note that a global temporary view must be
# queried through the global_temp database, e.g.
#   select * from global_temp.movies
#
# Note: difference between a global temp view and a temp view:
# a temp view lives and dies with the session that created it. If you want a
# temporary view that is shared among all sessions and kept alive until the
# Spark application terminates, create a global temporary view.
movies_df.createOrReplaceGlobalTempView("movies")
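A quick sanity check (not in the original notes) that the global view resolves; note the global_temp prefix:

spark.sql("select count(*) as n from global_temp.movies").show()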

Then use spark.sql to run SQL directly

# Run SQL with Spark SQL
# spark.sql(...): spark is the SparkSession created above
user_count = spark.sql("select count(distinct userId) as user_count from ratings")

# %sql
# Show movies that have no rating
movie_not_rated = spark.sql("select distinct movieId, title from movies where movieId not in (select distinct movieId from ratings)")
movie_not_rated.show()
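# The same "not rated" query as a DataFrame-API sketch (not in the original
# notes): a left anti join keeps only movies with no matching rating row
movies_df.join(ratings_df, on="movieId", how="left_anti") \
    .select("movieId", "title").show()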

# select distinct movieIds
# distinct() de-duplicates rows; with multiple columns, each row's combination
# of values counts as one distinct value
movies_df.select(['movieId']).distinct().show()


# Filter rows with missing values
from pyspark.sql.functions import col, explode, split
movie_missing_value = movies_df.where(col('title').isNull() | col('genres').isNull())
movie_missing_value.show()
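The explode and split imports above fit the MovieLens genres column, where one row stores several pipe-separated genres. A sketch that produces one row per (movie, genre) pair:

# split() takes a regex, so the pipe must be escaped;
# explode() turns the resulting array into one row per element
movie_genres = movies_df.withColumn("genre", explode(split(col("genres"), "\\|")))
movie_genres.select("movieId", "title", "genre").show(5, False)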

6. User-Defined Functions (UDFs) in PySpark

from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

from pyspark.sql import SparkSession
# Create the PySpark application instance
spark = SparkSession.builder.appName("Spark-Practice").master('local').getOrCreate()
# Load the data (multiLine=True because this file is a JSON array, not JSON Lines)
json_df = spark.read.json("sample_data/anscombe.json", multiLine=True)

# define the user-defined functions
def square_udf(s):
  # a FloatType UDF must return a float (or None); returning an int yields null
  if s is None:
    return 1.0
  return s * s

def diff_square_udf(s1, s2):
  if s1 is None or s2 is None:
    return None
  return (s1 - s2) ** 2

# wrap the Python functions as Spark UDFs with an explicit return type
square = udf(square_udf, FloatType())
diff_square = udf(diff_square_udf, FloatType())
# apply the UDFs column-wise
json_df = json_df.withColumn("Z", square(col("X") + col("Y")))
json_df = json_df.withColumn("Diff", diff_square(col("X"), col("Y")))
json_df.show(10)
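For larger data, a vectorized pandas_udf (available since Spark 2.3, requires pyarrow) avoids per-row Python overhead. A minimal sketch of the same squaring logic:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def square_vec(s: pd.Series) -> pd.Series:
    # operates on whole pandas Series batches instead of single rows
    return s * s

json_df.withColumn("Z2", square_vec(col("X"))).show(5)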
