DeepFM 是 Deep 与 FM 结合的产物,也是 Wide&Deep 的改进版,只是将其中的 LR 替换成了 FM,提升了模型 wide 侧提取信息的能力。
本文是纯干货,记录实现过程。
1、搭建环境
安装参考:https://fuxictr.github.io/tutorials/v2.0/installation.html
我用的是第2种源代码加载的方式
工作目录及文件结构:
- 📂releases
- 📂FuxiCTR-2.3.6
- 📂config
- 📂researchDeepFM_config
- dataset_config.yaml
- model_config.yaml
- 📂researchDeepFM_config
- 📂data
- 📂taobao_ad_csv
- 📂source
- raw_sample.csv
- ad_feature.csv
- behavior_log.csv
- user_profile.csv
- train_sample.csv
- valid_sample.csv
- test_sample.csv
- 📂source
- 📂taobao_ad_csv
- researchDeepFM.ipynb
1、安装torch
python -m pip install torch torchvision torchaudio
2、安装依赖:python -m pip install -r requirements.txt
官方的requirements.txt
有bug,部分包(polars、scikit-learn、numpy)的版本设置的不正确,修改后的
keras_preprocessing
pandas
PyYAML
scikit-learn==1.4.2
numpy==1.26.4
h5py
tqdm
pyarrow
polars<1.0.0
2、准备数据集
从天池中挑选 CTR 数据集
选择使用淘宝的广告数据集: https://tianchi.aliyun.com/dataset/56?spm=a2c22.12282016.0.0.27934197LJh3Hx
字段说明如下:
(1) user:脱敏过的用户ID;
(2) adgroup_id:脱敏过的广告单元ID;
(3) time_stamp:时间戳;
(4) pid:资源位;
(5) nonclk:为1代表没有点击;为0代表点击;
(6) clk:为0代表没有点击;为1代表点击;
分割数据集
train、valid、test比例为6:2:2
# Author : xqk
# Time : 2025/01/09 15:03
# Function: split raw_sample.csv into train/valid/test sets (ratio 6:2:2)
# GitHub: https://github.com/xqk
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split  # dataset splitting
train_data = pd.read_csv("data/taobao_ad_csv/source/raw_sample.csv")
# Split the data.
print("拆分数据中。。。")
split_ratio = 0.6  # fraction of rows kept for training
seed = 5  # random seed so the split is reproducible
# First split: 60% train, 40% held out for valid+test.
# BUG FIX: the original passed test_size=split_ratio, which sent 60% of the
# rows to the held-out side (yielding a 4:3:3 split). Using train_size makes
# the split match the documented 6:2:2 ratio.
xtrain, xtest, ytrain, ytest = train_test_split(
train_data, range(train_data.shape[0]), train_size=split_ratio, random_state=seed)
# print(xtest)
# print("------------------")
# print(xtrain)
xtrain.to_csv('data/taobao_ad_csv/train_sample.csv', index=False)
xtest.to_csv('data/taobao_ad_csv/xtest_sample.csv', index=False)
# Release the big frames before loading the held-out part again.
del train_data
del xtrain
del xtest
del ytrain
del ytest
# Second split: divide the held-out 40% evenly (1:1) into valid and test.
test_train_data = pd.read_csv("data/taobao_ad_csv/xtest_sample.csv")
test_xtrain, test_xtest, test_ytrain, test_ytest = train_test_split(
test_train_data, range(test_train_data.shape[0]), test_size=0.5, random_state=seed)
test_xtrain.to_csv('data/taobao_ad_csv/valid_sample.csv', index=False)
test_xtest.to_csv('data/taobao_ad_csv/test_sample.csv', index=False)
# Remove the intermediate file.
os.remove("data/taobao_ad_csv/xtest_sample.csv")
del test_train_data
del test_xtrain
del test_xtest
del test_ytrain
del test_ytest
print("拆分数据结束")
配置文件
config/researchDeepFM_config/dataset_config.yaml
# Dataset config for the DeepFM experiment (FuxiCTR v2 dataset_config.yaml).
# NOTE: indentation restored — the block as pasted was flattened and not valid YAML.
taobao_ad_researchDeepFM:
    data_root: ./data/
    data_format: csv
    train_data: ./data/taobao_ad_csv/train_sample.csv
    valid_data: ./data/taobao_ad_csv/valid_sample.csv
    test_data: ./data/taobao_ad_csv/test_sample.csv
    min_categr_count: 1
    feature_cols:
        # NOTE(review): "nonclk" is the complement of the label "clk" — using it
        # as an input feature leaks the target; confirm this is intentional.
        [{name: ["user", "adgroup_id", "pid", "nonclk"],
          active: True, dtype: str, type: categorical},
         # weekday/hour are derived from time_stamp by the customized
         # FeatureProcessor (preprocess hooks convert_weekday / convert_hour).
         {name: "weekday", active: True, dtype: str, type: categorical, preprocess: convert_weekday},
         {name: "hour", active: True, dtype: str, type: categorical, preprocess: convert_hour}]
    label_col: {name: clk, dtype: float}
config/researchDeepFM_config/model_config.yaml
# Model config for the DeepFM experiment (FuxiCTR v2 model_config.yaml).
# NOTE: indentation restored — the block as pasted was flattened and not valid YAML.
Base:
    model_root: './model_checkpoints/'
    num_workers: 3
    verbose: 1
    early_stop_patience: 2
    pickle_feature_encoder: True
    save_best_only: True
    eval_steps: null
    debug_mode: False
    group_id: null
    use_features: null
    feature_specs: null
    feature_config: null

