【Rust】DataFrameライブラリPolarsを使ってみる

前置き

RustでDataFrameを扱うためのPolarsというライブラリがあります。

コア部分はRustで書かれていますが、Pythonからも使用することができます。
実際にGoogle検索してみると、RustよりもPythonの解説記事が多くヒットします。

RustからPolarsを扱う際に参考にできる情報が少なくて試行錯誤したので、個人的な備忘録も兼ねて記事を作成したいと思います。

公式ドキュメントはこちら: https://docs.pola.rs/

環境情報

$ cat /etc/os-release 
PRETTY_NAME="Zorin OS 17.2"
NAME="Zorin OS"
VERSION_ID="17"
VERSION="17.2"
VERSION_CODENAME=jammy
ID=zorin
ID_LIKE="ubuntu debian"
HOME_URL="https://zorin.com/os/"
SUPPORT_URL="https://help.zorin.com/"
BUG_REPORT_URL="https://zorin.com/os/feedback/"
PRIVACY_POLICY_URL="https://zorin.com/legal/privacy/"
UBUNTU_CODENAME=jammy
$ cargo version
cargo 1.83.0 (5ffbef321 2024-10-29)

事前準備

住所と郵便番号のCSVデータが公開されているので、今回はこれを題材にしたいと思います。
https://www.post.japanpost.jp/zipcode/dl/utf-zip.html

cargo initでプロジェクトを作成します。
その後、cargo addでpolarsとclapをインストールします。
clapはコマンドライン引数を扱うために使用します。

$ cargo add polars -F lazy,csv,parquet,strings
$ cargo add clap -F derive

オプションなしでaddすると今回使いたい機能が入らないので、-F (–features)オプションでfeatureを指定します。

Cargo.toml

[package]
name = "polars-sample"
version = "0.1.0"
edition = "2021"

[dependencies]
clap = { version = "4.5.27", features = ["derive"] }
polars = { version = "0.46.0", features = ["lazy", "csv", "parquet", "strings"] }

コード

use std::{
    error::Error,
    fs::{self, File},
    path::Path,
};

use clap::Parser;
use polars::{
    frame::DataFrame,
    io::{SerReader, SerWriter},
    prelude::{
        col, lit, CsvReadOptions, CsvWriter, DataType, Field, IntoLazy, ParquetWriter, Schema,
        SortMultipleOptions,
    },
};

#[derive(Debug, Parser)]
struct Args {
    #[arg(short, long, default_value = "utf_ken_all.csv")]
    input_filepath: String,
    #[arg(short, long, default_value = "Data")]
    output_dirname: String,
}

fn main() -> Result<(), Box<dyn Error>> {
    let args = Args::parse();

    //CSVファイルを読み込んでDataFrameを作成する
    let schema = Schema::from_iter(vec![
        Field::new("全国地方公共団体コード".into(), DataType::String),
        Field::new("旧郵便番号".into(), DataType::String),
        Field::new("郵便番号".into(), DataType::String),
        Field::new("都道府県名(カタカナ)".into(), DataType::String),
        Field::new("市区町村名(カタカナ)".into(), DataType::String),
        Field::new("町域名(カタカナ)".into(), DataType::String),
        Field::new("都道府県名(漢字)".into(), DataType::String),
        Field::new("市区町村名(漢字)".into(), DataType::String),
        Field::new("町域名(漢字)".into(), DataType::String),
        Field::new(
            "一町域が二以上の郵便番号で表される場合の表示".into(),
            DataType::Int32,
        ),
        Field::new(
            "小字毎に番地が起番されている町域の表示".into(),
            DataType::Int32,
        ),
        Field::new("丁目を有する町域の場合の表示".into(), DataType::Int32),
        Field::new(
            "一つの郵便番号で二以上の町域を表す場合の表示".into(),
            DataType::Int32,
        ),
        Field::new("更新の表示".into(), DataType::Int32),
        Field::new("変更理由".into(), DataType::Int32),
    ]);
    let df = CsvReadOptions::default()
        .with_has_header(false)
        .with_schema(Some(schema.into()))
        .try_into_reader_with_file_path(Some(args.input_filepath.into()))?
        .finish()?;
    println!("レコード数: {}", df.shape().0);

    //東京都のデータに絞り込む
    let df_tokyo = df
        .clone()
        .lazy()
        .filter(col("都道府県名(漢字)").eq(lit("東京都")))
        .collect()?;
    println!("東京都のレコード数: {}", df_tokyo.shape().0);

    //郵便番号を前後に分割する
    let df_postal_code = df
        .clone()
        .lazy()
        .with_columns([
            col("郵便番号")
                .str()
                .slice(lit(0), lit(3))
                .alias("postal_code_first"),
            col("郵便番号")
                .str()
                .slice(lit(3), lit(4))
                .alias("postal_code_second"),
        ])
        .collect()?;

    println!("郵便番号の分割:");
    println!(
        "{:?}",
        df_postal_code
            .select(["postal_code_first", "postal_code_second"])?
            .head(Some(10))
    );

    //郵便番号の最初の3桁でgroup byしてカウントする
    let df_postal_code_count = df_postal_code
        .clone()
        .lazy()
        .group_by([col("postal_code_first")])
        .agg([col("postal_code_first").count().alias("count")])
        .sort(
            ["count"],
            SortMultipleOptions::default().with_order_descending(true),
        )
        .collect()?;

    //作成したDataFrameを保存するディレクトリを作成する
    let output_dir = Path::new(&args.output_dirname);
    if !output_dir.exists() {
        fs::create_dir_all(&args.output_dirname)?;
    }

    //オリジナルのDataFrameはParquet形式で保存する
    let mut output_file_df = File::create(output_dir.join("original.parquet"))?;
    let mut df = df;
    ParquetWriter::new(&mut output_file_df).finish(&mut df)?;

    //その他のDataFrameはCSV形式で保存する
    let fn_save_as_csv = |filename: &str, df: &mut DataFrame| -> Result<(), Box<dyn Error>> {
        let mut output_file = File::create(output_dir.join(filename))?;
        CsvWriter::new(&mut output_file).finish(df)?;

        Ok(())
    };
    fn_save_as_csv("tokyo.csv", &mut df_tokyo.clone())?;
    fn_save_as_csv("postal_code_count.csv", &mut df_postal_code_count.clone())?;

    Ok(())
}

