Reservoir Sampling

Big Data; Sampling;

Problem to solve

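In random sampling over big data, the difficulty is that the data volume is so large that the total number of samples is not known in advance, and the data cannot all be loaded into a fixed-size buffer and sampled directly.

Suppose we have a reservoir of capacity k and want to randomly select k of n values into it. One way to picture this: there is a tank of capacity n, and a part of it with capacity k serves as the reservoir that retains samples. From the point of view of an arriving sample X, the probability that X ends up in the reservoir (that is, is kept) is P(X is kept) = k/n.

The catch is that n is unknown, so it cannot be used directly for random sampling. All we know is the number of samples that have arrived so far, k + i (once the reservoir is full). The idea of reservoir sampling is therefore to maintain, as new samples keep arriving, P(new sample X is kept) = k/(k+i). When k + i = n, this gives P(X is kept) = k/n.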

Idea

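The steps of reservoir sampling are:

  1. Define a reservoir of capacity k and fill the buffer with the first k values of the data stream.

  2. Keep track of the number of samples that have arrived so far, k + i.

  3. Pick an index uniformly at random from the range [1, k+i].

  4. If index <= k, replace the reservoir entry at that index with the new sample; otherwise discard the new sample. The (k+i)-th sample of the stream is therefore kept with probability k/(k+i).

  5. Repeat until all the data (n values) has been scanned.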
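The steps above can be sketched as a single function. This is only a minimal illustration: the name `reservoir_sample` and the optional `rng` parameter are assumptions made for this example, and the 1-based index follows the range [1, k+i] used in step 3.

```python
import random
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def reservoir_sample(stream: Iterable[T], k: int,
                     rng: Optional[random.Random] = None) -> List[T]:
    """Return k items drawn uniformly from a stream of unknown length."""
    rng = rng or random.Random()
    reservoir: List[T] = []
    # `seen` plays the role of k + i: the number of items that have arrived.
    for seen, item in enumerate(stream, start=1):
        if seen <= k:
            # Step 1: fill the reservoir with the first k items.
            reservoir.append(item)
            continue
        # Step 3: uniform index in [1, seen], both ends inclusive.
        index = rng.randint(1, seen)
        if index <= k:
            # Step 4: happens with probability k/seen; overwrite that slot.
            reservoir[index - 1] = item
    return reservoir


print(reservoir_sample(range(1000), k=10, rng=random.Random(0)))
```

If the stream holds fewer than k items, the sketch simply returns all of them.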

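Proof that reservoir sampling keeps every sample with the same probability:

  1. Let the reservoir have capacity k, let xj be the sample stored in slot j of the reservoir, and let xi be the i-th sample to arrive, with i > k. When xi arrives, P(xi is kept) = k/i.

  2. P(xj is replaced by xi) = P(slot j is chosen) * P(xi is kept) = 1/k * k/i = 1/i

  3. Likewise, when sample i+1 arrives, the probability that the i-th sample is replaced by the (i+1)-th sample is P(xi is replaced by xi+1) = 1/k * k/(i+1) = 1/(i+1)

  4. So, given that the i-th sample was kept when it arrived, after sample i+1 arrives P(xi is kept) = P(xi was kept before) * P(xi is not replaced) = k/i * (1 - 1/(i+1)) = k/(i+1)

  5. Continuing in the same way, once the n-th sample has arrived, i.e. all n samples have been scanned, the probability that xi is kept is P(xi is kept) = k/i * (1 - 1/(i+1)) * (1 - 1/(i+2)) * ... * (1 - 1/n) = k/n. The first k samples enter the reservoir with probability 1 = k/k, and the same product starting at k+1 gives k/n for them as well.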
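The product in the proof can be checked with exact rational arithmetic. The helper below is a hypothetical sketch (the name `retention_probability` is not from the original notes); it multiplies the entry probability by the survival probability 1 - 1/j for every later arrival j and compares the result with k/n.

```python
from fractions import Fraction


def retention_probability(i: int, k: int, n: int) -> Fraction:
    """Probability that the i-th item is still in the reservoir after n items."""
    # The first k items enter with probability 1; a later item i enters with k/i.
    prob = Fraction(1) if i <= k else Fraction(k, i)
    # Each later arrival j evicts a given stored item with probability 1/j.
    for j in range(max(i, k) + 1, n + 1):
        prob *= 1 - Fraction(1, j)
    return prob


k, n = 5, 50
# Every position in the stream ends up with the same probability k/n.
assert all(retention_probability(i, k, n) == Fraction(k, n) for i in range(1, n + 1))
print(retention_probability(7, k, n))  # 1/10
```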

Coding

Reservoir sampling with a reservoir of size k:

import random

import numpy as np


class ReservoirSampler:
    def __init__(self, k=5):
        self.arr = []    # the reservoir, holds at most k samples
        self.k = k
        self.count = 0   # number of samples seen so far

    def sample(self, val):
        self.count += 1
        if len(self.arr) < self.k:
            # insert value into the reservoir while it is not full
            self.arr.append(val)
            return
        # pick a uniform index in [0, count - 1];
        # random.randint is inclusive on both ends
        ind = random.randint(0, self.count - 1)
        if ind < self.k:
            # keep the new sample with probability k/count
            # by replacing the element at position ind in the reservoir
            self.arr[ind] = val


s = ReservoirSampler(10)
data = np.random.rand(1000).tolist()
for i, v in enumerate(data):
    s.sample(v)
    if i >= s.k:
        print(s.arr)

Return a random largest value:

A data stream may contain several copies of its maximum value. The class below returns the index of one of those maxima, chosen uniformly at random.


import random

import numpy as np


class RandomMax:
    def __init__(self):
        self.sample = 0    # 1-based index of the currently chosen max value
        self.index = 0     # number of values seen so far
        self.max_v = -float("inf")
        self.count = 0     # how many times the current max value has appeared

    def sampleMax(self, val):
        # record the (1-based) index of the current input value
        self.index += 1
        if val > self.max_v:
            # new strict maximum: it is the only candidate
            self.max_v = val
            self.count = 1
            self.sample = self.index
        elif val == self.max_v:
            # another copy of the max: keep it with probability 1/count,
            # so every copy seen so far is chosen with equal probability
            self.count += 1
            if random.randrange(self.count) == 0:
                self.sample = self.index
        return self.sample


s = RandomMax()
# small integers, so that duplicated max values actually occur
data = np.random.randint(0, 5, size=1000).tolist()
for v in data:
    print(s.sampleMax(v))
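Picking among tied maxima is reservoir sampling with a reservoir of size 1 over the ties. The sketch below is a hypothetical stand-alone function (`random_argmax` is a name assumed for this example, and it returns 0-based positions, unlike the 1-based index used above); it runs the selection many times to show that each tied position comes up about equally often.

```python
import random
from collections import Counter
from typing import Iterable, Optional


def random_argmax(stream: Iterable[float],
                  rng: Optional[random.Random] = None) -> int:
    """Return the 0-based position of one maximum, chosen uniformly among ties."""
    rng = rng or random.Random()
    best, chosen, ties = float("-inf"), -1, 0
    for pos, val in enumerate(stream):
        if val > best:
            # New strict maximum: it becomes the only candidate.
            best, chosen, ties = val, pos, 1
        elif val == best:
            ties += 1
            # Keep the newcomer with probability 1/ties.
            if rng.randrange(ties) == 0:
                chosen = pos
    return chosen  # -1 if the stream was empty


rng = random.Random(0)
data = [3, 7, 1, 7, 7, 2]  # the maximum 7 sits at positions 1, 3 and 4
counts = Counter(random_argmax(data, rng) for _ in range(30000))
print(counts)  # each of the three positions should appear roughly 10000 times
```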
    


Reference

[1] https://en.wikipedia.org/wiki/Reservoir_sampling

[2] https://www.cnblogs.com/ECJTUACM-873284962/p/6910842.html#_label6

Last updated 3 years ago