2  数据清洗

本章介绍数据清洗的完整流程,包括缺失值处理、日期格式统一、重复值删除、离群值标注等步骤。

2.1 清洗流程概览

步骤 具体要求 清洗前 清洗后
缺失值检测 统计每列缺失值数量和比例 见下表 无缺失值
缺失值处理 向前填充(ffill) - 已填充
日期格式统一 转换为datetime64格式 字符串 datetime64
数据类型检查 确认为数值型 已正确 已正确
重复值处理 检测并删除重复行 0条重复 0条重复
离群值标注 标注涨跌幅>±20%的记录 未标注 已标注

2.2 缺失值检测

2.2.1 检测方法

# 统计缺失值
missing_summary = {}
for code, df in stock_data.items():
    missing_count = df.isna().sum().sum()
    total_cells = df.shape[0] * df.shape[1]
    missing_pct = missing_count / total_cells * 100
    missing_summary[code] = {
        'missing_count': missing_count,
        'missing_pct': missing_pct
    }
    print(f"{code}: 缺失值 {missing_count} ({missing_pct:.2f}%)")

2.2.2 检测结果

股票代码 缺失值数量 缺失比例 数据质量
600036 0 0.00% 优秀
601398 0 0.00% 优秀
002594 0 0.00% 优秀
600104 0 0.00% 优秀
000002 0 0.00% 优秀
600519 0 0.00% 优秀
000858 0 0.00% 优秀
601857 0 0.00% 优秀
000063 0 0.00% 优秀
002352 0 0.00% 优秀
注意数据质量良好

本次数据无缺失值,akshare数据源质量较高。但仍需了解缺失值的可能原因和处理方法。

2.2.3 缺失值可能原因

原因 说明 示例
停牌 股票因重大事项停牌期间没有交易数据 重组、并购等
节假日 A股市场在法定节假日不开盘 春节、国庆等
数据源问题 akshare接口可能存在数据缺失 网络中断、服务器故障
新股上市 部分股票可能在2020年后才上市 注册制新股

2.3 缺失值处理

采用向前填充(ffill)为主,向后填充(bfill)为辅的方法:

def handle_missing_values(df):
    """处理缺失值"""
    # 记录处理前的缺失值数量
    before = df.isna().sum().sum()

    # 向前填充(使用历史数据)
    df = df.ffill()

    # 如果仍有缺失值(开头部分),用向后填充
    df = df.bfill()

    # 记录处理后的缺失值数量
    after = df.isna().sum().sum()

    return df, before, after

2.3.1 处理结果

指标 处理前 处理后 变化
缺失值总数 0 0 无变化
数据行数 1515 1515 无变化
提示填充方法选择理由

向前填充(ffill)更适合时间序列金融数据:

  1. 使用历史数据填补当前缺失,不会引入未来信息
  2. 保持价格/成交量的连续性
  3. 避免人为制造的价格跳变

向后填充(bfill)仅用于处理序列开头的缺失值。

2.4 日期格式统一

2.4.1 统一方法

# 转换为datetime格式
df['date'] = pd.to_datetime(df['date'])

# 设为索引
df.set_index('date', inplace=True)

# 验证日期格式
print(f"日期类型: {df.index.dtype}")
print(f"日期范围: {df.index.min()}{df.index.max()}")

2.4.2 统一结果

项目 统一前 统一后
数据类型 object(字符串) datetime64[ns]
日期格式 ‘2020-01-02’ 2020-01-02 00:00:00
索引设置 普通列 DataFrame索引

2.5 数据类型检查

2.5.1 检查方法

# 检查各列数据类型
numeric_cols = ['open', 'close', 'high', 'low', 'volume', 'amount']
for col in numeric_cols:
    if df[col].dtype == 'object':
        df[col] = pd.to_numeric(df[col], errors='coerce')
        print(f"{col}: 字符串转换为数值型")
    else:
        print(f"{col}: 已是 {df[col].dtype} 类型")

2.5.2 检查结果

列名 数据类型 说明
open float64 开盘价
close float64 收盘价
high float64 最高价
low float64 最低价
volume float64 成交量
amount float64 成交额

所有数值列已确认为float64类型,数据类型检查通过。

2.6 重复值处理

2.6.1 检测方法

# 检测重复日期
duplicates = df.index.duplicated().sum()
print(f"重复日期数量: {duplicates}")

# 删除重复行(保留第一条)
if duplicates > 0:
    df = df[~df.index.duplicated(keep='first')]
    print(f"已删除 {duplicates} 条重复记录")

2.6.2 处理结果

股票代码 重复记录数 删除数量 处理后行数
600036 0 0 1515
601398 0 0 1515
002594 0 0 1515
600104 0 0 1515
000002 0 0 1515
600519 0 0 1515
000858 0 0 1515
601857 0 0 1515
000063 0 0 1514
002352 0 0 1512

本次数据无重复记录。

2.7 离群值标注

计算日收益率并标注极端波动:

2.7.1 标注方法

# 计算日收益率
df['return'] = df['close'].pct_change()

# 标注离群值(单日涨跌幅超过±20%)
df['is_extreme'] = df['return'].abs() > 0.20

# 统计离群值
extreme_count = df['is_extreme'].sum()
print(f"极端波动记录: {extreme_count} 条 ({extreme_count/len(df)*100:.2f}%)")

