其乐融融的IT技术小站

如何使用XGBoost和InluxDB进行时间序列预测

译者 | 李睿

审校 | 孙淑娟

XGBoost是一个开源的机器学习库,它实现了优化的分布式梯度增强算法。XGBoost使用并行处理实现快速性能,很好地处理缺失值,在小型数据集上执行良好,并防止过拟合。所有这些优点使XGBoost成为回归问题(例如预测)的一种流行解决方案。

预测是各种业务目标的关键任务,例如预测分析、预测维护、产品规划、预算等。许多预测或预测问题都涉及到时间序列数据。这使得XGBoost成为开源时间序列数据库InfluxDB的出色伙伴。

本教程将学习如何使用XGBoost的Python包预测来自InfluxDB时间序列数据库的数据。还将使用InfluxDB Python客户端库从InfluxDB查询数据,并将数据转换为Pandas DataFrame,以便更容易地使用时间序列数据,然后再做预测。此外,还将更详细地讨论XGBoost的优点。

一、要求

本教程是在通过Homebrew安装Python 3的macOS系统上执行的。建议设置额外的工具,如virtualenv、pyenv或conda-env,以简化Python和客户端安装。否则,其全部要求如下:

  • influxdb-client=1.30.0
  • pandas = 1.4.3
  • xgboost>=1.7.3
  • influxdb-client>=1.30.0
  • pandas>=1.4.3
  • matplotlib>=3.5.2
  • sklearn>=1.1.1

本教程还假设有一个免费的层InfluxDB云帐户,并且已经创建了一个存储桶和一个令牌,可以将存储桶视为数据库或InfluxDB中数据组织的最高层次结构。在本教程中,将创建一个名为NOAA的存储桶。

二、决策树、随机森林和梯度增强

为了理解XGBoost是什么,必须理解决策树、随机森林和梯度增强。决策树是一种有监督的学习方法,由一系列特征测试组成。每个节点都是一个测试,所有节点都被组织在一个流程图结构中。分支表示最终决定将哪个叶标签或类标签分配给输入数据的条件。

图片

机器学习中的决策树用于确定明天是否会下雨。经过编辑以显示决策树的组件:叶、分支和节点。

决策树、随机森林和梯度增强背后的指导原则是,多个“弱学习者”或分类器共同做出强大的预测。

随机森林包含多个决策树。决策树中的每个节点都被认为是弱学习者,随机森林中的每个决策树被认为是随机森林模型中许多弱学习者中的一个。通常情况下,所有的数据都被随机划分为子集,并通过不同的决策树进行传递。

使用决策树和随机森林的梯度增强是相似的,但它们的结构方式不同。梯度增强树也包含决策树森林,但这些决策树是额外构建的,所有数据都经过决策树集合。梯度增强树可能包含一组分类树或回归树,分类树用于离散值(例如猫或狗)。回归树用于连续值(例如0到100)。

三、什么是XGBoost?  

梯度增强是一种用于分类和预测的机器学习算法。XGBoost只是一个极端类型的梯度增强。它的极端之处在于,可以通过并行处理的能力更有效地执行梯度增强。XGBoost文档中的下图说明了如何使用梯度增强来预测某人是否会喜欢一款电子游戏。

图片

采用两棵决策树被用来决定某人是否可能喜欢一款电子游戏。将两棵树的叶子得分相加,以确定哪一个人最有可能喜欢这款电子游戏。

XGBoost的一些优点:

  • 相对容易理解。
  • 适用于具有很少特征的小型、结构化和规则数据。

XGBoost的一些缺点:

  • 易于过拟合,对异常值敏感。在XGBoost中使用时间序列数据的物化视图进行预测可能是一个好主意。
  • 在稀疏或无监督数据上表现不佳。

四、使用XGBoost进行时间序列预测

在这里使用的是空气传感器样本数据集,它是由InfluxDB提供的。该数据集包含来自多个传感器的温度数据。正在为单个传感器创建温度预测,其数据是这样的:

图片

使用以下Flux代码导入单个时间序列的数据集和过滤器。(Flux是InfluxDB的查询语言)

import "join"

import "influxdata/influxdb/sample"

//dataset is regular time series at 10 second intervals

data = sample.data(set: "airSensor")

|> filter(fn: (r) => r._field == "temperature" and r.sensor_id == "TLM0100")

随机森林和梯度增强可以用于时间序列预测,但它们需要将数据转换为监督学习。这意味着必须以滑动窗口方法或缓慢移动方法将数据向前移动,以将时间序列数据转换为监督学习集,也可以用Flux准备数据。在理想情况下,应该首先执行一些自相关分析,以确定要使用的最佳方法。为简洁起见,将使用以下Flux代码按一个常规时间间隔移动数据。

import "join"

import "influxdata/influxdb/sample"

data = sample.data(set: "airSensor")

|> filter(fn: (r) => r._field == "temperature" and r.sensor_id == "TLM0100")

shiftedData = data

|> timeShift(duration: 10s , columns: ["_time"] )

join.time(left: data, right: shiftedData, as: (l, r) => ({l with data: l._value, shiftedData: r._value}))

|> drop(columns: ["_measurement", "_time", "_value", "sensor_id", "_field"])

