Data Stream Mining
Simulating a Data Stream Scenario
In data streams, data arrives continuously; if it is not processed immediately or stored, it is lost forever. In this exercise, we will read data from a csv file as a stream instead of reading it with pd.read_csv. We will use the oil price dataset from Kaggle.
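As a rough sketch of what "reading as a stream" means (assuming the same sample_data/oil.csv path and the dcoilwtico column at position 1 used throughout this notebook), a generator can yield one parsed record at a time so that only the current line is ever held in memory:

# A minimal sketch of simulating a stream from a csv file (path assumed as below).
def stream_records(path='sample_data/oil.csv'):
    with open(path, 'r') as f:
        f.readline()                      # skip the header
        for line in f:                    # one line at a time, never the whole file
            record = line.strip().split(',')
            if record[1] == '':           # skip records with no dcoilwtico value
                continue
            yield record

# each record is seen exactly once and then discarded, e.g.:
# for record in stream_records():
#     process(record)   # 'process' is a hypothetical consumer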
In the first task, you need to read the csv line-by-line, parse each line (split it on the comma), and store the records in a pandas dataframe.
import pandas as pd
'''
TODO: read the oil.csv line-by-line and split each line on the comma
to have a new record each time. Add the records one-by-one to the dataframe.
Ignore the records that have no values for the dcoilwtico attribute.
'''
# Using readline()
file1 = open('sample_data/oil.csv', 'r')
count = 0
data = []

# reading the header of the table
line = file1.readline()
if line:
    line = line.strip()
    columns = line.split(',')
    df_oil = pd.DataFrame(columns = columns)

while line:
    count += 1

    # Get next line from file
    line = file1.readline()

    # if line is empty
    # end of file is reached
    if not line:
        break
    line = line.strip()
    record = line.split(',')
    if (record[1] == ''):
        continue
    df_oil.loc[len(df_oil)] = record

file1.close()
df_oil
Reservoir Sampling
Reservoir sampling is a randomized algorithm for selecting \(k\) samples from a list of \(n\) items. The number of selected values \(k\) is usually much smaller than the total number of elements \(n\). In data streams, the total number of elements is unknown. Typically, \(n\) is large enough that the data cannot fit into active memory.
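The core update rule (often called Algorithm R, and the same logic used in the exercise below) can be written in a few lines; this is only a sketch on a generic iterable of values, independent of the csv handling:

import random

def reservoir_sample(stream, k):
    # keep the first k elements, then replace a random slot with
    # probability k / (i + 1) when the i-th element (0-based) arrives
    reservoir = []
    for i, x in enumerate(stream):
        if i < k:
            reservoir.append(x)
        else:
            j = random.randrange(i + 1)   # uniform in [0, i]
            if j < k:
                reservoir[j] = x
    return reservoir

# e.g. reservoir_sample(range(1000), 10) returns 10 uniformly chosen values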
We will treat the oil prices as a stream of \(n\) records from which we need to select \(k\) records, where \(1 \le k \le n\). We will again read the records from the csv file one-by-one and ignore the lines with no value for the dcoilwtico attribute.
'''
TODO: read the oil.csv line-by-line and split each line on the comma
to have a new record each time. Create a reservoir of size k = 10
that stores only 10 elements at any given moment. After processing every 100
elements, display the content of the reservoir.
'''
import random
def printReservoir(reservoir):
    for i in reservoir:
        print(i)
    print()  # print new line

file1 = open('sample_data/oil.csv', 'r')
k = 10
count = 0
data = []

# reading the header of the table
line = file1.readline()

while line:
    # Get next line from file
    line = file1.readline()

    # if line is empty
    # end of file is reached
    if not line:
        break
    line = line.strip()
    record = line.split(',')
    if (record[1] == ''):
        continue
    if (count < k):
        data.append(record)
    else:
        j = random.randrange(count + 1)
        if (j < k):
            data[j] = record
    count += 1
    if (count % 100 == 0):
        print('After processing (', count, ') elements from the stream:')
        printReservoir(data)

file1.close()
print('After processing the whole stream:')
data
Windowing Models
Another approach for processing data streams is to look at a fixed-length window of the last \(k\) elements from the stream. We will combine the windowing model with estimating the probability density function of the values to learn the data distribution. We will use kernel density estimation to estimate the density of the oil price. More information about using the kernel density estimator can be found in the scikit-learn documentation on 1D Kernel Density Estimation.
prime_numbers = [2, 3, 5, 7]
prime_numbers.pop()   # pop() removes and returns the last element; pop(0) removes the first (oldest) one
prime_numbers
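Before the full exercise, here is a minimal sketch of how the 'step' parameter distinguishes the three windowing models; it operates on a plain stream of numbers and a hypothetical process callback, independent of the oil data:

# A minimal sketch: maintain a window of the last window_size elements and
# trigger processing every 'step' arrivals.
#   step = 1                -> sliding window
#   1 < step < window_size  -> hopping window
#   step = window_size      -> tumbling window
def windowed_stream(stream, window_size, step, process):
    window = []
    for count, x in enumerate(stream, start=1):
        window.append(x)
        if len(window) > window_size:
            window.pop(0)            # drop the oldest element
        if count >= window_size and count % step == 0:
            process(list(window))    # hand a copy of the current window to the consumer

# e.g. windowed_stream(range(20), window_size=10, step=5, process=print)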
'''
TODO: read the oil.csv line-by-line and split each line on the comma
to have a new record each time. Define 'step' as an integer variable
such that 0 < step <= window_size. After receiving 'step' samples from the stream,
estimate the density and plot the figure.
Note:
step = 1 ==> sliding window (would be expensive unless using an online update of the density function)
1 < step < window_size ==> hopping window
step = window_size ==> tumbling window
If you use a step < 100, don't plot the density at every step (there would be a lot of figures).
For the bandwidth: don't use a fixed value such as 0.2 or 0.5.
Instead, you can use the normal rule for computing the bandwidth. That is:
If you are using kernel = "gaussian", then bandwidth = 1.06 * s * (len(data)^(-0.2)),
where s is the standard deviation of the data.
If kernel = "epanechnikov", then bandwidth = 2.345 * s * (len(data)^(-0.2)).
You may test using 1.06 with the "epanechnikov" kernel and look at the differences in the figure.
'''
import numpy as np
from sklearn.neighbors import KernelDensity
import matplotlib.pyplot as plt
def estimateDensity(data):
    kernel = "gaussian"
    s = np.std(data)
    # normal reference rule (Silverman's rule of thumb) instead of a fixed bandwidth
    bandwidth = 1.06 * s * (len(data) ** -0.2)

    mi = min(data) - 3 * s
    ma = max(data) + 3 * s
    X_plot = np.linspace(mi, ma, 100)
    # print(X_plot)
    kde = KernelDensity(kernel=kernel, bandwidth=bandwidth).fit(data)
    log_dens = kde.score_samples(X_plot)

    fig, ax = plt.subplots()

    ax.plot(X_plot[:, 0],
            np.exp(log_dens),
            color="cornflowerblue",
            linewidth=2,
            linestyle="-"
            # label = "kernel = '{0}'".format(kernel),
            )
    plt.show()

file1 = open('sample_data/oil.csv', 'r')
Window_size = 500
step = 100
count = 0
data = []

# reading the header of the table
line = file1.readline()

while line:
    # Get next line from file
    line = file1.readline()

    # if line is empty
    # end of file is reached
    if not line:
        break
    line = line.strip()
    record = line.split(',')
    if (record[1] == ''):
        continue
    if (count < Window_size):
        data.append(float(record[1]))
    else:
        data.append(float(record[1]))
        data.pop(0)
    count += 1
    if (count % step == 0) and count >= Window_size:
        print('After processing (', count, ') elements from the stream:')
        print('Number of elements = ', len(data))
        X = np.array(data).reshape(-1, 1)
        estimateDensity(X)

file1.close()
Creating a Hashtable for the Data Stream
Hashing is an important technique for filtering data streams. In this exercise, we will work on simple techniques for hashing data streams. Again, we will not use pd.read_csv to read the csv file of the oil dataset. Instead, we will read it line-by-line, parse each line, convert the values of dcoilwtico from strings to float, and process the records one-by-one. We will ignore the records that have an empty string in the dcoilwtico attribute.
First, we will create a numpy array of size (array_size = 4096) and type Boolean to hash the values in the column dcoilwtico. We will initialize the Boolean array to false (each element will be initialized to False). When an element is hashed to an index \(i\), we store True at that index of the array.
Second, we will use the hashing function \(h(x) = int(x*100) \%\ array\_size\) to hash the elements of dcoilwtico, where \(\%\) represents the modulus operator in Python.
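As a quick check of this hash function (a small illustrative snippet; 73.70 is just an example price that appears in the dataset), note that any two prices whose scaled values int(x*100) differ by a multiple of array_size land in the same bucket, i.e. prices roughly $40.96 apart collide:

array_size = 4096
x = 73.70                                        # an example price
print(int(x * 100) % array_size)                 # bucket index for 73.70
print((int(x * 100) + array_size) % array_size)  # same index for a price ~40.96 higher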
Finally, we will keep track of the number of collisions, which are the cases when an element is hashed to an index that has already been set to True. Compute the ratio of the number of collisions to the total number of elements in the dcoilwtico attribute.
Why do we have a large ratio of collisions?
'''
TODO:
1. create a boolean array of size 4096 (also test the case of 1024)
2. create a dictionary to keep the list of elements that are assigned to each hash key.
3. keep track of the total number of records with a value in the dcoilwtico attribute
4. keep track of the number of collisions
5. display the number and ratio of collisions
6. record the time for performing this task.
'''
import time
start = time.time()

count = 0
array_size = 4096  # try a size of 1024
hashArray = np.array([0] * array_size, dtype = 'bool')
hashTable = dict()
collisions = 0

file1 = open('sample_data/oil.csv', 'r')
# read the header of the table but we will not use it
line = file1.readline()

while line:
    # Get next line from file
    line = file1.readline()

    # if line is empty
    # end of file is reached
    if not line:
        break
    line = line.strip()
    record = line.split(',')
    if (record[1] == ''):
        continue
    count += 1
    h = int(float(record[1]) * 100) % array_size
    if (hashArray[h] == True):
        collisions += 1
        hashTable[h].append(float(record[1]))
    else:
        hashArray[h] = True
        hashTable[h] = [float(record[1])]

file1.close()
end = time.time()

print('Number of collisions = ', collisions, 'Ratio = ', collisions/count)
print('Elapsed time = ', end - start, ' seconds')
'''
TODO: check the lists in the dictionary used for storing the
elements that are assigned to each hash key.
Find two different elements that have been assigned to the same index and
apply the hashing function on these elements to confirm that they are hashed
to the same key.
'''
for key in hashTable:
    # display only the lists with more than one element in the dictionary
    if (len(hashTable[key]) > 1):
        print(hashTable[key])
'''
We found that elements [73.7, 32.74] are assigned to the same key.
Let us hash them again to confirm.
'''
m = [73.7, 32.74]
print('Array size = ', array_size)
for i in m:
    h = int(i * 100) % array_size
    print("h({}) = {}".format(i, h))
'''
A simpler way to keep track of the distinct elements in a data stream is to define
a set to store those elements. Upon the arrival of a new element from the stream,
check whether it is in the set or not. If not, add the element to the set.
However, as discussed in the lecture, this is an impractical solution as the set
might become extremely large. We can test this solution on the oil dataset
since it is a small dataset. We will also read the data in a streaming way
and record the run time to compare it with the time when using the Boolean array
to hash the table.
'''
import time
start = time.time()

count = 0
distinct_value = set()

file1 = open('sample_data/oil.csv', 'r')
# reading the header of the table
line = file1.readline()

while line:
    # Get next line from file
    line = file1.readline()

    # if line is empty
    # end of file is reached
    if not line:
        break
    line = line.strip()
    record = line.split(',')
    if (record[1] == ''):
        continue
    distinct_value.add(float(record[1]))

file1.close()
end = time.time()

print('Elapsed time = ', end - start, ' seconds')
We can see that using the set structure is faster than using the Boolean array. Moreover, with the set we do not have the collision problem. However, the set might become extremely large, which makes it an impractical solution. We can use a bitarray to speed up the hashing process and reduce the required space: instead of using one byte for each Boolean value, we will use only one bit in this case. You will need to install the bitarray library. Check this website for more information about using bitarray.
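As a rough illustration of the space argument (a small sketch; bitarray must be installed first, as in the next cell), a numpy Boolean array stores one byte per flag while a bitarray packs eight flags per byte:

import numpy as np
from bitarray import bitarray

array_size = 4096
bool_array = np.zeros(array_size, dtype='bool')   # one byte per flag
bit_array = bitarray(array_size)                  # one bit per flag
bit_array.setall(0)

print('numpy bool array:', bool_array.nbytes, 'bytes')        # 4096 bytes
print('bitarray        :', array_size // 8, 'bytes of bits')  # 512 bytes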
%pip install bitarray
import time
from bitarray import bitarray
start = time.time()

count = 0
array_size = 4096  # try a size of 1024
hashArray = bitarray(4096)
hashArray.setall(0)
hashTable = dict()
collisions = 0

file1 = open('sample_data/oil.csv', 'r')
# reading the header of the table
line = file1.readline()

while line:
    # Get next line from file
    line = file1.readline()

    # if line is empty
    # end of file is reached
    if not line:
        break
    line = line.strip()
    record = line.split(',')
    if (record[1] == ''):
        continue
    count += 1
    h = int(float(record[1]) * 100) % array_size
    if (hashArray[h] == True):
        collisions += 1
        hashTable[h].append(float(record[1]))
    else:
        hashArray[h] = True
        hashTable[h] = [float(record[1])]

file1.close()
end = time.time()

print('Number of collisions = ', collisions, 'Ratio = ', collisions/count)
print('Elapsed time = ', end - start, ' seconds')
Classifying Data Streams
Online processing of data streams is important because the data cannot be stored in memory, so we need to process one element at a time. Moreover, we need to process each element only once. In the case of machine learning, this means training the models using one sample at a time. This is the complete opposite of the traditional way of training ML models, where the models are trained on the whole training set at once.
An online model is therefore a stateful, dynamic object. It keeps learning and doesn't have to revisit past data. For this exercise, you may use the river library and run some of the online examples such as Binary classification and HoeffdingTreeClassifier. You may check the set of performance measures they define and use them (e.g. ROCAUC, Accuracy).
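The examples below use evaluate.progressive_val_score, which hides the online loop. As a minimal sketch of what happens underneath (a test-then-train loop written with river's predict_one, learn_one and metric.update, after installing river as in the next cell):

from river.datasets import synth
from river import metrics, tree

gen = synth.Agrawal(classification_function=0, seed=42)
model = tree.HoeffdingTreeClassifier(grace_period=100, delta=1e-5,
                                     nominal_attributes=['elevel', 'car', 'zipcode'])
metric = metrics.Accuracy()

for x, y in gen.take(1000):
    y_pred = model.predict_one(x)   # test on the sample first
    if y_pred is not None:          # no prediction before the first update
        metric.update(y, y_pred)    # update the running metric
    model.learn_one(x, y)           # then train on that single sample

print(metric)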
!pip install river
from river.datasets import synth
from river import evaluate
from river import metrics
from river import tree
gen = synth.Agrawal(classification_function=0, seed=42)
dataset = iter(gen.take(1000))

model = tree.HoeffdingTreeClassifier(
    grace_period=100,
    delta=1e-5,
    nominal_attributes=['elevel', 'car', 'zipcode']
)

metric = metrics.Accuracy()

evaluate.progressive_val_score(dataset, model, metric)
gen = synth.Agrawal(classification_function=0, seed=42)
dataset = iter(gen.take(1000))

model = tree.HoeffdingTreeClassifier(
    grace_period=100,
    delta=1e-5,
    nominal_attributes=['elevel', 'car', 'zipcode']
)

metric = metrics.ROCAUC()

evaluate.progressive_val_score(dataset, model, metric)
print(metric)
gen = synth.Agrawal(classification_function=0, seed=42)
dataset = iter(gen.take(1000))

model = tree.HoeffdingTreeClassifier(
    grace_period=100,
    delta=1e-5,
    nominal_attributes=['elevel', 'car', 'zipcode']
)

metric = metrics.BalancedAccuracy()

evaluate.progressive_val_score(dataset, model, metric)
print(metric)
gen = synth.Agrawal(classification_function=0, seed=42)
dataset = iter(gen.take(1000))

model = tree.HoeffdingTreeClassifier(
    grace_period=100,
    delta=1e-5,
    nominal_attributes=['elevel', 'car', 'zipcode']
)

metric = metrics.Precision()

evaluate.progressive_val_score(dataset, model, metric)
print(metric)