DeepFM combines a deep network with an FM. It can also be seen as an improved Wide&Deep that replaces the LR on the wide side with an FM, strengthening the wide side's ability to extract feature-interaction information.
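As a rough illustration of that architecture (a minimal, hypothetical PyTorch sketch, not FuxiCTR's actual implementation), shared embeddings feed both an FM term (first-order plus second-order interactions) and a small MLP, and the logits are summed:

```python
import torch
import torch.nn as nn

class TinyDeepFM(nn.Module):
    # Hypothetical minimal DeepFM: shared embeddings feed both the
    # FM second-order term and a small MLP; the logits are summed.
    def __init__(self, field_vocab_sizes, embed_dim=4):
        super().__init__()
        self.embeds = nn.ModuleList(
            [nn.Embedding(v, embed_dim) for v in field_vocab_sizes])
        self.linear = nn.ModuleList(  # first-order (wide) weights
            [nn.Embedding(v, 1) for v in field_vocab_sizes])
        num_fields = len(field_vocab_sizes)
        self.mlp = nn.Sequential(
            nn.Linear(num_fields * embed_dim, 32), nn.ReLU(),
            nn.Linear(32, 1))

    def forward(self, x):  # x: (batch, num_fields) integer ids
        embs = torch.stack(
            [e(x[:, i]) for i, e in enumerate(self.embeds)], dim=1)
        lin = sum(l(x[:, i]) for i, l in enumerate(self.linear))
        # FM second order: 0.5 * ((sum of vectors)^2 - sum of squares)
        fm = 0.5 * (embs.sum(1).pow(2) - embs.pow(2).sum(1)).sum(1, keepdim=True)
        deep = self.mlp(embs.flatten(1))
        return torch.sigmoid(lin + fm + deep)
```

The key point is that the FM and MLP share one embedding table, so the wide side needs no manual cross features.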

This post is pure hands-on material: a record of the implementation process.

1. Setting Up the Environment

Installation reference: https://fuxictr.github.io/tutorials/v2.0/installation.html
I used the second installation method, loading FuxiCTR from source.
Working directory and file layout:

  • 📂releases
    • 📂FuxiCTR-2.3.6
  • 📂config
    • 📂researchDeepFM_config
      • dataset_config.yaml
      • model_config.yaml
  • 📂data
    • 📂taobao_ad_csv
      • 📂source
        • raw_sample.csv
        • ad_feature.csv
        • behavior_log.csv
        • user_profile.csv
      • train_sample.csv
      • valid_sample.csv
      • test_sample.csv
  • researchDeepFM.ipynb

(1) Install PyTorch:
python -m pip install torch torchvision torchaudio
(2) Install the remaining dependencies:
python -m pip install -r requirements.txt
The official requirements.txt has a bug: the version pins for some packages (polars, scikit-learn, numpy) are incorrect. The version I use after fixing it:

keras_preprocessing
pandas
PyYAML
scikit-learn==1.4.2
numpy==1.26.4
h5py
tqdm
pyarrow
polars<1.0.0

2. Preparing the Dataset

Picking a CTR dataset from Tianchi

I chose the Taobao display-ad dataset: https://tianchi.aliyun.com/dataset/56?spm=a2c22.12282016.0.0.27934197LJh3Hx

Field descriptions:
(1) user: anonymized user ID;
(2) adgroup_id: anonymized ad unit ID;
(3) time_stamp: timestamp;
(4) pid: resource position (ad slot);
(5) nonclk: 1 = not clicked, 0 = clicked;
(6) clk: 0 = not clicked, 1 = clicked;
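To make the schema concrete, here is a toy frame with made-up values (not real rows from the dataset). It also shows that clk and nonclk are complementary indicators:

```python
import pandas as pd

# Toy rows mimicking raw_sample.csv's schema (values are invented).
sample = pd.DataFrame({
    "user": [101, 102],
    "adgroup_id": [5001, 5002],
    "time_stamp": [1494032110, 1494039000],
    "pid": ["430548_1007", "430549_1007"],
    "nonclk": [1, 0],
    "clk": [0, 1],
})
# clk and nonclk always sum to 1 on each row:
assert (sample["nonclk"] == 1 - sample["clk"]).all()
print(sample.dtypes)
```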

Splitting the dataset

The train/valid/test split ratio is 6:2:2.

# Author : xqk 
# Time : 2025/01/09 15:03 
# Function: split the dataset
# GitHub: https://github.com/xqk

import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split  # dataset splitting

train_data = pd.read_csv("data/taobao_ad_csv/source/raw_sample.csv")

# Split the data
print("Splitting data...")
split_ratio = 0.6  # 60% of the rows go to the training set
seed = 5  # random seed
 
# First split: keep 60% for training, hold out 40% for valid/test
# (test_size is the held-out fraction, so it must be 1 - split_ratio)
xtrain, xtest, ytrain, ytest = train_test_split(
    train_data, range(train_data.shape[0]), test_size=1 - split_ratio, random_state=seed)
 
# print(xtest)
# print("------------------")
# print(xtrain)

xtrain.to_csv('data/taobao_ad_csv/train_sample.csv', index=False)
xtest.to_csv('data/taobao_ad_csv/xtest_sample.csv', index=False)

del train_data
del xtrain
del xtest
del ytrain
del ytest

# Split the held-out portion 1:1 into validation and test sets
test_train_data = pd.read_csv("data/taobao_ad_csv/xtest_sample.csv")
test_xtrain, test_xtest, test_ytrain, test_ytest = train_test_split(
    test_train_data, range(test_train_data.shape[0]), test_size=0.5, random_state=seed)
test_xtrain.to_csv('data/taobao_ad_csv/valid_sample.csv', index=False)
test_xtest.to_csv('data/taobao_ad_csv/test_sample.csv', index=False)

# Delete the temporary xtest_sample.csv
os.remove("data/taobao_ad_csv/xtest_sample.csv") 

del test_train_data
del test_xtrain
del test_xtest
del test_ytrain
del test_ytest

print("Splitting done")
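A quick sanity check (on a synthetic frame, not the real CSV) confirms that holding out 40% and then halving the holdout yields the 6:2:2 ratio:

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Synthetic frame: hold out 40%, then split the holdout in half.
df = pd.DataFrame({"x": range(1000)})
train, rest = train_test_split(df, test_size=0.4, random_state=5)
valid, test = train_test_split(rest, test_size=0.5, random_state=5)
print(len(train), len(valid), len(test))  # 600 200 200
```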

Configuration files

config/researchDeepFM_config/dataset_config.yaml

taobao_ad_researchDeepFM:
    data_root: ./data/
    data_format: csv
    train_data: ./data/taobao_ad_csv/train_sample.csv
    valid_data: ./data/taobao_ad_csv/valid_sample.csv
    test_data: ./data/taobao_ad_csv/test_sample.csv
    min_categr_count: 1
    feature_cols:
        [{name: ["user","adgroup_id","pid"],  # nonclk excluded: it equals 1 - clk and would leak the label
                 active: True, dtype: str, type: categorical},
         {name: "weekday", active: True, dtype: str, type: categorical, preprocess: convert_weekday},
         {name: "hour", active: True, dtype: str, type: categorical, preprocess: convert_hour}]
    label_col: {name: clk, dtype: float}

config/researchDeepFM_config/model_config.yaml

Base: 
    model_root: './model_checkpoints/'
    num_workers: 3
    verbose: 1
    early_stop_patience: 2
    pickle_feature_encoder: True
    save_best_only: True
    eval_steps: null
    debug_mode: False
    group_id: null
    use_features: null
    feature_specs: null
    feature_config: null

DeepFM_taobao_ad_csv:
    model: DeepFM
    dataset_id: taobao_ad_researchDeepFM
    loss: 'binary_crossentropy'
    metrics: ['logloss', 'AUC']
    task: binary_classification
    optimizer: adam
    hidden_units: [64, 32]
    hidden_activations: relu
    net_regularizer: 0
    embedding_regularizer: 1.e-8
    learning_rate: 1.e-3
    batch_norm: False
    net_dropout: 0
    batch_size: 128
    embedding_dim: 4
    epochs: 1
    shuffle: True
    seed: 2019
    monitor: 'AUC'
    monitor_mode: 'max'

3. Training the Model

# Load FuxiCTR from source
import sys
sys.path.append('/data/users/user/lab/workspace/fuxictr/releases/FuxiCTR-2.3.6')
# Author : xqk 
# Time : 2025/01/09 15:03 
# Function: train the DeepFM model
# GitHub: https://github.com/xqk

import os
import logging
from datetime import datetime
from fuxictr.utils import load_config, set_logger, print_to_json
from fuxictr.features import FeatureMap
from fuxictr.pytorch.torch_utils import seed_everything
from fuxictr.pytorch.dataloaders import RankDataLoader
from fuxictr.preprocess import FeatureProcessor, build_dataset
from model_zoo import DeepFM
import polars as pl

class CustomizedFeatureProcessor(FeatureProcessor):
    """ Customized feature processor.
    Derives weekday and hour features from time_stamp.
    """

    def convert_weekday(self, col_name=None):
        def _convert_weekday(timestamp):
            dt = datetime.fromtimestamp(timestamp)
            return int(dt.strftime('%w'))
        return pl.col("time_stamp").apply(_convert_weekday)

    def convert_hour(self, col_name=None):
        def _convert_hour(timestamp):
            dt = datetime.fromtimestamp(timestamp)
            return int(dt.hour)
        return pl.col("time_stamp").apply(_convert_hour)

# Load params from config files
config_dir = './config/researchDeepFM_config'
experiment_id = 'DeepFM_taobao_ad_csv'
params = load_config(config_dir, experiment_id)

# set up logger and random seed
set_logger(params)
logging.info("Params: " + print_to_json(params))
seed_everything(seed=params['seed'])

# Set feature_encoder that defines how to preprocess data
feature_encoder = CustomizedFeatureProcessor(feature_cols=params["feature_cols"],
                                             label_col=params["label_col"],
                                             dataset_id=params["dataset_id"], 
                                             data_root=params["data_root"])

# Build dataset
params["train_data"], params["valid_data"], params["test_data"] = \
    build_dataset(feature_encoder, 
                  train_data=params["train_data"],
                  valid_data=params["valid_data"],
                  test_data=params["test_data"])

# Get feature_map that defines feature specs
data_dir = os.path.join(params['data_root'], params['dataset_id'])
feature_map = FeatureMap(params['dataset_id'], data_dir)
feature_map.load(os.path.join(data_dir, "feature_map.json"), params)
logging.info("Feature specs: " + print_to_json(feature_map.features))

# Get train and validation data generators
train_gen, valid_gen = RankDataLoader(feature_map, 
                                      stage='train', 
                                      train_data=params['train_data'],
                                      valid_data=params['valid_data'],
                                      batch_size=params['batch_size'],
                                      data_format=params["data_format"],
                                      shuffle=params['shuffle']).make_iterator()

# Model initialization and fitting
model = DeepFM(feature_map, **params)
model.fit(train_gen, validation_data=valid_gen, epochs=params['epochs'])

logging.info('***** Validation evaluation *****')
model.evaluate(valid_gen)

logging.info('***** Test evaluation *****')
test_gen = RankDataLoader(feature_map, 
                          stage='test',
                          test_data=params['test_data'],
                          batch_size=params['batch_size'],
                          data_format=params["data_format"],
                          shuffle=False).make_iterator()
model.evaluate(test_gen)

4. Using the Model for Prediction

Take a set of candidate items from the user's current context (for example, user A searches for "living room" and gets back a batch of images; the data format follows the training data), and predict each candidate's click-through rate for downstream ranking.

Example prediction code:


input_data = RankDataLoader(feature_map, 
                          stage='test',
                          test_data=params['test_data'],
                          batch_size=params['batch_size'],
                          data_format=params["data_format"],
                          shuffle=False).make_iterator()

# Run prediction; output is a 1-D array of CTR scores
output = model.predict(input_data)
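The scores can then drive the ranking step. A hypothetical sketch (the candidate IDs and scores below are made up, standing in for the output of model.predict):

```python
import numpy as np

# Hypothetical ranking step: given candidate ids and their predicted
# CTRs, sort the candidates by score in descending order.
candidate_ids = np.array([900001, 900002, 900003])
scores = np.array([0.12, 0.47, 0.03])  # stand-in for model.predict(...)
order = np.argsort(-scores)            # indices from highest to lowest CTR
ranked = candidate_ids[order]
print(ranked.tolist())  # [900002, 900001, 900003]
```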