# Source code for flightrisk.features.pipeline
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import pandas as pd
from flightrisk.data.loaders import KKBoxArtifacts, OrangeBelgiumArtifacts
from flightrisk.features.kkbox import KKBoxFeatureConfig, build_kkbox_feature_matrix
from flightrisk.features.orange import OrangeFeatureConfig, prepare_orange_features
from flightrisk.utils.logging import get_logger
from flightrisk.utils.paths import get_paths
_log = get_logger(__name__)
[docs]
@dataclass(frozen=True)
class KKBoxFeatureBundle:
    """Immutable container for the result of the KKBox feature pipeline.

    The three fields travel together so downstream code always knows which
    cutoff produced a given feature/label pair.

    :param features: Wide feature matrix keyed by ``msno``.
    :param labels: Churn labels, aligned to ``features`` through ``msno``.
    :param cutoff: The build cutoff the matrix was assembled at.
    """

    # Field order is part of the public constructor signature; keep it stable.
    features: pd.DataFrame
    labels: pd.DataFrame
    cutoff: pd.Timestamp
[docs]
def build_kkbox_bundle(
    artifacts: KKBoxArtifacts,
    *,
    cutoff: str | pd.Timestamp,
    config: KKBoxFeatureConfig | None = None,
) -> KKBoxFeatureBundle:
    """Build the KKBox feature matrix and pair it with churn labels.

    The feature matrix is inner-joined with the labels on ``msno``, so only
    members present on both sides appear in the returned bundle.

    :param artifacts: Validated raw frames from
        :func:`flightrisk.data.loaders.load_kkbox`.
    :param cutoff: Build cutoff (string or :class:`pandas.Timestamp`); it is
        normalised to a :class:`pandas.Timestamp` before use.
    :param config: Optional feature configuration; a default
        :class:`KKBoxFeatureConfig` is used when none is supplied.
    :returns: A :class:`KKBoxFeatureBundle` ready for modeling.
    """
    as_of = pd.Timestamp(cutoff)
    # Same truthiness-based fallback as ``config or KKBoxFeatureConfig()``.
    effective_config = config if config else KKBoxFeatureConfig()

    _log.info("building kkbox feature matrix at cutoff=%s", as_of.date())
    matrix = build_kkbox_feature_matrix(
        artifacts.members,
        artifacts.transactions,
        artifacts.user_logs,
        cutoff=as_of,
        config=effective_config,
    )

    label_columns = ["msno", "is_churn"]
    # Copy so the caller's label frame is never touched by the join.
    churn = artifacts.labels[label_columns].copy()
    joined = matrix.merge(churn, on="msno", how="inner")

    # Split the joined frame back into its feature and label halves; both
    # halves share the same row order because they come from one frame.
    feature_frame = joined.drop(columns=["is_churn"])
    label_frame = joined[label_columns]

    row_count, column_count = feature_frame.shape
    _log.info(
        "kkbox features: %d rows, %d feature columns",
        row_count,
        column_count - 1,  # the ``msno`` key column is not a feature
    )
    return KKBoxFeatureBundle(
        features=feature_frame,
        labels=label_frame,
        cutoff=as_of,
    )
[docs]
@dataclass(frozen=True)
class OrangeFeatureBundle:
    """Immutable container for the result of the Orange Belgium pipeline.

    :param features: Cleaned feature frame.
    :param treatment: 0/1 treatment column from the original RCT.
    :param outcome: 0/1 outcome column.
    """

    # Field order is part of the public constructor signature; keep it stable.
    features: pd.DataFrame
    treatment: pd.Series
    outcome: pd.Series
[docs]
def build_orange_bundle(
    artifacts: OrangeBelgiumArtifacts,
    *,
    config: OrangeFeatureConfig | None = None,
) -> OrangeFeatureBundle:
    """Assemble the Orange Belgium bundle used for uplift modelling.

    Only the feature frame is processed (via
    :func:`prepare_orange_features`); the treatment and outcome columns are
    passed through from ``artifacts`` unchanged.

    :param artifacts: Validated raw frame from
        :func:`flightrisk.data.loaders.load_orange_belgium`.
    :param config: Optional feature configuration; a default
        :class:`OrangeFeatureConfig` is used when none is supplied.
    :returns: An :class:`OrangeFeatureBundle`.
    """
    # Same truthiness-based fallback as ``config or OrangeFeatureConfig()``.
    effective_config = config if config else OrangeFeatureConfig()

    _log.info("preparing orange-belgium features")
    prepared = prepare_orange_features(artifacts.features, config=effective_config)

    bundle = OrangeFeatureBundle(
        features=prepared,
        treatment=artifacts.treatment,
        outcome=artifacts.outcome,
    )
    return bundle
[docs]
def write_kkbox_bundle(bundle: KKBoxFeatureBundle, *, name: str = "kkbox") -> Path:
    """Write a KKBox bundle to ``data/features/{name}/``.

    Three files are produced in the target directory: ``features.parquet``,
    ``labels.parquet`` and ``cutoff.txt`` (the cutoff in ISO format). The
    directory is created if it does not exist yet.

    :param bundle: Bundle returned by :func:`build_kkbox_bundle`.
    :param name: Subdirectory name under ``data/features``.
    :returns: The directory the bundle was written to.
    """
    target_dir = get_paths().data_features / name
    target_dir.mkdir(parents=True, exist_ok=True)

    # Features first, then labels; neither file stores the frame index.
    parquet_outputs = (
        ("features.parquet", bundle.features),
        ("labels.parquet", bundle.labels),
    )
    for filename, frame in parquet_outputs:
        frame.to_parquet(target_dir / filename, index=False)

    cutoff_file = target_dir / "cutoff.txt"
    cutoff_file.write_text(bundle.cutoff.isoformat())

    _log.info("wrote kkbox bundle to %s", target_dir)
    return target_dir
[docs]
def write_orange_bundle(bundle: OrangeFeatureBundle, *, name: str = "orange") -> Path:
    """Write an Orange Belgium bundle to ``data/features/{name}/``.

    Two files are produced in the target directory: ``features.parquet`` and
    ``labels.parquet``, the latter holding the ``treatment`` and ``outcome``
    columns side by side. The directory is created if it does not exist yet.

    :param bundle: Bundle returned by :func:`build_orange_bundle`.
    :param name: Subdirectory name under ``data/features``.
    :returns: The directory the bundle was written to.
    """
    target_dir = get_paths().data_features / name
    target_dir.mkdir(parents=True, exist_ok=True)

    bundle.features.to_parquet(target_dir / "features.parquet", index=False)

    # Drop each series' own index so the two columns are combined by
    # position rather than by whatever index labels they carry.
    treatment_column = bundle.treatment.reset_index(drop=True)
    outcome_column = bundle.outcome.reset_index(drop=True)
    label_frame = pd.DataFrame(
        {
            "treatment": treatment_column,
            "outcome": outcome_column,
        }
    )
    label_frame.to_parquet(target_dir / "labels.parquet", index=False)

    _log.info("wrote orange bundle to %s", target_dir)
    return target_dir