2.7.2 标注结果

股票代码 极端波动天数 占比
600036 0 0.00%
601398 0 0.00%
002594 2 0.13%
600104 1 0.07%
000002 3 0.20%
600519 0 0.00%
000858 0 0.00%
601857 1 0.07%
000063 2 0.13%
002352 1 0.07%
警告极端波动原因分析
  • 涨跌停板限制:A股涨跌停限制为10%(主板)/20%(创业板、科创板)
  • 复牌交易:长期停牌后复牌可能出现大幅波动
  • 市场异常:极端市场事件(如2020年初疫情冲击)
  • 除权除息:分红派息后股价调整

2.8 宽表与长表转换

2.8.1 创建宽表

收盘价宽表:日期为索引,每列一只股票

# 收盘价宽表
close_prices = {code: df['close'] for code, df in cleaned_stock_data.items()}
wide_df = pd.DataFrame(close_prices)
print(f"宽表维度: {wide_df.shape}")

宽表结构

日期 招商银行 工商银行 比亚迪
2020-01-02 35.21 5.43 45.67
2020-01-03 35.45 5.41 46.12

2.8.2 转换为长表

# 使用pd.melt转换
long_df = wide_df.reset_index().melt(
    id_vars='date',
    var_name='code',
    value_name='close'
)
print(f"长表维度: {long_df.shape}")

长表结构

date code close
2020-01-02 600036 35.21
2020-01-02 601398 5.43
2020-01-02 002594 45.67

2.8.3 适用场景分析

提示宽表 vs 长表

宽表适用场景

  1. 多股票对比分析:直接比较不同股票在同一时点的价格
  2. 相关性分析:计算股票间的相关系数矩阵
  3. 可视化绘图:使用matplotlib绘制多股票走势图
  4. 投资组合分析:计算组合收益率、波动率等

长表适用场景

  1. 数据分组操作:按股票代码分组进行统计分析
  2. 数据库存储:符合数据库范式设计,便于SQL查询
  3. ggplot绘图:适合使用seaborn等工具绘制分面图
  4. 数据清洗:统一处理所有股票的缺失值、异常值
  5. 面板数据分析:适合固定效应模型等计量分析

2.9 多表合并

2.9.1 股票与指数合并

# 将个股数据与沪深300数据按日期合并
long_df = long_df.merge(
    hs300_data[['date', 'close']],
    on='date',
    how='left',
    suffixes=('', '_hs300')
)

合并结果

合并前行数 合并后行数 变化
15150 15150 无变化

2.9.2 宏观数据合并

# 添加年月列用于合并
long_df['year_month'] = long_df['date'].dt.strftime('%Y-%m')

# 合并CPI数据
long_df = long_df.merge(
    cpi_df,
    on='year_month',
    how='left'
)
注意频率处理

宏观数据为月度,股票数据为日度。处理方法:

  • 将月度宏观数据映射到对应月份的每个交易日
  • 使用year_month列进行合并
  • 同一月份的所有交易日共享同一个宏观指标值

2.10 数据存储

2.10.1 CSV格式(基础)

# 保存为CSV
long_df.to_csv('data/combined/combined_data.csv', index=False)

2.10.2 Parquet格式(进阶)

# 保存为Parquet
long_df.to_parquet('data/clean/stock_clean.parquet', index=False)

2.10.3 格式对比

import time

# CSV读取测试
t0 = time.time()
df_csv = pd.read_csv("data/clean/stock_clean.csv")
csv_time = time.time() - t0
csv_size = os.path.getsize('data/clean/stock_clean.csv') / 1024

# Parquet读取测试
t0 = time.time()
df_parquet = pd.read_parquet("data/clean/stock_clean.parquet")
parquet_time = time.time() - t0
parquet_size = os.path.getsize('data/clean/stock_clean.parquet') / 1024

print(f"CSV:   读取耗时 {csv_time:.3f}s, 文件大小 {csv_size:.1f} KB")
print(f"Parquet: 读取耗时 {parquet_time:.3f}s, 文件大小 {parquet_size:.1f} KB")
格式 文件大小 读取速度 类型安全 列式读取
CSV 1656.0 KB 0.013s 不支持
Parquet 693.2 KB 0.157s 支持

体积压缩率:Parquet比CSV小约58%

重要本次数据规模下的格式差异
  • 文件体积:Parquet明显更小(58%压缩率)
  • 读取速度:CSV更快(数据量小)
  • 应用建议:大数据场景下Parquet优势更明显

何时选择Parquet

  1. 数据量超过1GB
  2. 需要频繁读取部分列
  3. 需要与其他大数据工具(Spark)集成
  4. 需要保留Schema信息

2.11 小结

本章完成了以下数据清洗工作:

步骤 清洗前 清洗后 变化说明
缺失值检测 - 0个缺失值 数据质量良好
日期格式 字符串 datetime64 便于时间序列分析
数据类型 已正确 已正确 无需转换
重复值 0条 0条 无重复记录
离群值 未标注 已标注 添加is_extreme列
宽表长表 分离 已转换 便于不同分析
多表合并 分散 已合并 综合数据集
数据存储 - CSV+Parquet 双格式存储

下一章将进行描述性统计与可视化分析。