今回のCSVファイルはヘッダ行がないため、カラム名とデータ型はこちらで指定します。

Field::new("全国地方公共団体コード".into(), DataType::String)

0か1しか入っていないカラムのデータ型はUInt8とかBooleanでいいと思いましたが、それだとエラーになってうまくいかなかったため、おとなしくInt32を使っています。

作成したSchemaをCSVファイル読込みの際にパラメータとして渡します。

let df = CsvReadOptions::default()
        .with_has_header(false)
        .with_schema(Some(schema.into()))
        .try_into_reader_with_file_path(Some(args.input_filepath.into()))?
        .finish()?;

データを絞り込む際にはfilter()を使用します。

let df_tokyo = df
        .clone()
        .lazy()
        .filter(col("都道府県名(漢字)").eq(lit("東京都")))
        .collect()?;

ここで呼び出しているlazy()はLazyFrameを作成する関数です。
LazyFrameは遅延評価を行うデータフレームで、最終的にcollect()が実行されるまで実際の変換処理は実行されません。
遅延評価を行うことによって、クエリの大幅な効率化が可能になります。

郵便番号を前後に分割します。
カラムに対して何か操作したいときはwith_columns()を使用します。
ここでは、郵便番号カラムの値を文字列として取り出して、substringを取得しています。
処理後のデータを格納するカラム名はalias()で指定します。

let df_postal_code = df
        .clone()
        .lazy()
        .with_columns([
            col("郵便番号")
                .str()
                .slice(lit(0), lit(3))
                .alias("postal_code_first"),
            col("郵便番号")
                .str()
                .slice(lit(3), lit(4))
                .alias("postal_code_second"),
        ])
        .collect()?;

特定のカラムを取り出したいときはselect()を使用します。

println!(
        "{:?}",
        df_postal_code
            .select(["postal_code_first", "postal_code_second"])?
            .head(Some(10))
    );

郵便番号の最初の3桁でgroup byしてカウントします。

let df_postal_code_count = df_postal_code
        .clone()
        .lazy()
        .group_by([col("postal_code_first")])
        .agg([col("postal_code_first").count().alias("count")])
        .sort(
            ["count"],
            SortMultipleOptions::default().with_order_descending(true),
        )
        .collect()?;

DataFrameをファイルに出力します。
Parquet形式で出力する場合はParquetWriter、CSV形式で出力する場合はCsvWriterを使用します。
どちらも使い方は同じですね。

ParquetWriter::new(&mut output_file_df).finish(&mut df)?;

ところで、今回のコードではDataFrameに対してclone()を何度も実行しています。
Polarsのclone()はdeep copyするわけではないので、パフォーマンスには影響ありません。
lazy()を呼び出すと所有権が移動してしまうため、clone()でコピーを作成しています。
Pythonのドキュメントしか見つかりませんでしたが、以下のような記載があります。

This is a cheap operation that does not copy data.

メモ

郵便番号の頭3桁でgroup byしてカウントしたところ、最も多いのは939で1337件、最も少ないのは539で1件でした。
939は富山県、539は大阪市中央区に割り当てられているらしいです。