# Experiment id referenced by load_config(config_dir, experiment_id).
DeepFM_taobao_ad_csv:
    model: DeepFM
    dataset_id: taobao_ad_researchDeepFM
    loss: 'binary_crossentropy'
    metrics: ['logloss', 'AUC']
    task: binary_classification
    optimizer: adam
    hidden_units: [64, 32]
    hidden_activations: relu
    net_regularizer: 0
    embedding_regularizer: 1.e-8
    learning_rate: 1.e-3
    batch_norm: False
    net_dropout: 0
    batch_size: 128
    embedding_dim: 4
    epochs: 1
    shuffle: True
    seed: 2019
    monitor: 'AUC'
    monitor_mode: 'max'
3、训练模型
# 加载源码
import sys
sys.path.append('/data/users/user/lab/workspace/fuxictr/releases/FuxiCTR-2.3.6')
# Author : xqk
# Time : 2025/01/09 15:03
# Function: 训练DeepFM模型
# GitHub: https://github.com/xqk
import os
import logging
from datetime import datetime
from fuxictr.utils import load_config, set_logger, print_to_json
from fuxictr.features import FeatureMap
from fuxictr.pytorch.torch_utils import seed_everything
from fuxictr.pytorch.dataloaders import RankDataLoader
from fuxictr.preprocess import FeatureProcessor, build_dataset
from model_zoo import DeepFM
import polars as pl
from datetime import datetime
class CustomizedFeatureProcessor(FeatureProcessor):
    """Feature processor with custom preprocessing hooks.

    Derives ``weekday`` and ``hour`` feature values from the raw
    ``time_stamp`` column; these methods are looked up by name via the
    ``preprocess:`` keys in dataset_config.yaml.
    """

    def convert_weekday(self, col_name=None):
        # '%w' yields the day of week as 0 (Sunday) through 6 (Saturday).
        return pl.col("time_stamp").apply(
            lambda ts: int(datetime.fromtimestamp(ts).strftime('%w')))

    def convert_hour(self, col_name=None):
        # Hour of day, 0-23, in local time.
        return pl.col("time_stamp").apply(
            lambda ts: int(datetime.fromtimestamp(ts).hour))
# Load params from config files: merges dataset_config.yaml and
# model_config.yaml under config_dir for the given experiment_id.
config_dir = './config/researchDeepFM_config'
experiment_id = 'DeepFM_taobao_ad_csv'
params = load_config(config_dir, experiment_id)
# Set up logger and random seed for reproducibility.
set_logger(params)
logging.info("Params: " + print_to_json(params))
seed_everything(seed=params['seed'])
# Set feature_encoder that defines how to preprocess data; the customized
# subclass adds the weekday/hour preprocess hooks referenced in the config.
feature_encoder = CustomizedFeatureProcessor(feature_cols=params["feature_cols"],
label_col=params["label_col"],
dataset_id=params["dataset_id"],
data_root=params["data_root"])
# Build dataset: encodes the raw CSVs and returns paths to the processed splits.
params["train_data"], params["valid_data"], params["test_data"] = \
build_dataset(feature_encoder,
train_data=params["train_data"],
valid_data=params["valid_data"],
test_data=params["test_data"])
# Get feature_map that defines feature specs (vocab sizes etc.) written by build_dataset.
data_dir = os.path.join(params['data_root'], params['dataset_id'])
feature_map = FeatureMap(params['dataset_id'], data_dir)
feature_map.load(os.path.join(data_dir, "feature_map.json"), params)
logging.info("Feature specs: " + print_to_json(feature_map.features))
# Get train and validation data generators.
train_gen, valid_gen = RankDataLoader(feature_map,
stage='train',
train_data=params['train_data'],
valid_data=params['valid_data'],
batch_size=params['batch_size'],
data_format=params["data_format"],
shuffle=params['shuffle']).make_iterator()
# Model initialization and fitting (early stopping per Base config).
model = DeepFM(feature_map, **params)
model.fit(train_gen, validation_data=valid_gen, epochs=params['epochs'])
logging.info('***** Validation evaluation *****')
model.evaluate(valid_gen)
logging.info('***** Test evaluation *****')
# Held-out test split: no shuffling so metrics are deterministic.
test_gen = RankDataLoader(feature_map,
stage='test',
test_data=params['test_data'],
batch_size=params['batch_size'],
data_format=params["data_format"],
shuffle=False).make_iterator()
model.evaluate(test_gen)
4、使用模型预估
取用户当前上文的一些候选对象(例如:用户 A 搜索“客厅”得到一批图片,数据格式参照训练的数据),预测这些对象的“点击率”用于后续的排序
评估的代码示例:
# Build a loader over the candidate items to score (same format as the
# training data); shuffle=False preserves the candidates' original order.
input_data = RankDataLoader(feature_map,
stage='test',
test_data=params['test_data'],
batch_size=params['batch_size'],
data_format=params["data_format"],
shuffle=False).make_iterator()
# Run inference: output is a 1-D array of predicted click-through scores,
# one per candidate, which can then be used for ranking.
output = model.predict(input_data)