左右滑动查看完整代码

图片

如果想向模型输入中添加额外的滞后数据,可以改为遵循以下Flux逻辑。

import "experimental"

import "influxdata/influxdb/sample"

data = sample.data(set: "airSensor")

|> filter(fn: (r) => r._field == "temperature" and r.sensor_id == "TLM0100")

shiftedData1 = data

|> timeShift(duration: 10s , columns: ["_time"] )

|> set(key: "shift" , value: "1" )

shiftedData2 = data

|> timeShift(duration: 20s , columns: ["_time"] )

|> set(key: "shift" , value: "2" )

shiftedData3 = data

|> timeShift(duration: 30s , columns: ["_time"] )

|> set(key: "shift" , value: "3")

shiftedData4 = data

|> timeShift(duration: 40s , columns: ["_time"] )

|> set(key: "shift" , value: "4")

union(tables: [shiftedData1, shiftedData2, shiftedData3, shiftedData4])

|> pivot(rowKey:["_time"], columnKey: ["shift"], valueColumn: "_value")

|> drop(columns: ["_measurement", "_time", "_value", "sensor_id", "_field"])

// remove the NaN values

|> limit(n:360)

|> tail(n: 356)

此外,我们必须使用向前验证来训练算法。这涉及到将数据集分为测试集和训练集。然后利用XGB Regressor对XGBoost模型进行训练,并用拟合方法进行预测。最后,我们使用平均绝对误差 (MAE)来确定预测的准确性。对于10秒的延迟,计算出的平均绝对误差 (MAE)为0.035。我们可以把这理解为96.5%的预测是非常正确的。下图展示了我们从XGBoost得到的预测结果与从训练/测试分割得到的期望值之间的对比。

图片

以下是完整的脚本。这段代码主要是从这里的教程中引入的。

import pandas as pd

from numpy import asarray

from sklearn.metrics import mean_absolute_error

from xgboost import XGBRegressor

from matplotlib import pyplot

from influxdb_client import InfluxDBClient

from influxdb_client.client.write_api import SYNCHRONOUS

# query data with the Python InfluxDB Client Library and transform data into a supervised learning problem with Flux

client = InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="NyP-HzFGkObUBI4Wwg6Rbd-_SdrTMtZzbFK921VkMQWp3bv_e9BhpBi6fCBr_0-6i0ev32_XWZcmkDPsearTWA==", org="0437f6d51b579000")

# write_api = client.write_api(write_optinotallow=SYNCHRONOUS)

query_api = client.query_api()

df = query_api.query_data_frame('import "join"'

'import "influxdata/influxdb/sample"'

'data = sample.data(set: "airSensor")'

'|> filter(fn: (r) => r._field == "temperature" and r.sensor_id == "TLM0100")'

'shiftedData = data'

'|> timeShift(duration: 10s , columns: ["_time"] )'

'join.time(left: data, right: shiftedData, as: (l, r) => ({l with data: l._value, shiftedData: r._value}))'

'|> drop(columns: ["_measurement", "_time", "_value", "sensor_id", "_field"])'

'|> yield(name: "converted to supervised learning dataset")'

)

df = df.drop(columns=['table', 'result'])

data = df.to_numpy()

# split a univariate dataset into train/test sets

def train_test_split(data, n_test):

return data[:-n_test:], data[-n_test:]

# fit an xgboost model and make a one step prediction

def xgboost_forecast(train, testX):

# transform list into array

train = asarray(train)

# split into input and output columns

trainX, trainy = train[:, :-1], train[:, -1]

# fit model

model = XGBRegressor(objective='reg:squarederror', n_estimators=1000)

model.fit(trainX, trainy)

# make a one-step prediction

yhat = model.predict(asarray([testX]))

return yhat[0]

# walk-forward validation for univariate data

def walk_forward_validation(data, n_test):

predictions = list()

# split dataset

train, test = train_test_split(data, n_test)

history = [x for x in train]

# step over each time-step in the test set

for i in range(len(test)):

# split test row into input and output columns

testX, testy = test[i, :-1], test[i, -1]

# fit model on history and make a prediction

yhat = xgboost_forecast(history, testX)

# store forecast in list of predictions

predictions.append(yhat)

# add actual observation to history for the next loop

history.append(test[i])

# summarize progress

print('>expected=%.1f, predicted=%.1f' % (testy, yhat))

# estimate prediction error

error = mean_absolute_error(test[:, -1], predictions)

return error, test[:, -1], predictions

# evaluate

mae, y, yhat = walk_forward_validation(data, 100)

print('MAE: %.3f' % mae)

# plot expected vs predicted

pyplot.plot(y, label='Expected')

pyplot.plot(yhat, label='Predicted')

pyplot.legend()

pyplot.show()

五、结论

希望这篇博文能够激励人们利用XGBoost和InfluxDB进行预测。为此建议查看相关的报告,其中包括如何使用本文描述的许多算法和InfluxDB来进行预测和执行异常检测的示例。

原文链接:https://www.infoworld.com/article/3682070/time-series-forecasting-with-xgboost-and-influxdb.html

赞 ()
分享到:更多 ()

相关推荐

内容页底部广告位3
留言与评论(共有 0 条评论)
   
验证码: