Skip to content

API Reference

This page provides automated documentation for the core components of newt.

Binning

newt.features.binning.binner

Unified binning interface.

Provides a single entry point for binning features using various algorithms.

Classes

Binner

Bases: BinnerStatsMixin, BinnerIOMixin

Unified interface for multi-feature binning using various algorithms.

The Binner class manages the discretization of multiple features, handles missing values automatically, and stores WOE encoders for downstream modeling. It supports both supervised (ChiMerge, Decision Tree, Optimal) and unsupervised (K-Means, Equal Width, Equal Frequency) algorithms.

Supported methods
  • 'chi': ChiMerge (Default)
  • 'dt': Decision Tree
  • 'opt': Optimal Binning
  • 'kmean': K-Means
  • 'quantile': Equal Frequency
  • 'step': Equal Width

Examples:

>>> from newt.features.binning import Binner
>>> binner = Binner()
>>> binner.fit(X_train, y_train, method='chi', n_bins=5, monotonic=True)
>>> # Access results via item access
>>> print(binner['age'].stats)
>>> binner['age'].plot()
>>> # Transform new data
>>> X_binned = binner.transform(X_test)
Source code in src/newt/features/binning/binner.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
class Binner(BinnerStatsMixin, BinnerIOMixin):
    """Unified interface for multi-feature binning using various algorithms.

    The Binner class manages the discretization of multiple features, handles
    missing values automatically, and stores WOE encoders for downstream modeling.
    It supports both supervised (ChiMerge, Decision Tree, Optimal) and
    unsupervised (K-Means, Equal Width, Equal Frequency) algorithms.

    Supported methods:
        - 'chi': ChiMerge (Default)
        - 'dt': Decision Tree
        - 'opt': Optimal Binning
        - 'kmean': K-Means
        - 'quantile': Equal Frequency
        - 'step': Equal Width

    Examples:
        >>> from newt.features.binning import Binner
        >>> binner = Binner()
        >>> binner.fit(X_train, y_train, method='chi', n_bins=5, monotonic=True)
        >>> # Access results via item access
        >>> print(binner['age'].stats)
        >>> binner['age'].plot()
        >>> # Transform new data
        >>> X_binned = binner.transform(X_test)
    """

    def __init__(self):
        """Create an unfitted Binner with empty result containers."""
        # Registry translating the public ``method`` keyword into a binner class.
        self.method_map = dict(
            chi=ChiMergeBinner,
            dt=DecisionTreeBinner,
            kmean=KMeansBinner,
            quantile=EqualFrequencyBinner,
            step=EqualWidthBinner,
            opt=OptBinningBinner,
        )

        # Per-feature fitted artefacts, all keyed by feature name.
        self.rules_: Dict[str, List[float]] = {}
        self.binners_: Dict[str, BaseBinner] = {}
        self.woe_maps_: Dict[str, Dict[Any, float]] = {}
        self.ivs_: Dict[str, float] = {}
        self.stats_: Dict[str, pd.DataFrame] = {}

        # Copies of the most recent training data plus bookkeeping.
        self._X: Optional[pd.DataFrame] = None
        self._y: Optional[pd.Series] = None
        self._features: List[str] = []
        self._missing_label = "Missing"

    @staticmethod
    def _count_bins_from_splits(values: pd.Series, splits: List[float]) -> List[int]:
        """Count samples per bin under ``pd.cut(..., right=True)`` semantics."""
        if values.empty:
            return [0]
        if not splits:
            return [int(values.shape[0])]

        split_array = np.asarray(splits, dtype=np.float64)
        value_array = values.to_numpy(dtype=np.float64, copy=False)
        bin_index = np.searchsorted(split_array, value_array, side="right")
        counts = np.bincount(bin_index, minlength=len(split_array) + 1)
        return counts.astype(int).tolist()

    @staticmethod
    def _select_split_to_merge(counts: List[int], small_bin_index: int) -> int:
        """Select one split index to remove for a small bin."""
        if len(counts) <= 1:
            raise ValueError("At least two bins are required to merge.")

        last_bin_index = len(counts) - 1
        if small_bin_index <= 0:
            return 0
        if small_bin_index >= last_bin_index:
            return last_bin_index - 1

        left_count = counts[small_bin_index - 1]
        right_count = counts[small_bin_index + 1]
        if left_count <= right_count:
            return small_bin_index - 1
        return small_bin_index

    def _converge_feature_splits(
        self,
        binner: BaseBinner,
        col_data: pd.Series,
        y_series: Optional[pd.Series],
        min_sample_count: Optional[int],
    ) -> List[float]:
        """Converge feature splits under min-sample and monotonic constraints."""
        current_splits = sorted(list(set(getattr(binner, "splits_", []))))
        valid_mask = col_data.notna()
        X_valid = col_data[valid_mask]
        if X_valid.empty:
            return current_splits

        y_valid = y_series[valid_mask] if y_series is not None else None

        while True:
            if binner.monotonic and y_valid is not None and current_splits:
                current_splits = sorted(
                    list(
                        set(
                            binner._adjust_monotonicity(
                                X_valid, y_valid, current_splits
                            )
                        )
                    )
                )

            if min_sample_count is None:
                break

            counts = self._count_bins_from_splits(X_valid, current_splits)
            small_bin_index = next(
                (idx for idx, count in enumerate(counts) if count < min_sample_count),
                None,
            )
            if small_bin_index is None or not current_splits:
                break

            split_index = self._select_split_to_merge(counts, small_bin_index)
            current_splits.pop(split_index)

        return current_splits

    def _store_feature_binner(
        self,
        feature: str,
        binner: BaseBinner,
        col_data: pd.Series,
        y_series: Optional[pd.Series],
        min_sample_count: Optional[int],
    ) -> None:
        """Finalize and store one fitted feature binner."""
        final_splits = self._converge_feature_splits(
            binner=binner,
            col_data=col_data,
            y_series=y_series,
            min_sample_count=min_sample_count,
        )
        binner.splits_ = final_splits
        binner.is_fitted_ = True
        self.binners_[feature] = binner
        self.rules_[feature] = final_splits

    @property
    def woe_encoders_(self) -> Dict[str, Any]:
        """Build WOEEncoder objects from the stored WOE maps (backward compatibility).

        A fresh encoder is created for every feature on each access.

        Returns:
            Dict[str, Any]: Mapping of feature names to WOEEncoder objects.
        """
        from newt.features.analysis.woe_calculator import WOEEncoder

        def _as_encoder(name: str, mapping: Dict[Any, float]) -> Any:
            # Populate the encoder's fitted attributes directly from stored state.
            wrapped = WOEEncoder()
            wrapped.woe_map_ = mapping
            wrapped.iv_ = self.ivs_.get(name, 0.0)
            wrapped.is_fitted_ = True
            return wrapped

        return {
            name: _as_encoder(name, mapping)
            for name, mapping in self.woe_maps_.items()
        }

    def _prepare_fit_inputs(
        self,
        X: pd.DataFrame,
        y: Optional[Union[pd.Series, str]],
        method: str,
    ) -> Tuple[pd.DataFrame, Optional[pd.Series]]:
        """Resolve target input and validate supervised targets."""
        y_series = y
        if isinstance(y, str):
            y_series = X[y]
            if y in X.columns:
                X = X.drop(columns=[y])

        if method == "chi":
            if y_series is None:
                raise ValueError("ChiMerge requires target 'y'.")
            if not isinstance(y_series, pd.Series):
                y_series = pd.Series(y_series, index=X.index)
            y_series = _validate_chi_target(
                y_series,
                context="Binner.fit(method='chi')",
            )

        return X, y_series

    def _reset_fit_state(
        self,
        X: pd.DataFrame,
        y_series: Optional[pd.Series],
        numeric_cols: List[str],
    ) -> None:
        """Reset fitted attributes before a fresh fit."""
        self.rules_ = {}
        self.binners_ = {}
        self.woe_maps_ = {}
        self.ivs_ = {}
        self.stats_ = {}
        self._X = X.copy()
        self._y = y_series.copy() if y_series is not None else None
        self._features = numeric_cols

    @staticmethod
    def _resolve_fit_columns(X: pd.DataFrame, cols: Optional[List[str]]) -> List[str]:
        if cols:
            return [column for column in cols if column in X.columns]
        return list(X.select_dtypes(include=[np.number]).columns)

    @staticmethod
    def _resolve_feature_min_samples(
        X: pd.DataFrame,
        numeric_cols: List[str],
        min_samples: Union[int, float, None],
    ) -> Dict[str, Optional[int]]:
        feature_min_samples: Dict[str, Optional[int]] = {
            col: None for col in numeric_cols
        }
        if min_samples is None:
            return feature_min_samples

        for col in numeric_cols:
            valid_count = int(X[col].notna().sum())
            if valid_count <= 0:
                continue
            feature_min_samples[col] = _resolve_min_samples_count(
                min_samples=min_samples,
                sample_count=valid_count,
                context=f"Binner.fit(feature='{col}')",
            )
        return feature_min_samples

    @staticmethod
    def _build_binner_kwargs(
        method: str,
        n_bins: int,
        min_samples: Union[int, float, None],
        monotonic: Union[bool, str, None],
        extra_kwargs: Dict[str, Any],
    ) -> Dict[str, Any]:
        kwargs_binner: Dict[str, Any] = {"n_bins": n_bins, "monotonic": monotonic}
        if method == "dt" and min_samples is not None:
            kwargs_binner["min_samples_leaf"] = min_samples
        if method == "chi" and min_samples is not None:
            kwargs_binner["min_samples"] = min_samples
        kwargs_binner.update(extra_kwargs)
        return kwargs_binner

    @staticmethod
    def _load_tqdm():
        try:
            from tqdm.auto import tqdm

            return tqdm
        except ImportError:
            return None

    def _fit_sequential_features(
        self,
        X: pd.DataFrame,
        y_series: Optional[pd.Series],
        method: str,
        n_bins: int,
        min_samples: Union[int, float, None],
        monotonic: Union[bool, str, None],
        numeric_cols: List[str],
        feature_min_samples: Dict[str, Optional[int]],
        show_progress: bool,
        extra_kwargs: Dict[str, Any],
        tqdm,
    ) -> None:
        """Fit features one by one using the selected Python binner."""
        pbar = (
            tqdm(numeric_cols, desc="Binning features", disable=not show_progress)
            if tqdm
            else numeric_cols
        )
        for col in pbar:
            binner_cls = self.method_map.get(method)
            if binner_cls is None:
                raise ValueError(f"Unknown method: {method}")

            kwargs_binner = self._build_binner_kwargs(
                method=method,
                n_bins=n_bins,
                min_samples=min_samples,
                monotonic=monotonic,
                extra_kwargs=extra_kwargs,
            )
            binner = binner_cls(**kwargs_binner)

            col_data = X[col]
            valid_mask = col_data.notna()
            if valid_mask.sum() == 0:
                continue

            y_fit = y_series[valid_mask] if y_series is not None else None
            binner.fit(col_data[valid_mask], y_fit)
            self._store_feature_binner(
                feature=col,
                binner=binner,
                col_data=col_data,
                y_series=y_series,
                min_sample_count=feature_min_samples.get(col),
            )

    @staticmethod
    def _normalize_split_lists(split_lists) -> List[List[float]]:
        return [sorted(list(set(splits))) for splits in split_lists]

    @classmethod
    def _adjust_batch_monotonic_splits(
        cls,
        rust_module,
        feature_arrays: List[np.ndarray],
        y_arr: np.ndarray,
        split_lists: List[List[float]],
        monotonic: Union[bool, str, None],
    ) -> Tuple[List[List[float]], List[bool]]:
        """Adjust Rust batch splits for monotonicity when requested."""
        if not monotonic:
            return split_lists, [True] * len(split_lists)

        monotonic_success = [False] * len(split_lists)
        if not hasattr(rust_module, "adjust_batch_chi_merge_monotonic_numpy"):
            return split_lists, monotonic_success

        try:
            native_result = rust_module.adjust_batch_chi_merge_monotonic_numpy(
                feature_arrays,
                y_arr,
                split_lists,
                _resolve_monotonic_mode(monotonic),
            )
        except Exception:
            return split_lists, monotonic_success

        if isinstance(native_result, tuple) and len(native_result) == 2:
            candidate_splits, success_flags = native_result
            if len(candidate_splits) == len(split_lists) and len(success_flags) == len(
                split_lists
            ):
                return cls._normalize_split_lists(candidate_splits), [
                    bool(success) for success in success_flags
                ]
            return split_lists, monotonic_success

        if len(native_result) == len(split_lists):
            return cls._normalize_split_lists(native_result), [True] * len(split_lists)

        return split_lists, monotonic_success

    def fit(
        self,
        X: pd.DataFrame,
        y: Optional[Union[pd.Series, str]] = None,
        method: str = "chi",
        n_bins: int = BINNING.DEFAULT_N_BINS,
        min_samples: Union[int, float, None] = None,
        cols: Optional[List[str]] = None,
        monotonic: Union[bool, str, None] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> "Binner":
        """Fit the binning model to multiple features.

        Initializes and fits specific binning algorithms for each selected feature,
        calculates binning statistics, and stores WOE mappings.

        For ``method='chi'`` with a target, the batch entry point of the native
        engine is used when the engine exposes it; columns whose batch call
        fails are refit one by one with the Python binner. All other cases are
        fitted sequentially in Python.

        Args:
            X: Data to be binned.
            y: Target data or target column name. Required for supervised methods.
            method: Binning algorithm name ('chi', 'dt', 'opt', 'kmean', etc.).
            n_bins: Target number of bins.
            min_samples: Minimum samples threshold.
                - For 'dt': minimum samples per leaf.
                - For 'chi': float in (0, 1] means minimum bin proportion,
                  int means minimum absolute samples per bin.
            cols: List of columns to bin. If None, all numeric columns are selected.
            monotonic: Enforce monotonic bad rate trend.
                - True/'auto': Enforce auto-detected trend.
                - 'ascending'/'descending': Enforce specific trend.
            show_progress: Whether to show a progress bar.
            **kwargs: Additional parameters passed to the underlying binner.
                In the batch ChiMerge path, ``alpha`` (default 0.05) is also
                read here to derive the chi-square threshold.

        Returns:
            Binner: The fitted Binner instance.

        Raises:
            ValueError: If ``method`` is unknown, or if ``method='chi'`` and no
                target is given.

        Examples:
            >>> binner.fit(df, y='default', method='chi', monotonic=True)
        """
        X, y_series = self._prepare_fit_inputs(X, y, method)
        numeric_cols = self._resolve_fit_columns(X, cols)
        feature_min_samples = self._resolve_feature_min_samples(
            X=X,
            numeric_cols=numeric_cols,
            min_samples=min_samples,
        )
        self._reset_fit_state(X, y_series, numeric_cols)

        tqdm = self._load_tqdm()

        # Determine if we can use batch Rust ChiMerge
        rust_module = _load_rust_engine()
        use_batch_rust = (
            method == "chi"
            and y_series is not None
            and rust_module
            and hasattr(rust_module, "calculate_batch_chi_merge_numpy")
        )

        if use_batch_rust:
            from scipy import stats

            # Chi-square critical value at significance ``alpha`` with 1 degree
            # of freedom; passed to the native batch call as its threshold.
            threshold = float(stats.chi2.ppf(1 - (kwargs.get("alpha", 0.05)), 1))
            binner_cls = self.method_map.get(method)
            if binner_cls is None:
                raise ValueError(f"Unknown method: {method}")

            kwargs_binner = {"n_bins": n_bins, "monotonic": monotonic}
            if min_samples is not None:
                kwargs_binner["min_samples"] = min_samples
            kwargs_binner.update(kwargs)

            pbar = (
                tqdm(
                    total=len(numeric_cols),
                    desc="Binning features (Rust Batch)",
                    disable=not show_progress,
                )
                if tqdm
                else None
            )

            # Group columns by missing-mask so each batch call can share the same y.
            grouped_cols: Dict[bytes, Dict[str, Any]] = {}
            feature_meta: Dict[str, Dict[str, Any]] = {}
            for col in numeric_cols:
                binner = binner_cls(**kwargs_binner)
                col_data = X[col]
                valid_mask = col_data.notna()
                if valid_mask.sum() == 0:
                    # All-missing column: nothing to fit, but still advance the bar.
                    if pbar is not None:
                        pbar.update(1)
                    continue

                feature_meta[col] = {
                    "binner": binner,
                    "col_data": col_data,
                    "valid_mask": valid_mask,
                }

                # The raw bytes of the boolean mask serve as a hashable group key.
                mask_key = valid_mask.to_numpy(dtype=np.bool_, copy=False).tobytes()
                if mask_key not in grouped_cols:
                    grouped_cols[mask_key] = {"mask": valid_mask, "cols": []}
                grouped_cols[mask_key]["cols"].append(col)

            def _fit_single_column_fallback(col: str):
                # Fit one column with the Python binner when the batch path fails.
                meta = feature_meta[col]
                binner = meta["binner"]
                col_data = meta["col_data"]
                valid_mask = meta["valid_mask"]
                binner.fit(col_data[valid_mask], y_series[valid_mask])
                self._store_feature_binner(
                    feature=col,
                    binner=binner,
                    col_data=col_data,
                    y_series=y_series,
                    min_sample_count=feature_min_samples.get(col),
                )
                if pbar is not None:
                    pbar.update(1)

            for group in grouped_cols.values():
                valid_mask = group["mask"]
                cols_in_group = group["cols"]
                y_arr = y_series[valid_mask].astype(np.int64).to_numpy()
                feature_arrays = [
                    X[col][valid_mask].astype(np.float64).to_numpy()
                    for col in cols_in_group
                ]
                # Falls back to 0.05 when no ``min_samples`` was supplied.
                min_sample_count = _resolve_chi_min_samples_count(
                    kwargs_binner.get("min_samples", 0.05),
                    len(y_arr),
                    context="Binner.fit(method='chi')",
                )

                try:
                    batch_splits = rust_module.calculate_batch_chi_merge_numpy(
                        feature_arrays,
                        y_arr,
                        n_bins,
                        threshold,
                        min_sample_count,
                    )
                except Exception:
                    # Native batch call failed: refit the whole group in Python.
                    for col in cols_in_group:
                        _fit_single_column_fallback(col)
                    continue

                if len(batch_splits) != len(cols_in_group):
                    # Unexpected result shape: do not trust it, refit in Python.
                    for col in cols_in_group:
                        _fit_single_column_fallback(col)
                    continue

                split_lists = self._normalize_split_lists(batch_splits)
                adjusted_split_lists, monotonic_success = (
                    self._adjust_batch_monotonic_splits(
                        rust_module=rust_module,
                        feature_arrays=feature_arrays,
                        y_arr=y_arr,
                        split_lists=split_lists,
                        monotonic=monotonic,
                    )
                )

                for split_idx, col in enumerate(cols_in_group):
                    meta = feature_meta[col]
                    binner = meta["binner"]
                    col_data = meta["col_data"]
                    valid_mask = meta["valid_mask"]

                    try:
                        split_list = split_lists[split_idx]
                        if binner.monotonic:
                            if monotonic_success[split_idx]:
                                split_list = adjusted_split_lists[split_idx]
                            else:
                                # Native adjustment unavailable or failed for
                                # this feature: use the Python implementation.
                                split_list = BaseBinner._adjust_monotonicity(
                                    binner,
                                    col_data[valid_mask],
                                    y_series[valid_mask],
                                    split_list,
                                )

                        binner.splits_ = split_list
                        self._store_feature_binner(
                            feature=col,
                            binner=binner,
                            col_data=col_data,
                            y_series=y_series,
                            min_sample_count=feature_min_samples.get(col),
                        )
                    except Exception:
                        # The fallback advances the progress bar itself.
                        _fit_single_column_fallback(col)
                        continue

                    if pbar is not None:
                        pbar.update(1)

            if pbar is not None:
                pbar.close()

        else:
            # Sequential fallback
            self._fit_sequential_features(
                X=X,
                y_series=y_series,
                method=method,
                n_bins=n_bins,
                min_samples=min_samples,
                monotonic=monotonic,
                numeric_cols=numeric_cols,
                feature_min_samples=feature_min_samples,
                show_progress=show_progress,
                extra_kwargs=kwargs,
                tqdm=tqdm,
            )

        # Calculate and store statistics
        self.fit_woe(X, y_series, show_progress=show_progress)

        return self

    def fit_woe(
        self,
        X: pd.DataFrame,
        y: Union[pd.Series, str],
        show_progress: bool = True,
    ) -> "Binner":
        """Calculate and update WOE mappings for all features.

        Applicable when rules are loaded or manually set. This method updates
        WOE and IV statistics without changing existing split points.

        Args:
            X: Input DataFrame.
            y: Target data or target column name.
            show_progress: Whether to show a progress bar.

        Returns:
            Binner: Self.
        """
        y_series = y
        if isinstance(y, str):
            y_series = X[y]

        self._X = X.copy()
        self._y = y_series.copy() if y_series is not None else None

        if self._y is None:
            return self

        self._update_all_stats()
        return self

    def transform(
        self,
        X: pd.DataFrame,
        labels: bool = False,
        show_progress: bool = False,
    ) -> pd.DataFrame:
        """Discretizes values based on splits discovered during fitting. Missing
        values are automatically assigned to a 'Missing' bin.

        Args:
            X: Data to transform.
            labels: If True, return bin intervals (str).
                If False, return bin indices (int).
            show_progress: Whether to show a progress bar.

        Returns:
            pd.DataFrame: Binned data with original columns replaced by
                bin codes/labels.
        """
        X_new = X.copy()

        # tqdm for progress tracking
        try:
            from tqdm.auto import tqdm

            pbar = tqdm(
                self.binners_.items(),
                desc="Transforming features",
                disable=not show_progress,
            )
        except ImportError:
            pbar = self.binners_.items()

        for col, binner in pbar:
            if col not in X_new.columns:
                continue

            col_data = X[col]
            valid_mask = col_data.notna()

            # Transform valid values
            binned = pd.Series(index=col_data.index, dtype=object)

            if valid_mask.any():
                valid_binned = binner.transform(col_data[valid_mask])

                if labels:
                    binned[valid_mask] = valid_binned.astype(str)
                else:
                    binned[valid_mask] = valid_binned.cat.codes

            # Handle missing values - separate bin
            binned[~valid_mask] = self._missing_label if labels else -1

            X_new[col] = binned

        return X_new

    def woe_transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Bin the data and map every bin to its WOE value in one pass.

        Only fitted features that are present in ``X`` are converted; all
        other columns are copied through unchanged.

        Args:
            X: Raw feature DataFrame.

        Returns:
            pd.DataFrame: WOE-encoded DataFrame.

        Raises:
            ValueError: If any fitted feature present in ``X`` has no WOE map.

        Examples:
            >>> X_woe = binner.woe_transform(X_raw)
        """
        from newt.features.analysis.woe_calculator import WOEEncoder

        encoded_frame = X.copy()

        present = [name for name in self.binners_ if name in encoded_frame.columns]
        unmapped = [name for name in present if name not in self.woe_maps_]
        if unmapped:
            missing = ", ".join(unmapped)
            raise ValueError(
                f"WOE mappings are missing for feature(s): {missing}. "
                "Call fit_woe() before woe_transform()."
            )

        for name in present:
            # Throwaway encoder populated from the stored mapping.
            encoder = WOEEncoder()
            encoder.woe_map_ = self.woe_maps_[name]
            encoder.iv_ = self.ivs_.get(name, 0.0)
            encoder.is_fitted_ = True

            # Step 1: bin labels, with missing values in a bin of their own.
            raw = X[name]
            observed = raw.notna()
            bin_labels = pd.Series(index=raw.index, dtype=object)
            if observed.any():
                fitted_bins = self.binners_[name].transform(raw[observed])
                bin_labels[observed] = fitted_bins.astype(str)
            bin_labels[~observed] = self._missing_label

            # Step 2: bin labels -> WOE values.
            encoded_frame[name] = encoder.transform(bin_labels)

        return encoded_frame

    def __getitem__(self, feature: str) -> Union[BinningResult, pd.DataFrame]:
        """
        Get binning result proxy for a feature.

        Parameters
        ----------
        feature : str
            Feature name.

        Returns
        -------
        BinningResult
            Proxy object with stats and plot methods.
        """
        if feature not in self.binners_:
            raise KeyError(f"Feature '{feature}' is missing from binner.")

        return BinningResult(self, feature)

    def stats(self) -> Dict[str, pd.DataFrame]:
        """Get dictionary of statistics for all features."""
        try:
            from IPython.display import display

            HAS_IPYTHON = True
        except ImportError:
            HAS_IPYTHON = False

        result = {}
        for feat in self._features:
            if feat in self.binners_:
                result[feat] = self[feat].stats
                print(f"--- Binning Result: {feat} ---")

                # Render stats table
                if HAS_IPYTHON:
                    display(self[feat].stats)
                else:
                    print(self[feat].stats)

        return result

    def stats_plot(self):
        """Display stats and plot for all features."""
        try:
            from IPython.display import display

            HAS_IPYTHON = True
        except ImportError:
            HAS_IPYTHON = False

        for feat in self._features:
            if feat in self.binners_:
                print(f"--- Binning Result: {feat} ---")

                # Render stats table
                if HAS_IPYTHON:
                    display(self[feat].stats)
                else:
                    print(self[feat].stats)

                # Plot
                fig = self[feat].plot()
                if HAS_IPYTHON:
                    display(fig)
                else:
                    try:
                        import matplotlib.pyplot as plt

                        plt.show()
                    except ImportError:
                        pass

    def woe_map(self) -> Dict[str, Dict[Any, float]]:
        """Get WOE maps for all features."""
        return {
            feat: self.get_woe_map(feat)
            for feat in self._features
            if feat in self.binners_
        }

    def __contains__(self, feature: str) -> bool:
        """Check if feature is in binner."""
        return feature in self.binners_

    def __iter__(self):
        """Iterate over feature names."""
        return iter(self._features)

    def __len__(self) -> int:
        """Number of binned features."""
        return len(self.binners_)

    def features(self) -> List[str]:
        """Get list of binned feature names."""
        return list(self.binners_.keys())
Attributes
woe_encoders_ property

Get WOE encoders dictionary (for backward compatibility).

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Mapping of feature names to WOEEncoder objects. |

Functions
__contains__(feature)

Check if feature is in binner.

Source code in src/newt/features/binning/binner.py
950
951
952
def __contains__(self, feature: str) -> bool:
    """Report whether ``feature`` has an entry in ``self.binners_``."""
    fitted = self.binners_
    return feature in fitted
__getitem__(feature)

Get binning result proxy for a feature.

Parameters

feature : str Feature name.

Returns

BinningResult Proxy object with stats and plot methods.

Source code in src/newt/features/binning/binner.py
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
def __getitem__(self, feature: str) -> Union[BinningResult, pd.DataFrame]:
    """
    Return the result proxy of a fitted feature.

    Parameters
    ----------
    feature : str
        Name of the feature to look up.

    Returns
    -------
    BinningResult
        Proxy built from this binner and ``feature``.

    Raises
    ------
    KeyError
        If ``feature`` has no entry in ``self.binners_``.
    """
    if feature in self.binners_:
        return BinningResult(self, feature)

    raise KeyError(f"Feature '{feature}' is missing from binner.")
__init__()

Initialize the Binner.

Source code in src/newt/features/binning/binner.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def __init__(self):
    """Create an unfitted Binner with empty per-feature state."""
    # Split rules per feature.
    self.rules_: Dict[str, List[float]] = {}

    # Registry of method name -> binner class.
    self.method_map = dict(
        chi=ChiMergeBinner,
        dt=DecisionTreeBinner,
        kmean=KMeansBinner,
        quantile=EqualFrequencyBinner,
        step=EqualWidthBinner,
        opt=OptBinningBinner,
    )

    # Per-feature containers; all start empty.
    self.binners_: Dict[str, BaseBinner] = {}
    self.woe_maps_: Dict[str, Dict[Any, float]] = {}
    self.ivs_: Dict[str, float] = {}
    self.stats_: Dict[str, pd.DataFrame] = {}

    # Data references and feature order; unset until populated elsewhere.
    self._X: Optional[pd.DataFrame] = None
    self._y: Optional[pd.Series] = None
    self._features: List[str] = []

    # Label used for the bin that receives missing values.
    self._missing_label = "Missing"
__iter__()

Iterate over feature names.

Source code in src/newt/features/binning/binner.py
954
955
956
def __iter__(self):
    """Return an iterator over the names held in ``self._features``."""
    names = self._features
    return iter(names)
__len__()

Number of binned features.

Source code in src/newt/features/binning/binner.py
958
959
960
def __len__(self) -> int:
    """Return how many entries ``self.binners_`` holds."""
    fitted = self.binners_
    return len(fitted)
features()

Get list of binned feature names.

Source code in src/newt/features/binning/binner.py
962
963
964
def features(self) -> List[str]:
    """Return the keys of ``self.binners_`` as a new list."""
    fitted = self.binners_
    return list(fitted)
fit(X, y=None, method='chi', n_bins=BINNING.DEFAULT_N_BINS, min_samples=None, cols=None, monotonic=None, show_progress=True, **kwargs)

Fit the binning model to multiple features.

Initializes and fits specific binning algorithms for each selected feature, calculates binning statistics, and stores WOE mappings.

Parameters:

Name Type Description Default
X DataFrame

Data to be binned.

required
y Optional[Union[Series, str]]

Target data or target column name. Required for supervised methods.

None
method str

Binning algorithm name ('chi', 'dt', 'opt', 'kmean', etc.).

'chi'
n_bins int

Target number of bins.

DEFAULT_N_BINS
min_samples Union[int, float, None]

Minimum samples threshold. - For 'dt': minimum samples per leaf. - For 'chi': float in (0, 1] means minimum bin proportion, int means minimum absolute samples per bin.

None
cols Optional[List[str]]

List of columns to bin. If None, all numeric columns are selected.

None
monotonic Union[bool, str, None]

Enforce monotonic bad rate trend. - True/'auto': Enforce auto-detected trend. - 'ascending'/'descending': Enforce specific trend.

None
show_progress bool

Whether to show a progress bar.

True
**kwargs

Additional parameters passed to the underlying binner.

{}

Returns:

Name Type Description
Binner Binner

The fitted Binner instance.

Examples:

>>> binner.fit(df, y='default', method='chi', monotonic=True)
Source code in src/newt/features/binning/binner.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
def fit(
    self,
    X: pd.DataFrame,
    y: Optional[Union[pd.Series, str]] = None,
    method: str = "chi",
    n_bins: int = BINNING.DEFAULT_N_BINS,
    min_samples: Union[int, float, None] = None,
    cols: Optional[List[str]] = None,
    monotonic: Union[bool, str, None] = None,
    show_progress: bool = True,
    **kwargs,
) -> "Binner":
    """Fit the binning model to multiple features.

    Initializes and fits specific binning algorithms for each selected feature,
    calculates binning statistics, and stores WOE mappings.

    Two execution paths exist. When ``method == 'chi'``, a target is available
    and the native engine exposes ``calculate_batch_chi_merge_numpy``, columns
    that share the same missing-value mask are fitted together in one native
    call; any failure there falls back to fitting the affected columns one by
    one. Every other case is delegated to ``_fit_sequential_features``.

    Args:
        X: Data to be binned.
        y: Target data or target column name. Required for supervised methods.
        method: Binning algorithm name ('chi', 'dt', 'opt', 'kmean', etc.).
        n_bins: Target number of bins.
        min_samples: Minimum samples threshold.
            - For 'dt': minimum samples per leaf.
            - For 'chi': float in (0, 1] means minimum bin proportion,
              int means minimum absolute samples per bin.
        cols: List of columns to bin. If None, all numeric columns are selected.
        monotonic: Enforce monotonic bad rate trend.
            - True/'auto': Enforce auto-detected trend.
            - 'ascending'/'descending': Enforce specific trend.
        show_progress: Whether to show a progress bar.
        **kwargs: Additional parameters passed to the underlying binner.
            On the batch path ``alpha`` (default 0.05) is also read here to
            derive the chi-square merge threshold.

    Returns:
        Binner: The fitted Binner instance.

    Raises:
        ValueError: On the batch path, if ``method`` is not in
            ``self.method_map``.

    Examples:
        >>> binner.fit(df, y='default', method='chi', monotonic=True)
    """
    X, y_series = self._prepare_fit_inputs(X, y, method)
    numeric_cols = self._resolve_fit_columns(X, cols)
    feature_min_samples = self._resolve_feature_min_samples(
        X=X,
        numeric_cols=numeric_cols,
        min_samples=min_samples,
    )
    self._reset_fit_state(X, y_series, numeric_cols)

    tqdm = self._load_tqdm()

    # Determine if we can use batch Rust ChiMerge
    rust_module = _load_rust_engine()
    use_batch_rust = (
        method == "chi"
        and y_series is not None
        and rust_module
        and hasattr(rust_module, "calculate_batch_chi_merge_numpy")
    )

    if use_batch_rust:
        from scipy import stats

        # Chi-square critical value (1 degree of freedom) at significance
        # level alpha; passed to the native engine as the merge threshold.
        threshold = float(stats.chi2.ppf(1 - (kwargs.get("alpha", 0.05)), 1))
        binner_cls = self.method_map.get(method)
        if binner_cls is None:
            raise ValueError(f"Unknown method: {method}")

        # Constructor arguments shared by every per-column binner; explicit
        # kwargs from the caller override the named parameters.
        kwargs_binner = {"n_bins": n_bins, "monotonic": monotonic}
        if min_samples is not None:
            kwargs_binner["min_samples"] = min_samples
        kwargs_binner.update(kwargs)

        # _load_tqdm() may return a falsy value; then no bar is created.
        pbar = (
            tqdm(
                total=len(numeric_cols),
                desc="Binning features (Rust Batch)",
                disable=not show_progress,
            )
            if tqdm
            else None
        )

        # Group columns by missing-mask so each batch call can share the same y.
        grouped_cols: Dict[bytes, Dict[str, Any]] = {}
        feature_meta: Dict[str, Dict[str, Any]] = {}
        for col in numeric_cols:
            binner = binner_cls(**kwargs_binner)
            col_data = X[col]
            valid_mask = col_data.notna()
            # Columns with no non-missing value are skipped entirely: no
            # binner is stored for them, only the progress bar advances.
            if valid_mask.sum() == 0:
                if pbar is not None:
                    pbar.update(1)
                continue

            feature_meta[col] = {
                "binner": binner,
                "col_data": col_data,
                "valid_mask": valid_mask,
            }

            # The raw bytes of the boolean mask serve as the grouping key.
            mask_key = valid_mask.to_numpy(dtype=np.bool_, copy=False).tobytes()
            if mask_key not in grouped_cols:
                grouped_cols[mask_key] = {"mask": valid_mask, "cols": []}
            grouped_cols[mask_key]["cols"].append(col)

        def _fit_single_column_fallback(col: str):
            # Fit one column through the binner's own fit() and store it;
            # advances the progress bar itself.
            meta = feature_meta[col]
            binner = meta["binner"]
            col_data = meta["col_data"]
            valid_mask = meta["valid_mask"]
            binner.fit(col_data[valid_mask], y_series[valid_mask])
            self._store_feature_binner(
                feature=col,
                binner=binner,
                col_data=col_data,
                y_series=y_series,
                min_sample_count=feature_min_samples.get(col),
            )
            if pbar is not None:
                pbar.update(1)

        for group in grouped_cols.values():
            valid_mask = group["mask"]
            cols_in_group = group["cols"]
            y_arr = y_series[valid_mask].astype(np.int64).to_numpy()
            feature_arrays = [
                X[col][valid_mask].astype(np.float64).to_numpy()
                for col in cols_in_group
            ]
            min_sample_count = _resolve_chi_min_samples_count(
                kwargs_binner.get("min_samples", 0.05),
                len(y_arr),
                context="Binner.fit(method='chi')",
            )

            try:
                batch_splits = rust_module.calculate_batch_chi_merge_numpy(
                    feature_arrays,
                    y_arr,
                    n_bins,
                    threshold,
                    min_sample_count,
                )
            except Exception:
                # Any native failure: refit the whole group column by column.
                for col in cols_in_group:
                    _fit_single_column_fallback(col)
                continue

            # A result count that does not match the request cannot be mapped
            # back to columns, so the group is refitted column by column.
            if len(batch_splits) != len(cols_in_group):
                for col in cols_in_group:
                    _fit_single_column_fallback(col)
                continue

            split_lists = self._normalize_split_lists(batch_splits)
            adjusted_split_lists, monotonic_success = (
                self._adjust_batch_monotonic_splits(
                    rust_module=rust_module,
                    feature_arrays=feature_arrays,
                    y_arr=y_arr,
                    split_lists=split_lists,
                    monotonic=monotonic,
                )
            )

            for split_idx, col in enumerate(cols_in_group):
                meta = feature_meta[col]
                binner = meta["binner"]
                col_data = meta["col_data"]
                valid_mask = meta["valid_mask"]

                try:
                    split_list = split_lists[split_idx]
                    if binner.monotonic:
                        if monotonic_success[split_idx]:
                            split_list = adjusted_split_lists[split_idx]
                        else:
                            # Batch adjustment did not succeed for this
                            # column; use the base-class adjustment instead.
                            split_list = BaseBinner._adjust_monotonicity(
                                binner,
                                col_data[valid_mask],
                                y_series[valid_mask],
                                split_list,
                            )

                    binner.splits_ = split_list
                    self._store_feature_binner(
                        feature=col,
                        binner=binner,
                        col_data=col_data,
                        y_series=y_series,
                        min_sample_count=feature_min_samples.get(col),
                    )
                except Exception:
                    # The fallback advances the progress bar, hence the
                    # `continue` that skips the update below.
                    _fit_single_column_fallback(col)
                    continue

                if pbar is not None:
                    pbar.update(1)

        if pbar is not None:
            pbar.close()

    else:
        # Sequential fallback
        self._fit_sequential_features(
            X=X,
            y_series=y_series,
            method=method,
            n_bins=n_bins,
            min_samples=min_samples,
            monotonic=monotonic,
            numeric_cols=numeric_cols,
            feature_min_samples=feature_min_samples,
            show_progress=show_progress,
            extra_kwargs=kwargs,
            tqdm=tqdm,
        )

    # Calculate and store statistics
    self.fit_woe(X, y_series, show_progress=show_progress)

    return self
fit_woe(X, y, show_progress=True)

Calculate and update WOE mappings for all features.

Applicable when rules are loaded or manually set. This method updates WOE and IV statistics without changing existing split points.

Parameters:

Name Type Description Default
X DataFrame

Input DataFrame.

required
y Union[Series, str]

Target data or target column name.

required
show_progress bool

Whether to show a progress bar.

True

Returns:

Name Type Description
Binner Binner

Self.

Source code in src/newt/features/binning/binner.py
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
def fit_woe(
    self,
    X: pd.DataFrame,
    y: Union[pd.Series, str],
    show_progress: bool = True,
) -> "Binner":
    """Store the data on the instance and refresh per-feature statistics.

    Copies of ``X`` and of the target are kept on ``self._X`` / ``self._y``.
    When a target is available, ``self._update_all_stats()`` is then called;
    without a target nothing is recomputed.

    Args:
        X: Input DataFrame.
        y: Target data, or the name of the target column in ``X``.
        show_progress: Whether to show a progress bar. Not read by this
            method's own body.

    Returns:
        Binner: Self.
    """
    target = X[y] if isinstance(y, str) else y

    self._X = X.copy()
    self._y = None if target is None else target.copy()

    if self._y is not None:
        self._update_all_stats()
    return self
stats()

Get dictionary of statistics for all features.

Source code in src/newt/features/binning/binner.py
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
def stats(self) -> Dict[str, pd.DataFrame]:
    """Print and return the statistics table of every fitted feature.

    Names in ``self._features`` without an entry in ``self.binners_`` are
    skipped. Each table is rendered with IPython's ``display`` when IPython
    can be imported and with ``print`` otherwise.

    Returns:
        Dict[str, pd.DataFrame]: Feature name mapped to ``self[name].stats``.
    """
    try:
        from IPython.display import display
    except ImportError:
        rich_output = False
    else:
        rich_output = True

    tables = {}
    for name in self._features:
        if name not in self.binners_:
            continue

        tables[name] = self[name].stats
        print(f"--- Binning Result: {name} ---")

        # Render stats table
        if rich_output:
            display(self[name].stats)
        else:
            print(self[name].stats)

    return tables
stats_plot()

Display stats and plot for all features.

Source code in src/newt/features/binning/binner.py
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
def stats_plot(self):
    """Show the statistics table and the plot of every fitted feature.

    Names in ``self._features`` without an entry in ``self.binners_`` are
    skipped. When IPython can be imported its ``display`` is used for both
    the table and the figure; otherwise the table is printed and the figure
    is shown through matplotlib if that package is importable.
    """
    try:
        from IPython.display import display
    except ImportError:
        rich_output = False
    else:
        rich_output = True

    for name in self._features:
        if name not in self.binners_:
            continue

        print(f"--- Binning Result: {name} ---")

        # Statistics table
        table = self[name].stats
        if rich_output:
            display(table)
        else:
            print(table)

        # Figure
        figure = self[name].plot()
        if rich_output:
            display(figure)
            continue

        try:
            import matplotlib.pyplot as plt

            plt.show()
        except ImportError:
            # matplotlib is optional here; without it nothing is shown.
            pass
transform(X, labels=False, show_progress=False)

Discretizes values based on splits discovered during fitting. Missing values are automatically assigned to a 'Missing' bin.

Parameters:

Name Type Description Default
X DataFrame

Data to transform.

required
labels bool

If True, return bin intervals (str). If False, return bin indices (int).

False
show_progress bool

Whether to show a progress bar.

False

Returns:

Type Description
DataFrame

pd.DataFrame: Binned data with original columns replaced by bin codes/labels.

Source code in src/newt/features/binning/binner.py
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
def transform(
    self,
    X: pd.DataFrame,
    labels: bool = False,
    show_progress: bool = False,
) -> pd.DataFrame:
    """Replace each fitted column of ``X`` with its bin assignment.

    Non-missing values are passed to the column's fitted binner; missing
    values get their own bin (``self._missing_label`` when ``labels`` is
    True, ``-1`` otherwise). Fitted features absent from ``X`` are skipped
    and all other columns are returned unchanged. ``X`` itself is not
    modified.

    Args:
        X: Data to transform.
        labels: If True, return bin intervals (str).
            If False, return bin indices (int).
        show_progress: Whether to show a progress bar.

    Returns:
        pd.DataFrame: Binned data with original columns replaced by
            bin codes/labels.
    """
    result = X.copy()

    # Wrap the iteration in a tqdm bar when tqdm is available.
    try:
        from tqdm.auto import tqdm

        fitted_items = tqdm(
            self.binners_.items(),
            desc="Transforming features",
            disable=not show_progress,
        )
    except ImportError:
        fitted_items = self.binners_.items()

    for name, fitted in fitted_items:
        if name not in result.columns:
            continue

        values = X[name]
        present = values.notna()

        # Object-typed holder so codes and labels share one code path.
        encoded = pd.Series(index=values.index, dtype=object)

        if present.any():
            bins = fitted.transform(values[present])
            encoded[present] = bins.astype(str) if labels else bins.cat.codes

        # Missing values go to a dedicated bin.
        encoded[~present] = self._missing_label if labels else -1

        result[name] = encoded

    return result
woe_map()

Get WOE maps for all features.

Source code in src/newt/features/binning/binner.py
942
943
944
945
946
947
948
def woe_map(self) -> Dict[str, Dict[Any, float]]:
    """Collect the WOE mapping of each fitted feature.

    Returns:
        Dict[str, Dict[Any, float]]: One entry per name in
            ``self._features`` that also has a fitted binner, holding the
            result of ``self.get_woe_map`` for that feature.
    """
    mappings = {}
    for name in self._features:
        if name in self.binners_:
            mappings[name] = self.get_woe_map(name)
    return mappings
woe_transform(X)

Convenience method to bin and WOE-transform data in one pass.

Parameters:

Name Type Description Default
X DataFrame

Raw feature DataFrame.

required

Returns:

Type Description
DataFrame

pd.DataFrame: WOE-encoded DataFrame.

Examples:

>>> X_woe = binner.woe_transform(X_raw)
Source code in src/newt/features/binning/binner.py
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
def woe_transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """Bin the fitted columns of ``X`` and map each bin to its WOE value.

    Only fitted features that are present in ``X`` are processed; all other
    columns are returned unchanged. ``X`` itself is not modified.

    Args:
        X: Raw feature DataFrame.

    Returns:
        pd.DataFrame: WOE-encoded DataFrame.

    Raises:
        ValueError: If a fitted feature present in ``X`` has no entry in
            ``self.woe_maps_``.

    Examples:
        >>> X_woe = binner.woe_transform(X_raw)
    """
    result = X.copy()
    from newt.features.analysis.woe_calculator import WOEEncoder

    applicable = [name for name in self.binners_.keys() if name in result.columns]
    without_woe = [name for name in applicable if name not in self.woe_maps_]
    if without_woe:
        missing = ", ".join(without_woe)
        raise ValueError(
            f"WOE mappings are missing for feature(s): {missing}. "
            "Call fit_woe() before woe_transform()."
        )

    for name in applicable:
        # Throwaway encoder primed with the stored mapping and IV.
        encoder = WOEEncoder()
        encoder.woe_map_ = self.woe_maps_[name]
        encoder.iv_ = self.ivs_.get(name, 0.0)
        encoder.is_fitted_ = True

        # Step 1: bin labels, with missing values in their own bin.
        values = X[name]
        present = values.notna()
        bin_labels = pd.Series(index=values.index, dtype=object)

        if present.any():
            fitted_bins = self.binners_[name].transform(values[present])
            bin_labels[present] = fitted_bins.astype(str)

        bin_labels[~present] = self._missing_label

        # Step 2: labels -> WOE values.
        result[name] = encoder.transform(bin_labels)

    return result

newt.features.binning.supervised

Classes

ChiMergeBinner

Bases: BaseBinner

Discretizes continuous data using the ChiMerge algorithm.

ChiMerge is a bottom-up merging algorithm that starts with each unique value as a bin and iteratively merges adjacent bins if they are statistically similar (based on Chi-square test).

Examples:

>>> binner = ChiMergeBinner(n_bins=5, alpha=0.05)
>>> binner.fit(X_series, y_series)
Source code in src/newt/features/binning/supervised.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
class ChiMergeBinner(BaseBinner):
    """Discretizes continuous data using the ChiMerge algorithm.

    ChiMerge is a bottom-up merging algorithm that starts with each unique value
    as a bin and iteratively merges adjacent bins if they are statistically
    similar (based on Chi-square test).

    A native (Rust) implementation is tried first when it is available; the
    pure-Python merge steps in this class are the fallback.

    Examples:
        >>> binner = ChiMergeBinner(n_bins=5, alpha=0.05)
        >>> binner.fit(X_series, y_series)
    """

    def __init__(
        self,
        n_bins: int = 5,
        monotonic: Union[bool, str, None] = None,
        alpha: float = 0.05,
        min_samples: Union[int, float] = 0.05,
        **kwargs,
    ):
        """Initialize ChiMergeBinner.

        Args:
            n_bins: Target number of bins.
            monotonic: Enforce monotonic trend.
            alpha: Significance level for Chi-square test (merges if p > alpha).
            min_samples: Minimum samples per bin. Float is treated as fraction
                in (0, 1], int as absolute count.
            **kwargs: Arguments passed to BaseBinner.
        """
        super().__init__(n_bins=n_bins, monotonic=monotonic, **kwargs)
        self.alpha = alpha
        self.min_samples = min_samples

    def _fit_splits(self, X: pd.Series, y: Optional[pd.Series] = None) -> List[float]:
        """
        Fast ChiMerge Implementation.

        Args:
            X: Feature values; missing entries are dropped before fitting.
            y: Target values; required.

        Returns:
            List[float]: Split points, or an empty list when ``X`` has no
                non-missing value.

        Raises:
            ValueError: If ``y`` is None.
        """
        if y is None:
            raise ValueError("ChiMergeBinner requires target 'y'.")

        # 1. Prepare data
        y_series = _validate_chi_target(pd.Series(y), context="ChiMergeBinner")
        X_series = pd.Series(X)
        valid_mask = X_series.notna()
        X_arr = X_series[valid_mask].to_numpy(dtype=np.float64)
        y_arr = y_series[valid_mask].to_numpy(dtype=np.int64)

        if len(X_arr) == 0:
            return []

        # Chi-square critical value (1 degree of freedom) at level alpha.
        threshold = float(stats.chi2.ppf(1 - self.alpha, 1))
        min_sample_count = _resolve_chi_min_samples_count(
            self.min_samples,
            len(X_arr),
            context="ChiMergeBinner",
        )

        # 2. Try Rust engine first
        rust_module = _load_rust_engine()
        if rust_module and hasattr(rust_module, "calculate_chi_merge_numpy"):
            try:
                splits = rust_module.calculate_chi_merge_numpy(
                    X_arr,
                    y_arr,
                    self.n_bins,
                    threshold,
                    min_sample_count,
                )
                return sorted(splits)
            except Exception:
                # Fallback to Python if Rust fails
                pass

        # 3. Initial binning for Python fallback
        # One bin per distinct value: (value, sample count, event count).
        sort_idx = np.argsort(X_arr)
        X_sorted = X_arr[sort_idx]
        y_sorted = y_arr[sort_idx]
        unique_vals, counts = np.unique(X_sorted, return_counts=True)

        event_counts = []
        start = 0
        for count in counts:
            end = start + count
            event_counts.append(np.sum(y_sorted[start:end]))
            start = end

        bins = list(zip(unique_vals, counts, event_counts))

        # 4. Merge iterations (Python fallback)
        # Order: bin-count cap first, then the chi-square threshold, then
        # the minimum-sample constraint.
        max_bins = max(int(self.n_bins), 1)
        bins = self._merge_until_hard_cap(bins, max_bins)
        bins = self._merge_until_threshold(bins, threshold)
        bins = self._merge_for_min_samples(bins, min_sample_count)

        # 5. Extract splits
        return _calculate_cut_points_from_bins(bins)

    def _adjust_monotonicity(
        self, X: pd.Series, y: pd.Series, splits: List[float]
    ) -> List[float]:
        """Use native monotonic adjustment when available.

        Splits are de-duplicated and sorted first. If the native engine is
        missing or raises, the base-class adjustment is used instead.

        Returns:
            List[float]: Adjusted split points; empty when there are no
                splits or no complete (X, y) rows.
        """
        split_list = sorted(list(set(splits)))
        if not split_list:
            return []

        rust_module = _load_rust_engine()
        if rust_module and hasattr(rust_module, "adjust_chi_merge_monotonic_numpy"):
            try:
                # Keep only rows where both X and y are present.
                df = pd.DataFrame({"X": X, "y": y}).dropna()
                if df.empty:
                    return []

                adjusted = rust_module.adjust_chi_merge_monotonic_numpy(
                    df["X"].to_numpy(dtype=np.float64),
                    df["y"].to_numpy(dtype=np.int64),
                    split_list,
                    _resolve_monotonic_mode(self.monotonic),
                )
                return sorted(list(set(adjusted)))
            except Exception:
                # Fall back to Python monotonic adjustment on any native failure.
                pass

        return super()._adjust_monotonicity(X, y, split_list)

    def _compute_chi_squares(self, bins):
        """Compute the chi-square statistic of every adjacent bin pair.

        Args:
            bins: Sequence of ``(value, count, event_count)`` tuples.

        Returns:
            np.ndarray: ``len(bins) - 1`` statistics, entry ``i`` comparing
                bins ``i`` and ``i + 1``; empty for fewer than two bins.
        """
        if len(bins) < 2:
            return np.array([])

        n_bins = len(bins)
        chi_squares = np.zeros(n_bins - 1)

        for i in range(n_bins - 1):
            n1, e1 = bins[i][1], bins[i][2]
            n2, e2 = bins[i + 1][1], bins[i + 1][2]

            total_n = n1 + n2
            total_e = e1 + e2
            total_ne = total_n - total_e

            if total_n == 0:
                chi_squares[i] = 0
                continue

            # Expected event / non-event counts under a common event rate.
            e1_expected = n1 * total_e / total_n
            e2_expected = n2 * total_e / total_n
            ne1_expected = n1 * total_ne / total_n
            ne2_expected = n2 * total_ne / total_n

            # Add eps to avoid div by zero
            e1_expected = max(e1_expected, 1e-9)
            e2_expected = max(e2_expected, 1e-9)
            ne1_expected = max(ne1_expected, 1e-9)
            ne2_expected = max(ne2_expected, 1e-9)

            # Each term is (|observed - expected| - 0.5)^2 / expected.
            # NOTE(review): the 0.5 is subtracted unconditionally, so a cell
            # with |observed - expected| < 0.5 still contributes a positive
            # amount; confirm this matches the native engine before changing.
            chi2 = (
                (abs(e1 - e1_expected) - 0.5) ** 2 / e1_expected
                + (abs(e2 - e2_expected) - 0.5) ** 2 / e2_expected
                + (abs(n1 - e1 - ne1_expected) - 0.5) ** 2 / ne1_expected
                + (abs(n2 - e2 - ne2_expected) - 0.5) ** 2 / ne2_expected
            )
            chi_squares[i] = chi2

        return chi_squares

    def _merge_bins(self, bins, idx):
        """Return a new bin list with bins ``idx`` and ``idx + 1`` merged.

        The merged bin keeps the value of bin ``idx`` and sums the counts and
        event counts of both; the input list is not modified.
        """
        val1, n1, e1 = bins[idx]
        val2, n2, e2 = bins[idx + 1]

        merged = (val1, n1 + n2, e1 + e2)
        new_bins = bins[:idx] + [merged] + bins[idx + 2 :]
        return new_bins

    def _merge_until_hard_cap(self, bins, max_bins: int):
        """Merge adjacent bins by smallest chi-square until bin count cap is met."""
        current = list(bins)
        while len(current) > max_bins:
            chi_squares = self._compute_chi_squares(current)
            if len(chi_squares) == 0:
                break
            min_idx = int(np.argmin(chi_squares))
            current = self._merge_bins(current, min_idx)
        return current

    def _merge_until_threshold(self, bins, threshold: float):
        """Merge adjacent bins while smallest chi-square is below threshold."""
        current = list(bins)
        while len(current) > 1:
            chi_squares = self._compute_chi_squares(current)
            if len(chi_squares) == 0:
                break

            min_idx = int(np.argmin(chi_squares))
            min_chi2 = float(chi_squares[min_idx])
            if min_chi2 >= threshold:
                break
            current = self._merge_bins(current, min_idx)
        return current

    def _merge_for_min_samples(self, bins, min_sample_count: int):
        """Merge bins until all bins satisfy minimum count or only one remains."""
        current = list(bins)
        while len(current) > 1:
            small_bin_indexes = [
                i for i, (_, count, _) in enumerate(current) if count < min_sample_count
            ]
            if not small_bin_indexes:
                break

            chi_squares = self._compute_chi_squares(current)
            if len(chi_squares) == 0:
                break

            # Edge k joins bins k and k + 1; collect the edges that touch
            # any undersized bin.
            candidate_edges = set()
            for idx in small_bin_indexes:
                if idx > 0:
                    candidate_edges.add(idx - 1)
                if idx < len(current) - 1:
                    candidate_edges.add(idx)

            if not candidate_edges:
                break

            # Lowest chi-square wins; ties go to the lower edge index.
            min_idx = min(
                candidate_edges,
                key=lambda edge_idx: (float(chi_squares[edge_idx]), edge_idx),
            )
            current = self._merge_bins(current, min_idx)
        return current
Functions
__init__(n_bins=5, monotonic=None, alpha=0.05, min_samples=0.05, **kwargs)

Initialize ChiMergeBinner.

Parameters:

Name Type Description Default
n_bins int

Target number of bins.

5
monotonic Union[bool, str, None]

Enforce monotonic trend.

None
alpha float

Significance level for Chi-square test (merges if p > alpha).

0.05
min_samples Union[int, float]

Minimum samples per bin. Float is treated as fraction in (0, 1], int as absolute count.

0.05
**kwargs

Arguments passed to BaseBinner.

{}
Source code in src/newt/features/binning/supervised.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def __init__(
    self,
    n_bins: int = 5,
    monotonic: Union[bool, str, None] = None,
    alpha: float = 0.05,
    min_samples: Union[int, float] = 0.05,
    **kwargs,
):
    """Set up a ChiMerge binner.

    Args:
        n_bins: Target number of bins.
        monotonic: Enforce monotonic trend.
        alpha: Significance level of the Chi-square test; adjacent bins are
            merged when p > alpha.
        min_samples: Minimum samples per bin. A float is read as a fraction
            in (0, 1], an int as an absolute count.
        **kwargs: Forwarded to ``BaseBinner.__init__``.
    """
    base_options = dict(n_bins=n_bins, monotonic=monotonic, **kwargs)
    super().__init__(**base_options)

    # ChiMerge-specific settings.
    self.alpha = alpha
    self.min_samples = min_samples

DecisionTreeBinner

Bases: BaseBinner

Discretizes continuous data using a Decision Tree to find optimal splits.

Uses a classification tree to split the feature based on its relationship with the target variable. This method naturally finds boundaries that maximize separation between classes.

Examples:

>>> binner = DecisionTreeBinner(n_bins=5, min_samples_leaf=0.1)
>>> binner.fit(X_series, y_series)
>>> print(binner.splits_)
Source code in src/newt/features/binning/supervised.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class DecisionTreeBinner(BaseBinner):
    """Discretizes continuous data using a Decision Tree to find optimal splits.

    Uses a classification tree to split the feature based on its relationship with
    the target variable. This method naturally finds boundaries that maximize
    separation between classes.

    Examples:
        >>> binner = DecisionTreeBinner(n_bins=5, min_samples_leaf=0.1)
        >>> binner.fit(X_series, y_series)
        >>> print(binner.splits_)
    """

    def __init__(
        self,
        n_bins: int = 5,
        monotonic: Union[bool, str, None] = None,
        min_samples_leaf: float = 0.05,
        **kwargs,
    ):
        """Initialize DecisionTreeBinner.

        Args:
            n_bins: Maximum number of bins (max_leaf_nodes).
            monotonic: Enforce monotonic trend.
            min_samples_leaf: Minimum fraction of samples required in a leaf.
            **kwargs: Arguments passed to BaseBinner.
        """
        super().__init__(n_bins=n_bins, monotonic=monotonic, **kwargs)
        self.min_samples_leaf = min_samples_leaf

    def _fit_splits(self, X: pd.Series, y: Optional[pd.Series] = None) -> List[float]:
        """Fit a single-feature classification tree and return its thresholds.

        Args:
            X: Feature values. Rows where either X or y is NaN are ignored.
            y: Target labels. Required.

        Returns:
            Split thresholds in ascending order; an empty list when no
            usable rows remain or the tree makes no split.

        Raises:
            ValueError: If ``y`` is None.
        """
        if y is None:
            raise ValueError("DecisionTreeBinner requires target 'y'.")

        # Remove NaNs for tree training
        mask = (~X.isna()) & (~y.isna())
        X_clean = X[mask].values.reshape(-1, 1)
        y_clean = y[mask].values

        if len(X_clean) == 0:
            return []

        clf = DecisionTreeClassifier(
            max_leaf_nodes=self.n_bins,
            min_samples_leaf=self.min_samples_leaf,
            random_state=42,
        )
        clf.fit(X_clean, y_clean)

        # Identify split nodes structurally instead of comparing the
        # threshold with the -2 leaf sentinel: a genuine split can sit at
        # exactly -2.0 and must not be discarded. Leaves have identical
        # (undefined) left and right children; split nodes do not.
        tree = clf.tree_
        is_split_node = tree.children_left != tree.children_right
        return sorted(tree.threshold[is_split_node].tolist())
Functions
__init__(n_bins=5, monotonic=None, min_samples_leaf=0.05, **kwargs)

Initialize DecisionTreeBinner.

Parameters:

Name Type Description Default
n_bins int

Maximum number of bins (max_leaf_nodes).

5
monotonic Union[bool, str, None]

Enforce monotonic trend.

None
min_samples_leaf float

Minimum fraction of samples required in a leaf.

0.05
**kwargs

Arguments passed to BaseBinner.

{}
Source code in src/newt/features/binning/supervised.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def __init__(
    self,
    n_bins: int = 5,
    monotonic: Union[bool, str, None] = None,
    min_samples_leaf: float = 0.05,
    **kwargs,
):
    """Set up a decision-tree binner.

    Args:
        n_bins: Upper bound on the number of bins (max_leaf_nodes).
        monotonic: Monotonic-trend setting, forwarded to BaseBinner.
        min_samples_leaf: Minimum fraction of samples required in a leaf.
        **kwargs: Extra keyword arguments forwarded to BaseBinner.
    """
    # Shared options are handled by the base class; the leaf-size
    # constraint is the only tree-specific setting kept here.
    super().__init__(n_bins=n_bins, monotonic=monotonic, **kwargs)
    self.min_samples_leaf = min_samples_leaf

OptBinningBinner

Bases: BaseBinner

Discretizes continuous data using the optbinning library.

Provides a wrapper for the Optimal Binning algorithm which uses constrained programming to find splits that optimize information value (IV).

Note: Requires optbinning and is only available on Python < 3.12.

Examples:

>>> binner = OptBinningBinner(n_bins=5, monotonic='ascending')
>>> binner.fit(X, y)
Source code in src/newt/features/binning/supervised.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
class OptBinningBinner(BaseBinner):
    """Binner backed by the `optbinning` library.

    Wraps the Optimal Binning algorithm, which uses constrained programming
    to find splits that optimize information value (IV).

    Note: Requires `optbinning` and is only available on Python < 3.12.

    Examples:
        >>> binner = OptBinningBinner(n_bins=5, monotonic='ascending')
        >>> binner.fit(X, y)
    """

    def __init__(
        self,
        n_bins: int = 5,
        monotonic: Union[bool, str, None] = None,
        **kwargs,
    ):
        """Set up an OptBinning-backed binner.

        Args:
            n_bins: Upper bound on the number of bins.
            monotonic: Monotonic constraint setting; translated to
                OptBinning's ``monotonic_trend`` at fit time.
            **kwargs: Keyword arguments forwarded to
                `optbinning.OptimalBinning`.
        """
        # Monotonicity is delegated to OptBinning itself, so the base class
        # always receives monotonic=None.
        super().__init__(n_bins=n_bins, monotonic=None)
        self.kwargs = kwargs
        self.monotonic_setting = monotonic

    def _fit_splits(self, X: pd.Series, y: Optional[pd.Series] = None) -> List[float]:
        """Run OptimalBinning on one feature and return its sorted splits.

        Raises:
            ImportError: If `optbinning` could not be imported.
            ValueError: If ``y`` is None.
        """
        if OptimalBinning is None:
            raise ImportError(
                "optbinning is not installed. "
                "Install the optional dependency with "
                '`pip install "newt[optbinning]"`.'
            )

        if y is None:
            raise ValueError("OptBinningBinner requires target 'y'.")

        # Translate the monotonic setting into OptBinning's monotonic_trend.
        # None, False and any unrecognized value all fall back to "auto".
        setting = self.monotonic_setting
        trend = "auto"
        if setting is not None and setting is not False:
            if setting is True or setting == "auto":
                trend = "auto_asc_desc"
            elif setting == "ascending":
                trend = "ascending"
            elif setting == "descending":
                trend = "descending"

        optimizer = OptimalBinning(
            name="feature",
            dtype="numerical",
            max_n_bins=self.n_bins,
            monotonic_trend=trend,
            **self.kwargs,
        )
        optimizer.fit(X.values, y.values)

        found = optimizer.splits.tolist()
        found.sort()
        return found
Functions
__init__(n_bins=5, monotonic=None, **kwargs)

Initialize OptBinningBinner.

Parameters:

Name Type Description Default
n_bins int

Maximum number of bins.

5
monotonic Union[bool, str, None]

Monotonic constraint setting.

None
**kwargs

Arguments passed to optbinning.OptimalBinning.

{}
Source code in src/newt/features/binning/supervised.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def __init__(
    self,
    n_bins: int = 5,
    monotonic: Union[bool, str, None] = None,
    **kwargs,
):
    """Set up an OptBinning-backed binner.

    Args:
        n_bins: Upper bound on the number of bins.
        monotonic: Monotonic constraint setting, stored as
            ``monotonic_setting``.
        **kwargs: Keyword arguments forwarded to `optbinning.OptimalBinning`.
    """
    # Monotonicity is delegated to OptBinning itself, so the base class
    # always receives monotonic=None.
    super().__init__(n_bins=n_bins, monotonic=None)
    self.kwargs = kwargs
    self.monotonic_setting = monotonic

newt.features.binning.unsupervised

Classes

EqualWidthBinner

Bases: BaseBinner

Discretizes continuous data into intervals of equal width.

This method divides the range of values into 'n_bins' equal-sized intervals. Useful for uniform distributions or when the physical scale of the feature is the primary concern.

Examples:

>>> binner = EqualWidthBinner(n_bins=5)
>>> binner.fit(X_series)
Source code in src/newt/features/binning/unsupervised.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class EqualWidthBinner(BaseBinner):
    """Equal-width discretizer for continuous data.

    The value range is cut into 'n_bins' intervals of identical width.
    Useful for uniform distributions or when the physical scale of the
    feature is the primary concern.

    Examples:
        >>> binner = EqualWidthBinner(n_bins=5)
        >>> binner.fit(X_series)
    """

    def __init__(self, **kwargs):
        """Set up an equal-width binner.

        Args:
            **kwargs: Keyword arguments forwarded to BaseBinner.
        """
        super().__init__(**kwargs)

    def _fit_splits(self, X: pd.Series, y: Optional[pd.Series] = None) -> List[float]:
        """Return the interior edges of an equal-width partition of X."""
        # With retbins=True pd.cut also returns the edge array
        # [min, s1, s2, ..., max]; the cut result itself is not needed.
        edges = pd.cut(X, bins=self.n_bins, retbins=True)[1]
        # BaseBinner transforms using [-inf] + splits + [inf], so only the
        # interior boundaries s1..s(n-1) are returned.
        return list(edges[1:-1]) if len(edges) > 2 else []
Functions
__init__(**kwargs)

Initialize EqualWidthBinner.

Parameters:

Name Type Description Default
**kwargs

Arguments passed to BaseBinner.

{}
Source code in src/newt/features/binning/unsupervised.py
21
22
23
24
25
26
27
def __init__(self, **kwargs):
    """Set up an equal-width binner.

    Args:
        **kwargs: Keyword arguments forwarded unchanged to BaseBinner.
    """
    super().__init__(**kwargs)

EqualFrequencyBinner

Bases: BaseBinner

Discretizes continuous data into intervals with an equal number of samples.

Also known as quantile binning. This method ensures that each bin contains approximately the same number of observations.

Examples:

>>> binner = EqualFrequencyBinner(n_bins=5)
>>> binner.fit(X_series)
Source code in src/newt/features/binning/unsupervised.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class EqualFrequencyBinner(BaseBinner):
    """Equal-frequency (quantile) discretizer for continuous data.

    Bin edges are placed so that each bin holds approximately the same
    number of observations.

    Examples:
        >>> binner = EqualFrequencyBinner(n_bins=5)
        >>> binner.fit(X_series)
    """

    def __init__(self, **kwargs):
        """Set up an equal-frequency binner.

        Args:
            **kwargs: Keyword arguments forwarded to BaseBinner.
        """
        super().__init__(**kwargs)

    def _fit_splits(self, X: pd.Series, y: Optional[pd.Series] = None) -> List[float]:
        """Return the interior quantile edges of X.

        Duplicate quantile edges are dropped by pd.qcut. If pd.qcut raises,
        equal-width edges from pd.cut are used instead.
        """
        try:
            edges = pd.qcut(X, q=self.n_bins, duplicates="drop", retbins=True)[1]
        except Exception:
            # Fallback to cut if qcut fails (e.g. all same values)
            edges = pd.cut(X, bins=self.n_bins, retbins=True)[1]

        # The first and last edges are the outer bounds; only the interior
        # edges are returned.
        return list(edges[1:-1]) if len(edges) > 2 else []
Functions
__init__(**kwargs)

Initialize EqualFrequencyBinner.

Parameters:

Name Type Description Default
**kwargs

Arguments passed to BaseBinner.

{}
Source code in src/newt/features/binning/unsupervised.py
53
54
55
56
57
58
59
def __init__(self, **kwargs):
    """Set up an equal-frequency binner.

    Args:
        **kwargs: Keyword arguments forwarded unchanged to BaseBinner.
    """
    super().__init__(**kwargs)

KMeansBinner

Bases: BaseBinner

Discretizes continuous data using K-Means clustering.

This method finds 'n_bins' clusters in the 1D space and chooses boundaries as the midpoints between adjacent cluster centers.

Examples:

>>> binner = KMeansBinner(n_bins=5)
>>> binner.fit(X_series)
Source code in src/newt/features/binning/unsupervised.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class KMeansBinner(BaseBinner):
    """Discretizes continuous data using K-Means clustering.

    This method finds 'n_bins' clusters in the 1D space and chooses boundaries
    as the midpoints between adjacent cluster centers.

    Examples:
        >>> binner = KMeansBinner(n_bins=5)
        >>> binner.fit(X_series)
    """

    def __init__(self, **kwargs):
        """Initialize KMeansBinner.

        Args:
            **kwargs: Arguments passed to BaseBinner.
        """
        super().__init__(**kwargs)

    def _fit_splits(self, X: pd.Series, y: Optional[pd.Series] = None) -> List[float]:
        """Cluster the non-missing values of X and return center midpoints.

        Args:
            X: Feature values; NaNs are dropped before clustering.
            y: Unused.

        Returns:
            Midpoints between adjacent distinct cluster centers, in
            ascending order. Empty when there are fewer non-missing values
            than ``n_bins``.
        """
        # Reshape for sklearn
        mask = ~X.isna()
        X_clean = X[mask].values.reshape(-1, 1)

        if len(X_clean) < self.n_bins:
            # Not enough data
            return []

        kmeans = KMeans(n_clusters=self.n_bins, random_state=42, n_init=10)
        kmeans.fit(X_clean)

        # Deduplicate the centers before taking midpoints: repeated centers
        # (possible when the data has fewer distinct values than n_bins)
        # would otherwise produce repeated, non-increasing split points.
        centers = sorted(set(kmeans.cluster_centers_.flatten().tolist()))
        return [(lo + hi) / 2 for lo, hi in zip(centers, centers[1:])]
Functions
__init__(**kwargs)

Initialize KMeansBinner.

Parameters:

Name Type Description Default
**kwargs

Arguments passed to BaseBinner.

{}
Source code in src/newt/features/binning/unsupervised.py
85
86
87
88
89
90
91
def __init__(self, **kwargs):
    """Set up a K-Means binner.

    Args:
        **kwargs: Keyword arguments forwarded unchanged to BaseBinner.
    """
    super().__init__(**kwargs)

Feature Selection

newt.features.selection.selector

Compatibility facade around feature analysis and feature filtering.

Classes

FeatureSelector

Unified tool for exploratory data analysis (EDA) and feature filtering.

The FeatureSelector calculates various feature-level metrics (IV, KS, correlation, missing rates) and provides a simple interface to filter features based on business thresholds.

Attributes:

Name Type Description
metrics Set[str]

The set of metrics calculated by the selector.

eda_summary_ DataFrame

Summary table of calculated statistics.

selected_features_ List[str]

List of column names that passed selection.

removed_features_ Dict[str, str]

Mapping of removed features to the reason.

corr_removed_ List[str]

List of features removed due to high correlation.

Examples:

>>> from newt.features.selection import FeatureSelector
>>> selector = FeatureSelector(metrics=['iv', 'missing_rate', 'correlation'])
>>> selector.fit(X_train, y_train)
>>> selector.select(iv_threshold=0.02, corr_threshold=0.8)
>>> X_filtered = selector.transform(X_train)
Source code in src/newt/features/selection/selector.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class FeatureSelector:
    """Unified tool for exploratory data analysis (EDA) and feature filtering.

    The FeatureSelector calculates various feature-level metrics (IV, KS, correlation,
    missing rates) and provides a simple interface to filter features based on
    business thresholds.

    Attributes:
        metrics (Set[str]): The set of metrics calculated by the selector.
        eda_summary_ (pd.DataFrame): Summary table of calculated statistics.
        corr_matrix_ (pd.DataFrame): Copy of the correlation matrix taken at
            fit time; empty before fit().
        selected_features_ (List[str]): List of column names that passed selection.
        removed_features_ (Dict[str, str]): Mapping of removed features to the reason.
        corr_removed_ (List[str]): List of features removed due to high correlation.

    Examples:
        >>> from newt.features.selection import FeatureSelector
        >>> selector = FeatureSelector(metrics=['iv', 'missing_rate', 'correlation'])
        >>> selector.fit(X_train, y_train)
        >>> selector.select(iv_threshold=0.02, corr_threshold=0.8)
        >>> X_filtered = selector.transform(X_train)
    """

    def __init__(
        self,
        metrics: Optional[List[str]] = None,
        iv_bins: int = BINNING.DEFAULT_BUCKETS,
        lift_k: float = 0.1,
        corr_method: str = "pearson",
        engine: str = "auto",
    ):
        """Initialize the FeatureSelector.

        Args:
            metrics: Metrics to calculate. Options: 'iv', 'missing_rate', 'ks',
                'correlation', 'lift'. If None, uses a default set.
            iv_bins: Number of bins for initial IV calculation.
            lift_k: Fraction of population to use for Lift calculation (e.g., top 10%).
            corr_method: Correlation method ('pearson', 'spearman', 'kendall').
            engine: Execution engine ('auto', 'rust', 'python').
        """
        self._analyzer = FeatureAnalyzer(
            metrics=metrics,
            iv_bins=iv_bins,
            lift_k=lift_k,
            corr_method=corr_method,
            engine=engine,
        )
        self._filter = FeatureSelectionFilter(engine=engine)
        self.metrics: Set[str] = set(self._analyzer.metrics)
        self.engine = engine

        self.eda_summary_: pd.DataFrame = pd.DataFrame()
        # Initialised here, like the other fitted attributes, so that
        # reading it before fit() gives an empty frame rather than raising
        # AttributeError.
        self.corr_matrix_: pd.DataFrame = pd.DataFrame()
        self.analysis_result_: Optional[FeatureAnalysisResult] = None

        self.selected_features_: List[str] = []
        self.removed_features_: dict = {}
        self.corr_removed_: list = []
        self.selection_result_: Optional[FeatureSelectionResult] = None
        self.is_fitted_: bool = False
        self.is_selected_: bool = False

    def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> "FeatureSelector":
        """Calculate feature statistics for the input DataFrame.

        Any previous selection is reset: every analyzed feature is treated as
        selected until select() is called again.

        Args:
            X: Input dataset.
            y: Target binary labels. Required for supervised metrics like IV or KS.

        Returns:
            FeatureSelector: The fitted selector instance.
        """
        self.analysis_result_ = self._analyzer.analyze(X, y)
        self.eda_summary_ = self.analysis_result_.summary.copy()
        self.corr_matrix_ = self.analysis_result_.corr_matrix.copy()
        self.is_fitted_ = True

        self.selected_features_ = (
            list(self.eda_summary_["feature"]) if not self.eda_summary_.empty else []
        )
        self.removed_features_ = {}
        self.corr_removed_ = []
        self.selection_result_ = FeatureSelectionResult(
            selected_features=list(self.selected_features_),
        )
        self.is_selected_ = False
        return self

    def select(
        self,
        iv_threshold: float = FILTERING.DEFAULT_IV_THRESHOLD,
        missing_threshold: float = FILTERING.DEFAULT_MISSING_THRESHOLD,
        corr_threshold: float = FILTERING.DEFAULT_CORR_THRESHOLD,
    ) -> "FeatureSelector":
        """Filter features based on thresholds for IV, missing rate, and correlation.

        Args:
            iv_threshold: Minimum Information Value (IV) to keep a feature.
            missing_threshold: Maximum missing rate (fraction) to keep a feature.
            corr_threshold: Maximum absolute correlation coefficient. If a pair
                exceeds this, the one with lower IV is removed.

        Returns:
            FeatureSelector: The selector instance after selection.

        Raises:
            ValueError: If called before fit().
        """
        if not self.is_fitted_:
            raise ValueError("FeatureSelector is not fitted. Call fit() first.")
        if self.analysis_result_ is None:
            raise ValueError("Feature analysis result is missing. Call fit() first.")

        self.selection_result_ = self._filter.select(
            analysis=self.analysis_result_,
            iv_threshold=iv_threshold,
            missing_threshold=missing_threshold,
            corr_threshold=corr_threshold,
        )
        # Copy the result fields into plain containers on the selector.
        self.selected_features_ = list(self.selection_result_.selected_features)
        self.removed_features_ = dict(self.selection_result_.removed_features)
        self.corr_removed_ = list(self.selection_result_.corr_removed)
        self.is_selected_ = True
        return self

    @requires_fit()
    def report(self) -> pd.DataFrame:
        """Generate a report combining EDA stats and selection status.

        Returns:
            The analysis report for the current selected/removed features,
            or an empty DataFrame when no analysis result is stored.
        """
        if self.analysis_result_ is None:
            return pd.DataFrame()

        return self.analysis_result_.report(
            selected_features=self.selected_features_,
            removed_features=self.removed_features_,
        )

    @requires_fit()
    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return X with only selected features.

        Delegates to the stored selection result when there is one;
        otherwise indexes X by ``selected_features_`` directly.
        """
        if self.selection_result_ is None:
            return X[self.selected_features_]
        return self.selection_result_.transform(X)

    @property
    @requires_fit()
    def corr_matrix(self) -> pd.DataFrame:
        """Get the feature-to-feature correlation matrix.

        Returns a fresh copy on each access, or an empty DataFrame when no
        analysis result is stored.
        """
        if self.analysis_result_ is None:
            return pd.DataFrame()
        return self.analysis_result_.corr_matrix.copy()
Attributes
corr_matrix property

Get the feature-to-feature correlation matrix.

Functions
__init__(metrics=None, iv_bins=BINNING.DEFAULT_BUCKETS, lift_k=0.1, corr_method='pearson', engine='auto')

Initialize the FeatureSelector.

Parameters:

Name Type Description Default
metrics Optional[List[str]]

Metrics to calculate. Options: 'iv', 'missing_rate', 'ks', 'correlation', 'lift'. If None, uses a default set.

None
iv_bins int

Number of bins for initial IV calculation.

DEFAULT_BUCKETS
lift_k float

Fraction of population to use for Lift calculation (e.g., top 10%).

0.1
corr_method str

Correlation method ('pearson', 'spearman', 'kendall').

'pearson'
engine str

Execution engine ('auto', 'rust', 'python').

'auto'
Source code in src/newt/features/selection/selector.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(
    self,
    metrics: Optional[List[str]] = None,
    iv_bins: int = BINNING.DEFAULT_BUCKETS,
    lift_k: float = 0.1,
    corr_method: str = "pearson",
    engine: str = "auto",
):
    """Initialize the FeatureSelector.

    Args:
        metrics: Metrics to calculate. Options: 'iv', 'missing_rate', 'ks',
            'correlation', 'lift'. If None, uses a default set.
        iv_bins: Number of bins for initial IV calculation.
        lift_k: Fraction of population to use for Lift calculation (e.g., top 10%).
        corr_method: Correlation method ('pearson', 'spearman', 'kendall').
        engine: Execution engine ('auto', 'rust', 'python').
    """
    self._analyzer = FeatureAnalyzer(
        metrics=metrics,
        iv_bins=iv_bins,
        lift_k=lift_k,
        corr_method=corr_method,
        engine=engine,
    )
    self._filter = FeatureSelectionFilter(engine=engine)
    self.metrics: Set[str] = set(self._analyzer.metrics)
    self.engine = engine

    self.eda_summary_: pd.DataFrame = pd.DataFrame()
    # Initialised here, like the other fitted attributes, so that reading
    # it before fit() gives an empty frame rather than raising
    # AttributeError.
    self.corr_matrix_: pd.DataFrame = pd.DataFrame()
    self.analysis_result_: Optional[FeatureAnalysisResult] = None

    self.selected_features_: List[str] = []
    self.removed_features_: dict = {}
    self.corr_removed_: list = []
    self.selection_result_: Optional[FeatureSelectionResult] = None
    self.is_fitted_: bool = False
    self.is_selected_: bool = False
fit(X, y=None)

Calculate feature statistics for the input DataFrame.

Parameters:

Name Type Description Default
X DataFrame

Input dataset.

required
y Optional[Series]

Target binary labels. Required for supervised metrics like IV or KS.

None

Returns:

Name Type Description
FeatureSelector FeatureSelector

The fitted selector instance.

Source code in src/newt/features/selection/selector.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None) -> "FeatureSelector":
    """Analyze X and store the resulting feature statistics.

    Any earlier selection is reset: every analyzed feature counts as
    selected until select() is called.

    Args:
        X: Input dataset.
        y: Target binary labels. Required for supervised metrics like IV or KS.

    Returns:
        FeatureSelector: The fitted selector instance.
    """
    analysis = self._analyzer.analyze(X, y)
    self.analysis_result_ = analysis
    self.eda_summary_ = analysis.summary.copy()
    self.corr_matrix_ = analysis.corr_matrix.copy()
    self.is_fitted_ = True

    # Start from "everything selected"; an empty summary means no features.
    if self.eda_summary_.empty:
        self.selected_features_ = []
    else:
        self.selected_features_ = list(self.eda_summary_["feature"])
    self.removed_features_ = {}
    self.corr_removed_ = []
    self.selection_result_ = FeatureSelectionResult(
        selected_features=list(self.selected_features_),
    )
    self.is_selected_ = False
    return self
report()

Generate a report combining EDA stats and selection status.

Source code in src/newt/features/selection/selector.py
138
139
140
141
142
143
144
145
146
147
@requires_fit()
def report(self) -> pd.DataFrame:
    """Build a report that merges EDA statistics with selection status.

    Returns:
        The analysis report for the current selected/removed features, or
        an empty DataFrame when no analysis result is stored.
    """
    analysis = self.analysis_result_
    if analysis is not None:
        return analysis.report(
            selected_features=self.selected_features_,
            removed_features=self.removed_features_,
        )
    return pd.DataFrame()
select(iv_threshold=FILTERING.DEFAULT_IV_THRESHOLD, missing_threshold=FILTERING.DEFAULT_MISSING_THRESHOLD, corr_threshold=FILTERING.DEFAULT_CORR_THRESHOLD)

Filter features based on thresholds for IV, missing rate, and correlation.

Parameters:

Name Type Description Default
iv_threshold float

Minimum Information Value (IV) to keep a feature.

DEFAULT_IV_THRESHOLD
missing_threshold float

Maximum missing rate (fraction) to keep a feature.

DEFAULT_MISSING_THRESHOLD
corr_threshold float

Maximum absolute correlation coefficient. If a pair exceeds this, the one with lower IV is removed.

DEFAULT_CORR_THRESHOLD

Returns:

Name Type Description
FeatureSelector FeatureSelector

The selector instance after selection.

Raises:

Type Description
ValueError

If called before fit().

Source code in src/newt/features/selection/selector.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def select(
    self,
    iv_threshold: float = FILTERING.DEFAULT_IV_THRESHOLD,
    missing_threshold: float = FILTERING.DEFAULT_MISSING_THRESHOLD,
    corr_threshold: float = FILTERING.DEFAULT_CORR_THRESHOLD,
) -> "FeatureSelector":
    """Apply IV, missing-rate and correlation thresholds to the fitted features.

    Args:
        iv_threshold: Minimum Information Value (IV) to keep a feature.
        missing_threshold: Maximum missing rate (fraction) to keep a feature.
        corr_threshold: Maximum absolute correlation coefficient. If a pair
            exceeds this, the one with lower IV is removed.

    Returns:
        FeatureSelector: The selector instance after selection.

    Raises:
        ValueError: If called before fit().
    """
    if not self.is_fitted_:
        raise ValueError("FeatureSelector is not fitted. Call fit() first.")
    analysis = self.analysis_result_
    if analysis is None:
        raise ValueError("Feature analysis result is missing. Call fit() first.")

    outcome = self._filter.select(
        analysis=analysis,
        iv_threshold=iv_threshold,
        missing_threshold=missing_threshold,
        corr_threshold=corr_threshold,
    )
    self.selection_result_ = outcome
    # Copy the result fields into plain containers on the selector.
    self.selected_features_ = list(outcome.selected_features)
    self.removed_features_ = dict(outcome.removed_features)
    self.corr_removed_ = list(outcome.corr_removed)
    self.is_selected_ = True
    return self
transform(X)

Return X with only selected features.

Source code in src/newt/features/selection/selector.py
149
150
151
152
153
154
@requires_fit()
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """Restrict X to the selected features.

    Delegates to the stored selection result when there is one; otherwise
    indexes X by ``selected_features_`` directly.
    """
    result = self.selection_result_
    if result is not None:
        return result.transform(X)
    return X[self.selected_features_]

newt.features.selection.stepwise

Stepwise regression feature selection.

Provides forward, backward, and bidirectional stepwise selection based on statistical significance (p-values) or information criteria (AIC/BIC).

Classes

StepwiseSelector

Stepwise regression feature selector.

Uses hypothesis testing to select optimal features for logistic regression. Supports forward selection, backward elimination, and bidirectional stepwise.

This is typically used after WOE transformation and before final model building.

Examples

>>> selector = StepwiseSelector(direction='both', criterion='aic')
>>> selector.fit(X_woe, y)
>>> X_selected = selector.transform(X_woe)
>>> print(selector.selected_features_)

Source code in src/newt/features/selection/stepwise.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
class StepwiseSelector:
    """
    Stepwise regression feature selector.

    Uses hypothesis testing to select optimal features for logistic regression.
    Supports forward selection, backward elimination, and bidirectional stepwise.

    This is typically used after WOE transformation and before final model building.

    After ``fit`` the following attributes are populated:
    ``selected_features_`` (kept columns), ``removed_features_`` (dropped
    columns), ``selection_history_`` (one dict per add/remove step) and
    ``is_fitted_``.

    Examples
    --------
    >>> selector = StepwiseSelector(direction='both', criterion='aic')
    >>> selector.fit(X_woe, y)
    >>> X_selected = selector.transform(X_woe)
    >>> print(selector.selected_features_)
    """

    def __init__(
        self,
        direction: str = "both",
        criterion: str = "aic",
        p_enter: float = MODELING.DEFAULT_P_ENTER,
        p_remove: float = MODELING.DEFAULT_P_REMOVE,
        max_iter: int = 100,
        fit_intercept: bool = True,
        exclude: Optional[List[str]] = None,
        engine: str = "auto",
        verbose: bool = True,
    ):
        """
        Initialize StepwiseSelector.

        Parameters
        ----------
        direction : str
            Selection direction:
            - 'forward': Start with no features, add one at a time
            - 'backward': Start with all features, remove one at a time
            - 'both': Bidirectional stepwise (forward + backward)
            Default 'both'.
        criterion : str
            Selection criterion:
            - 'pvalue': Use p-value for selection
            - 'aic': Use Akaike Information Criterion
            - 'bic': Use Bayesian Information Criterion
            Default 'aic'.
        p_enter : float
            P-value threshold for entering a feature. Default 0.05.
            Used when direction='forward' or 'both'.
        p_remove : float
            P-value threshold for removing a feature. Default 0.10.
            Used when direction='backward' or 'both'.
        max_iter : int
            Maximum iterations. Default 100. The same value is also passed
            to the underlying logistic-regression fit as its iteration cap.
        fit_intercept : bool
            Whether to include intercept. Default True.
        exclude : List[str], optional
            Features to always keep in the model (force include).
        engine : str
            Computation engine: 'auto', 'rust', or 'python'. Defaults to 'auto'
            (prefer Rust, fallback to Python when Rust is unavailable).
        verbose : bool
            Whether to show progress bars during selection.

        Raises
        ------
        ValueError
            If ``direction``, ``criterion`` or ``engine`` is not one of the
            accepted values.
        """
        if direction not in ("forward", "backward", "both"):
            raise ValueError("direction must be 'forward', 'backward', or 'both'")
        if criterion not in ("pvalue", "aic", "bic"):
            raise ValueError("criterion must be 'pvalue', 'aic', or 'bic'")
        try:
            validate_engine(engine)
        except ValueError as exc:
            raise ValueError("engine must be 'auto', 'rust' or 'python'") from exc

        self.direction = direction
        self.criterion = criterion
        self.p_enter = p_enter
        self.p_remove = p_remove
        self.max_iter = max_iter
        self.fit_intercept = fit_intercept
        self.exclude = exclude or []
        self.engine = resolve_engine(
            engine,
            required_functions=[
                "batch_fit_logistic_regression_numpy",
                "fit_logistic_regression_numpy",
            ],
            component="Rust stepwise engine",
            loader=lambda: _RUST_MODULE if HAS_RUST else None,
        )
        self.verbose = verbose

        # Fitted attributes
        self.selected_features_: List[str] = []
        self.removed_features_: List[str] = []
        self.selection_history_: List[Dict] = []
        self.is_fitted_: bool = False

    def fit(self, X: pd.DataFrame, y: pd.Series) -> "StepwiseSelector":
        """
        Fit the stepwise selector.

        Parameters
        ----------
        X : pd.DataFrame
            Feature data (typically WOE transformed).
        y : pd.Series
            Binary target variable (0/1).

        Returns
        -------
        StepwiseSelector
            Fitted instance.

        Raises
        ------
        ImportError
            If statsmodels is not installed.
        """
        try:
            import statsmodels.api as sm
        except ImportError as exc:
            raise ImportError(
                "statsmodels is required for StepwiseSelector. "
                "Install it with: pip install statsmodels"
            ) from exc

        X = X.copy()
        y = y.copy()

        # Start from a clean slate: without this a second fit() would append
        # its steps to the history recorded by the previous fit.
        self.selection_history_ = []
        self.is_fitted_ = False

        all_features = X.columns.tolist()

        # Ensure exclude features are valid
        exclude_set = set(self.exclude) & set(all_features)

        if self.direction == "forward":
            selected = self._forward_selection(X, y, all_features, exclude_set, sm)
        elif self.direction == "backward":
            selected = self._backward_elimination(X, y, all_features, exclude_set, sm)
        else:  # both
            selected = self._bidirectional_selection(
                X, y, all_features, exclude_set, sm
            )

        self.selected_features_ = selected
        self.removed_features_ = [f for f in all_features if f not in selected]
        self.is_fitted_ = True

        return self

    def _fit_model(self, X: pd.DataFrame, y: pd.Series, features: List[str], sm):
        """
        Fit a logistic regression on ``features`` and return the result.

        Returns ``None`` when ``features`` is empty or when the fit fails.
        With the Rust engine the result is a dict; otherwise it is the
        statsmodels result object.
        """
        if not features:
            return None

        if self.engine == "rust":
            X_subset = X[features].values
            if self.fit_intercept:
                # Intercept column goes first, so feature i sits at index i + 1.
                X_subset = np.column_stack([np.ones(X_subset.shape[0]), X_subset])

            try:
                # Rust engine returns a dict resembling sm result structure
                # for AIC/BIC compatibility.
                result = fit_logistic_regression_numpy(
                    X_subset, y.values.astype(float), max_iter=self.max_iter
                )
                if not isinstance(result, dict):
                    return None
                # A missing or non-finite criterion marks the fit as unusable.
                if not np.isfinite(float(result.get("aic", np.inf))):
                    return None
                if not np.isfinite(float(result.get("bic", np.inf))):
                    return None
                return result
            except Exception:
                # Deliberate best-effort: a failed candidate fit is skipped
                # by the callers rather than aborting the whole selection.
                return None
        else:
            X_subset = X[features]
            if self.fit_intercept:
                X_subset = sm.add_constant(X_subset, has_constant="add")

            try:
                model = sm.Logit(y, X_subset)
                result = model.fit(disp=False, maxiter=self.max_iter)
                return result
            except Exception:
                # Same best-effort policy as the Rust branch.
                return None

    def _get_criterion_value(self, result, criterion: str) -> float:
        """
        Get criterion value for model comparison (lower is better).

        Returns ``np.inf`` when ``result`` is ``None``. For ``'pvalue'`` the
        largest feature p-value is returned, ignoring the intercept.
        """
        if result is None:
            return np.inf

        if isinstance(result, dict):
            # Rust result
            if criterion == "aic":
                return result["aic"]
            elif criterion == "bic":
                return result["bic"]
            else:
                pvalues = result["p_values"]
                if self.fit_intercept:
                    # Index 0 is the intercept column added in _fit_model;
                    # drop it so this matches the statsmodels branch.
                    pvalues = pvalues[1:]
                return max(pvalues) if len(pvalues) > 0 else 0.0
        else:
            # Statsmodels result
            if criterion == "aic":
                return result.aic
            elif criterion == "bic":
                return result.bic
            else:  # pvalue - return max p-value (for backward)
                pvalues = result.pvalues
                if self.fit_intercept and "const" in pvalues.index:
                    pvalues = pvalues.drop("const")
                return pvalues.max() if len(pvalues) > 0 else 0.0

    def _get_pvalue(self, result, features: List[str], target_feature: str) -> float:
        """
        Extract p-value for a specific feature from model result.

        ``features`` is the ordered feature list the model was fitted on; it
        is only used to locate ``target_feature`` in a Rust ``p_values``
        list. Returns 1.0 when the p-value cannot be determined.
        """
        if result is None:
            return 1.0

        if isinstance(result, dict):
            # If result is from batch_fit, it might have a singular 'p_value'
            # (which is the candidate feature's p-value)
            if "p_value" in result:
                return result["p_value"]

            # Rust engine returns a dict with 'p_values' list
            # Features are at index 1.. if intercept is present
            try:
                idx = features.index(target_feature)
                offset = 1 if self.fit_intercept else 0
                return result["p_values"][idx + offset]
            except (ValueError, IndexError, KeyError):
                return 1.0
        else:
            # Statsmodels result object
            try:
                return result.pvalues.get(target_feature, 1.0)
            except AttributeError:
                return 1.0

    def _is_invalid_rust_candidate(
        self, candidate_values: np.ndarray, fixed_x: np.ndarray
    ) -> bool:
        """
        Return True when a candidate column should not be sent to the Rust fit.

        A candidate is rejected if it contains non-finite values, is constant,
        or is an exact copy of a column already in ``fixed_x``.
        """
        if not np.isfinite(candidate_values).all():
            return True

        if np.unique(candidate_values).size <= 1:
            return True

        if fixed_x.ndim == 2 and fixed_x.shape[1] > 0:
            for col_idx in range(fixed_x.shape[1]):
                if np.array_equal(candidate_values, fixed_x[:, col_idx]):
                    return True

        return False

    def _evaluate_rust_candidates(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        fixed_x: np.ndarray,
        remaining: List[str],
    ) -> List[Dict[str, float]]:
        """
        Batch-fit ``fixed_x`` plus each candidate in ``remaining`` with Rust.

        Returns one result dict per entry of ``remaining``, in the same
        order. Candidates that are rejected up front, or all candidates when
        the batch call raises, keep the ``_failed_rust_batch_result()``
        placeholder.
        """
        results = [_failed_rust_batch_result() for _ in remaining]
        valid_candidate_vecs: List[np.ndarray] = []
        valid_indices: List[int] = []

        for idx, feature in enumerate(remaining):
            candidate_values = X[feature].values.astype(float)
            if self._is_invalid_rust_candidate(candidate_values, fixed_x):
                continue
            valid_candidate_vecs.append(candidate_values)
            valid_indices.append(idx)

        if not valid_candidate_vecs:
            return results

        try:
            rust_results = batch_fit_logistic_regression_numpy(
                fixed_x,
                valid_candidate_vecs,
                y.values.astype(float),
                max_iter=self.max_iter,
            )
        except Exception:
            # Best-effort: treat a failed batch as "no candidate converged".
            return results

        # Map batch outputs back to their positions in ``remaining``.
        for idx, rust_result in zip(valid_indices, rust_results):
            results[idx] = _normalize_rust_batch_result(rust_result)

        return results

    def _fixed_design_matrix(
        self,
        X: pd.DataFrame,
        selected: List[str],
    ) -> np.ndarray:
        """Build the fixed design matrix used by Rust candidate evaluation."""
        fixed_x = X[selected].values
        if self.fit_intercept:
            fixed_x = np.column_stack([np.ones(fixed_x.shape[0]), fixed_x])
        return fixed_x

    def _record_selection_step(
        self,
        iteration: int,
        action: str,
        feature: str,
        value: float,
    ) -> None:
        """Append a normalized selection-history record (1-based iteration)."""
        self.selection_history_.append(
            {
                "iteration": iteration + 1,
                "action": action,
                "feature": feature,
                "criterion": self.criterion,
                "value": value,
            }
        )

    @staticmethod
    def _forced_features(all_features: List[str], exclude_set: set) -> List[str]:
        """Return the force-included features in column order.

        Iterating the set directly would make the order of the selected
        features depend on string hashing, i.e. differ between runs.
        """
        return [f for f in all_features if f in exclude_set]

    def _iter_candidate_fits(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        selected: List[str],
        remaining: List[str],
        sm,
    ):
        """Yield ``(feature, result)`` for ``selected + [feature]`` models.

        ``result`` is ``None`` when the candidate fit failed or did not
        converge. The Rust engine evaluates all candidates in one batch;
        otherwise each candidate is fitted lazily, one at a time.
        """
        if self.engine == "rust":
            batch = self._evaluate_rust_candidates(
                X,
                y,
                self._fixed_design_matrix(X, selected),
                remaining,
            )
            for feature, res in zip(remaining, batch):
                yield feature, (res if res["converged"] else None)
        else:
            for feature in remaining:
                yield feature, self._fit_model(X, y, selected + [feature], sm)

    def _find_best_addition(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        selected: List[str],
        remaining: List[str],
        sm,
    ) -> tuple:
        """Find the best feature in ``remaining`` to add to ``selected``.

        Returns ``(feature, score)``; ``feature`` is ``None`` when no
        candidate qualifies. With ``criterion='pvalue'`` a candidate must
        have a p-value below ``p_enter``; with AIC/BIC it must score below
        the current model. Among qualifying candidates the lowest score wins.

        NOTE(review): when ``selected`` is empty ``_fit_model`` returns
        ``None``, so the AIC/BIC baseline is ``inf`` and the first finite
        candidate always qualifies (no comparison against an intercept-only
        model). Kept as-is to preserve existing selections.
        """
        by_pvalue = self.criterion == "pvalue"
        best_feature = None
        best_score = 1.0 if by_pvalue else np.inf

        if not remaining:
            return best_feature, best_score

        if by_pvalue:
            threshold = self.p_enter
        else:
            # The baseline does not depend on the candidate: fit it once per
            # step instead of once per candidate.
            baseline = self._fit_model(X, y, selected, sm)
            threshold = self._get_criterion_value(baseline, self.criterion)

        for feature, result in self._iter_candidate_fits(
            X, y, selected, remaining, sm
        ):
            if result is None:
                continue

            if by_pvalue:
                score = self._get_pvalue(result, selected + [feature], feature)
            else:
                score = self._get_criterion_value(result, self.criterion)

            if score < threshold and score < best_score:
                best_feature = feature
                best_score = score

        return best_feature, best_score

    def _forward_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        all_features: List[str],
        exclude_set: set,
        sm,
    ) -> List[str]:
        """Forward selection: start with forced features, add one per step."""
        selected = self._forced_features(all_features, exclude_set)
        remaining = [f for f in all_features if f not in exclude_set]

        # Initialize progress bar
        pbar = tqdm(
            total=len(all_features), desc="Forward Selection", disable=not self.verbose
        )
        try:
            pbar.update(len(selected))

            for iteration in range(self.max_iter):
                best_feature, best_criterion = self._find_best_addition(
                    X, y, selected, remaining, sm
                )

                if best_feature is None:
                    break

                selected.append(best_feature)
                remaining.remove(best_feature)
                pbar.update(1)
                pbar.set_postfix(added=best_feature)

                self._record_selection_step(
                    iteration, "add", best_feature, best_criterion
                )
        finally:
            # Close the bar even when a fit raises.
            pbar.close()

        return selected

    def _backward_elimination(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        all_features: List[str],
        exclude_set: set,
        sm,
    ) -> List[str]:
        """Backward elimination: start with all, remove features one by one."""
        selected = all_features.copy()
        pbar = tqdm(
            total=len(all_features),
            desc="Backward Elimination",
            disable=not self.verbose,
        )

        try:
            for iteration in range(self.max_iter):
                result = self._fit_model(X, y, selected, sm)

                if result is None or len(selected) <= len(exclude_set):
                    break

                # Find feature to remove (highest p-value or worst criterion
                # impact); forced features are never candidates.
                removable = [f for f in selected if f not in exclude_set]
                if not removable:
                    break

                worst_feature = None
                worst_pvalue = 0.0

                if self.criterion == "pvalue":
                    for feature in removable:
                        pvalue = self._get_pvalue(result, selected, feature)
                        if pvalue > worst_pvalue:
                            worst_pvalue = pvalue
                            worst_feature = feature

                    if worst_pvalue <= self.p_remove:
                        break
                else:
                    # For AIC/BIC, try removing each feature and find best
                    # improvement
                    current_criterion = self._get_criterion_value(
                        result, self.criterion
                    )
                    best_improvement = 0

                    for feature in removable:
                        candidate = [f for f in selected if f != feature]
                        test_result = self._fit_model(X, y, candidate, sm)
                        test_criterion = self._get_criterion_value(
                            test_result, self.criterion
                        )

                        improvement = current_criterion - test_criterion
                        if improvement > best_improvement:
                            best_improvement = improvement
                            worst_feature = feature
                            worst_pvalue = self._get_pvalue(result, selected, feature)

                    if best_improvement <= 0:
                        break

                if worst_feature is None:
                    break

                selected.remove(worst_feature)
                pbar.update(1)
                pbar.set_postfix(removed=worst_feature)

                # The recorded value is the removed feature's p-value in the
                # full model, also when the criterion is AIC/BIC.
                self._record_selection_step(
                    iteration, "remove", worst_feature, worst_pvalue
                )
        finally:
            # Close the bar even when a fit raises.
            pbar.close()

        return selected

    def _bidirectional_selection(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        all_features: List[str],
        exclude_set: set,
        sm,
    ) -> List[str]:
        """Bidirectional stepwise: combine forward and backward.

        Each iteration tries one addition followed by one removal and stops
        as soon as an iteration changes nothing.
        """
        selected = self._forced_features(all_features, exclude_set)
        remaining = [f for f in all_features if f not in exclude_set]

        for iteration in range(self.max_iter):
            changed = False

            # Forward step: try to add a feature
            best_feature, best_criterion = self._find_best_addition(
                X, y, selected, remaining, sm
            )

            if best_feature is not None:
                selected.append(best_feature)
                remaining.remove(best_feature)
                changed = True

                self._record_selection_step(
                    iteration,
                    "add",
                    best_feature,
                    best_criterion,
                )

            # Backward step: try to remove a feature
            if len(selected) > len(exclude_set):
                result = self._fit_model(X, y, selected, sm)

                if result is not None:
                    removable = [f for f in selected if f not in exclude_set]
                    worst_feature = None
                    worst_pvalue = 0.0

                    if self.criterion == "pvalue":
                        for feature in removable:
                            pvalue = self._get_pvalue(result, selected, feature)
                            if pvalue > worst_pvalue and pvalue > self.p_remove:
                                worst_pvalue = pvalue
                                worst_feature = feature
                    else:
                        current_criterion = self._get_criterion_value(
                            result, self.criterion
                        )
                        for feature in removable:
                            candidate = [f for f in selected if f != feature]
                            test_result = self._fit_model(X, y, candidate, sm)
                            test_criterion = self._get_criterion_value(
                                test_result, self.criterion
                            )

                            # Among removals that improve the criterion, drop
                            # the feature with the highest p-value.
                            if test_criterion < current_criterion:
                                pvalue = self._get_pvalue(result, selected, feature)
                                if pvalue > worst_pvalue:
                                    worst_pvalue = pvalue
                                    worst_feature = feature

                    if worst_feature is not None:
                        selected.remove(worst_feature)
                        remaining.append(worst_feature)
                        changed = True

                        self._record_selection_step(
                            iteration,
                            "remove",
                            worst_feature,
                            worst_pvalue,
                        )

            if not changed:
                break

        return selected

    @requires_fit()
    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Filter columns based on fitted selection.

        Parameters
        ----------
        X : pd.DataFrame
            Data to transform.

        Returns
        -------
        pd.DataFrame
            Filtered data with only selected features. Selected features
            that are absent from ``X`` are silently skipped.
        """
        cols_to_keep = [c for c in self.selected_features_ if c in X.columns]
        return X[cols_to_keep]

    def fit_transform(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
        """Fit and transform in one step."""
        self.fit(X, y)
        return self.transform(X)

    @requires_fit()
    def report(self) -> pd.DataFrame:
        """
        Generate selection report.

        Returns
        -------
        pd.DataFrame
            Selection history with iterations, actions, and criteria values.
            An empty frame with the same columns when no step was recorded.
        """
        if not self.selection_history_:
            return pd.DataFrame(
                columns=["iteration", "action", "feature", "criterion", "value"]
            )

        return pd.DataFrame(self.selection_history_)

    @requires_fit()
    def summary(self) -> str:
        """
        Get selection summary.

        Returns
        -------
        str
            Summary of stepwise selection results.
        """
        lines = [
            "=" * 50,
            "Stepwise Selection Summary",
            "=" * 50,
            f"Direction: {self.direction}",
            f"Criterion: {self.criterion}",
            f"P-enter: {self.p_enter}, P-remove: {self.p_remove}",
            "-" * 50,
            f"Selected features: {len(self.selected_features_)}",
            f"Removed features: {len(self.removed_features_)}",
            "-" * 50,
            "Selected:",
        ]

        lines.extend(f"  - {f}" for f in self.selected_features_)

        if self.removed_features_:
            lines.append("-" * 50)
            lines.append("Removed:")
            lines.extend(f"  - {f}" for f in self.removed_features_)

        lines.append("=" * 50)
        return "\n".join(lines)
Functions
__init__(direction='both', criterion='aic', p_enter=MODELING.DEFAULT_P_ENTER, p_remove=MODELING.DEFAULT_P_REMOVE, max_iter=100, fit_intercept=True, exclude=None, engine='auto', verbose=True)

Initialize StepwiseSelector.

Parameters

- direction (str): Selection direction. Default 'both'.
    - 'forward': Start with no features, add one at a time
    - 'backward': Start with all features, remove one at a time
    - 'both': Bidirectional stepwise (forward + backward)
- criterion (str): Selection criterion. Default 'aic'.
    - 'pvalue': Use p-value for selection
    - 'aic': Use Akaike Information Criterion
    - 'bic': Use Bayesian Information Criterion
- p_enter (float): P-value threshold for entering a feature. Default 0.05. Used when direction='forward' or 'both'.
- p_remove (float): P-value threshold for removing a feature. Default 0.10. Used when direction='backward' or 'both'.
- max_iter (int): Maximum iterations. Default 100.
- fit_intercept (bool): Whether to include intercept. Default True.
- exclude (List[str], optional): Features to always keep in the model (force include).
- engine (str): Computation engine: 'auto', 'rust', or 'python'. Defaults to 'auto' (prefer Rust, fallback to Python when Rust is unavailable).
- verbose (bool): Whether to show progress bars during selection.

Source code in src/newt/features/selection/stepwise.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def __init__(
    self,
    direction: str = "both",
    criterion: str = "aic",
    p_enter: float = MODELING.DEFAULT_P_ENTER,
    p_remove: float = MODELING.DEFAULT_P_REMOVE,
    max_iter: int = 100,
    fit_intercept: bool = True,
    exclude: Optional[List[str]] = None,
    engine: str = "auto",
    verbose: bool = True,
):
    """
    Configure a stepwise selector; no data is touched until ``fit``.

    Parameters
    ----------
    direction : str
        How the search moves through the feature set:
        - 'forward': begin empty and add one feature per step
        - 'backward': begin with every feature and drop one per step
        - 'both': alternate an add step and a drop step
        Default 'both'.
    criterion : str
        Quantity used to compare models:
        - 'pvalue': feature p-values
        - 'aic': Akaike Information Criterion
        - 'bic': Bayesian Information Criterion
        Default 'aic'.
    p_enter : float
        P-value threshold for entering a feature. Default 0.05.
        Used when direction='forward' or 'both'.
    p_remove : float
        P-value threshold for removing a feature. Default 0.10.
        Used when direction='backward' or 'both'.
    max_iter : int
        Maximum iterations. Default 100.
    fit_intercept : bool
        Whether to include intercept. Default True.
    exclude : List[str], optional
        Features to always keep in the model (force include).
    engine : str
        Computation engine: 'auto', 'rust', or 'python'. Defaults to 'auto'
        (prefer Rust, fallback to Python when Rust is unavailable).
    verbose : bool
        Whether to show progress bars during selection.

    Raises
    ------
    ValueError
        If ``direction``, ``criterion`` or ``engine`` is not an accepted value.
    """
    allowed_directions = ("forward", "backward", "both")
    allowed_criteria = ("pvalue", "aic", "bic")

    if direction not in allowed_directions:
        raise ValueError("direction must be 'forward', 'backward', or 'both'")
    if criterion not in allowed_criteria:
        raise ValueError("criterion must be 'pvalue', 'aic', or 'bic'")
    try:
        validate_engine(engine)
    except ValueError as exc:
        # Re-raise with a message that lists this class's accepted engines.
        raise ValueError("engine must be 'auto', 'rust' or 'python'") from exc

    # Plain configuration, stored as given.
    self.direction = direction
    self.criterion = criterion
    self.p_enter = p_enter
    self.p_remove = p_remove
    self.max_iter = max_iter
    self.fit_intercept = fit_intercept
    self.exclude = exclude if exclude else []

    # Names of the Rust entry points the stepwise search calls.
    rust_entry_points = [
        "batch_fit_logistic_regression_numpy",
        "fit_logistic_regression_numpy",
    ]
    self.engine = resolve_engine(
        engine,
        required_functions=rust_entry_points,
        component="Rust stepwise engine",
        loader=lambda: _RUST_MODULE if HAS_RUST else None,
    )
    self.verbose = verbose

    # Fitted attributes, filled in by fit().
    self.selected_features_: List[str] = []
    self.removed_features_: List[str] = []
    self.selection_history_: List[Dict] = []
    self.is_fitted_: bool = False
fit(X, y)

Fit the stepwise selector.

Parameters

- X (pd.DataFrame): Feature data (typically WOE transformed).
- y (pd.Series): Binary target variable (0/1).

Returns

StepwiseSelector Fitted instance.

Source code in src/newt/features/selection/stepwise.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def fit(self, X: pd.DataFrame, y: pd.Series) -> "StepwiseSelector":
    """
    Fit the stepwise selector.

    Runs the search chosen by ``self.direction`` and stores the outcome in
    ``selected_features_``, ``removed_features_`` and ``is_fitted_``. Any
    selection history from a previous fit is discarded first.

    Parameters
    ----------
    X : pd.DataFrame
        Feature data (typically WOE transformed).
    y : pd.Series
        Binary target variable (0/1).

    Returns
    -------
    StepwiseSelector
        Fitted instance.

    Raises
    ------
    ImportError
        If statsmodels is not installed.
    """
    try:
        import statsmodels.api as sm
    except ImportError as exc:
        raise ImportError(
            "statsmodels is required for StepwiseSelector. "
            "Install it with: pip install statsmodels"
        ) from exc

    # Work on copies so the caller's objects are never modified.
    X = X.copy()
    y = y.copy()

    # Start from a clean slate: without this a second fit() would append its
    # steps to the history recorded by the previous fit.
    self.selection_history_ = []
    self.is_fitted_ = False

    all_features = X.columns.tolist()

    # Ensure exclude features are valid
    exclude_set = set(self.exclude) & set(all_features)

    if self.direction == "forward":
        selected = self._forward_selection(X, y, all_features, exclude_set, sm)
    elif self.direction == "backward":
        selected = self._backward_elimination(X, y, all_features, exclude_set, sm)
    else:  # both
        selected = self._bidirectional_selection(
            X, y, all_features, exclude_set, sm
        )

    self.selected_features_ = selected
    self.removed_features_ = [f for f in all_features if f not in selected]
    self.is_fitted_ = True

    return self
fit_transform(X, y)

Fit and transform in one step.

Source code in src/newt/features/selection/stepwise.py
722
723
724
725
def fit_transform(self, X: pd.DataFrame, y: pd.Series) -> pd.DataFrame:
    """Run ``fit`` on ``(X, y)`` and return ``transform(X)``."""
    self.fit(X, y)
    reduced = self.transform(X)
    return reduced
report()

Generate selection report.

Returns

pd.DataFrame Selection history with iterations, actions, and criteria values.

Source code in src/newt/features/selection/stepwise.py
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
@requires_fit()
def report(self) -> pd.DataFrame:
    """
    Return the recorded selection steps as a table.

    Returns
    -------
    pd.DataFrame
        One row per recorded step, built from ``selection_history_``. When
        no step was recorded, an empty frame with the columns
        ``iteration``, ``action``, ``feature``, ``criterion``, ``value``.
    """
    history = self.selection_history_
    if history:
        return pd.DataFrame(history)

    # Nothing recorded: still expose the expected column layout.
    empty_columns = ["iteration", "action", "feature", "criterion", "value"]
    return pd.DataFrame(columns=empty_columns)
summary()

Get selection summary.

Returns

str
    Summary of stepwise selection results.

Source code in src/newt/features/selection/stepwise.py
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
@requires_fit()
def summary(self) -> str:
    """
    Render a plain-text overview of the selection outcome.

    Returns
    -------
    str
        Newline-joined report listing the configuration (direction,
        criterion, p-value thresholds), the feature counts, and the names
        of selected features. Removed features are listed only when there
        are any.
    """
    heavy_rule = "=" * 50
    light_rule = "-" * 50

    report_lines = [heavy_rule, "Stepwise Selection Summary", heavy_rule]
    report_lines.append(f"Direction: {self.direction}")
    report_lines.append(f"Criterion: {self.criterion}")
    report_lines.append(f"P-enter: {self.p_enter}, P-remove: {self.p_remove}")
    report_lines.append(light_rule)
    report_lines.append(f"Selected features: {len(self.selected_features_)}")
    report_lines.append(f"Removed features: {len(self.removed_features_)}")
    report_lines.append(light_rule)

    report_lines.append("Selected:")
    report_lines.extend(f"  - {name}" for name in self.selected_features_)

    if self.removed_features_:
        report_lines.append(light_rule)
        report_lines.append("Removed:")
        report_lines.extend(f"  - {name}" for name in self.removed_features_)

    report_lines.append(heavy_rule)
    return "\n".join(report_lines)
transform(X)

Filter columns based on fitted selection.

Parameters

X : pd.DataFrame Data to transform.

Returns

pd.DataFrame Filtered data with only selected features.

Source code in src/newt/features/selection/stepwise.py
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
@requires_fit()
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only the columns chosen during fitting.

    Parameters
    ----------
    X : pd.DataFrame
        Data to transform.

    Returns
    -------
    pd.DataFrame
        ``X`` restricted to the selected features, in the order stored in
        ``selected_features_``. Selected features missing from ``X`` are
        skipped rather than raising.
    """
    kept = []
    for name in self.selected_features_:
        if name in X.columns:
            kept.append(name)
    return X[kept]

Modeling

newt.modeling.logistic

Logistic Regression model wrapper using statsmodels.

Provides a scikit-learn-like interface for statsmodels Logit.

Classes

LogisticModel

Logistic Regression model wrapper using statsmodels.

Provides a familiar fit/predict interface while leveraging statsmodels for detailed statistical output (p-values, confidence intervals, etc.).

Examples

>>> model = LogisticModel()
>>> model.fit(X_woe, y)
>>> print(model.summary())
>>> predictions = model.predict_proba(X_woe)

Source code in src/newt/modeling/logistic.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
class LogisticModel:
    """
    Logistic Regression model wrapper using statsmodels.

    Provides a familiar fit/predict interface while leveraging statsmodels
    for detailed statistical output (p-values, confidence intervals, etc.).

    Once ``is_fitted_`` is True the instance is in one of two states:

    * live -- produced by ``fit``; ``result_`` holds the statsmodels results
      object and predictions/summaries are delegated to it.
    * restored -- produced by ``from_dict``/``load``; ``result_`` is None and
      predictions are computed from the cached coefficient table.

    Examples
    --------
    >>> model = LogisticModel()
    >>> model.fit(X_woe, y)
    >>> print(model.summary())
    >>> predictions = model.predict_proba(X_woe)
    """

    # Written into every payload produced by ``to_dict`` as "schema_version".
    SERIALIZATION_VERSION = 1

    def __init__(
        self,
        fit_intercept: bool = True,
        method: str = "bfgs",
        maxiter: int = 100,
        regularization: Optional[str] = None,
        alpha: float = 0.0,
        **kwargs,
    ):
        """
        Initialize LogisticModel.

        Parameters
        ----------
        fit_intercept : bool
            Whether to fit an intercept term. Default True.
        method : str
            Optimization method for statsmodels. Default 'bfgs'.
            Options: 'newton', 'bfgs', 'lbfgs', 'powell', 'cg', 'ncg'.
        maxiter : int
            Maximum iterations for optimization. Default 100.
        regularization : str, optional
            Regularization type: 'l1' or 'l2'. Default None (no regularization).
        alpha : float
            Regularization strength. Default 0.0.
        **kwargs
            Additional arguments passed to statsmodels fit method.
        """
        self.fit_intercept = fit_intercept
        self.method = method
        self.maxiter = maxiter
        self.regularization = regularization
        self.alpha = alpha
        self.extra_kwargs = kwargs

        # Fitted attributes
        self.model_ = None
        self.result_ = None
        self.feature_names_: List[str] = []
        self.coefficients_: pd.DataFrame = pd.DataFrame()
        self.summary_text_: str = ""
        self.model_statistics_: Dict[str, float] = {}
        self.is_fitted_: bool = False

    def fit(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        sample_weight: Optional[np.ndarray] = None,
    ) -> "LogisticModel":
        """
        Fit the logistic regression model.

        Parameters
        ----------
        X : pd.DataFrame
            Feature data (typically WOE transformed).
        y : pd.Series
            Binary target variable (0/1).
        sample_weight : np.ndarray, optional
            Sample weights. Forwarded to ``sm.Logit`` as ``freq_weights``.

        Returns
        -------
        LogisticModel
            Fitted instance.

        Raises
        ------
        ImportError
            If statsmodels is not installed. The original import failure is
            attached as the cause.
        """
        try:
            import statsmodels.api as sm
        except ImportError as exc:
            # Chain the original error so the real import failure stays visible.
            raise ImportError(
                "statsmodels is required for LogisticModel. "
                "Install it with: pip install statsmodels"
            ) from exc

        X = X.copy()
        y = y.copy()

        # Store feature names (before the constant column is added)
        self.feature_names_ = X.columns.tolist()

        # Add constant if fitting intercept
        if self.fit_intercept:
            X = sm.add_constant(X, has_constant="add")

        # Build model
        if sample_weight is not None:
            # NOTE(review): the weights are only passed through as a keyword
            # argument; confirm the installed statsmodels Logit honours
            # ``freq_weights`` rather than ignoring it.
            self.model_ = sm.Logit(y, X, freq_weights=sample_weight)
        else:
            self.model_ = sm.Logit(y, X)

        # Fit model; entries in extra_kwargs override the three defaults.
        fit_kwargs = {
            "method": self.method,
            "maxiter": self.maxiter,
            "disp": False,
            **self.extra_kwargs,
        }

        if self.regularization == "l1":
            # NOTE(review): this path does not use fit_kwargs, so ``method``,
            # ``maxiter`` and the extra kwargs are not forwarded here.
            self.result_ = self.model_.fit_regularized(
                method="l1",
                alpha=self.alpha,
                disp=False,
            )
        elif self.regularization == "l2":
            # NOTE(review): no L2 penalty is applied in this branch. It is an
            # ordinary fit that additionally requests HC0 robust standard
            # errors; ``alpha`` is not used.
            fit_kwargs["cov_type"] = "HC0"  # Robust standard errors
            self.result_ = self.model_.fit(**fit_kwargs)
        else:
            self.result_ = self.model_.fit(**fit_kwargs)

        # Extract coefficients and cache diagnostics for serialization
        self._extract_coefficients()
        self._cache_fit_diagnostics()

        self.is_fitted_ = True
        return self

    def _extract_coefficients(self) -> None:
        """
        Extract coefficients into a DataFrame.

        Populates ``coefficients_`` with one row per fitted parameter and the
        columns ``feature``, ``coefficient``, ``std_error``, ``z_value``,
        ``p_value``, ``ci_lower``, ``ci_upper`` and ``odds_ratio``. Does
        nothing when no statsmodels result is attached.
        """
        if self.result_ is None:
            return

        params = self.result_.params
        if hasattr(params, "index"):
            feature_index = [str(name) for name in params.index]
        else:
            # Unlabelled parameters: rebuild the labels from the training
            # columns, with the constant first when an intercept was fitted.
            feature_index = self.feature_names_.copy()
            if self.fit_intercept:
                feature_index = ["const"] + feature_index

        # np.asarray accepts both pandas objects and plain arrays, so the
        # unlabelled fallback above works for every statistic, not only for
        # ``params``.
        conf_int = np.asarray(self.result_.conf_int(), dtype=float)

        coef_df = pd.DataFrame(
            {
                "feature": feature_index,
                "coefficient": np.asarray(params, dtype=float).ravel(),
                "std_error": np.asarray(self.result_.bse, dtype=float).ravel(),
                "z_value": np.asarray(self.result_.tvalues, dtype=float).ravel(),
                "p_value": np.asarray(self.result_.pvalues, dtype=float).ravel(),
            }
        )

        # Add confidence intervals (first column lower, second column upper)
        coef_df["ci_lower"] = conf_int[:, 0]
        coef_df["ci_upper"] = conf_int[:, 1]

        # Add odds ratio
        coef_df["odds_ratio"] = np.exp(coef_df["coefficient"])

        self.coefficients_ = coef_df

    def _cache_fit_diagnostics(self) -> None:
        """Cache summary text and model-level diagnostics for lightweight restore."""
        self.model_statistics_ = self._extract_model_statistics()
        if self.result_ is None:
            self.summary_text_ = ""
            return
        try:
            self.summary_text_ = str(self.result_.summary().as_text())
        except Exception:
            # Best effort: a summary that cannot be rendered must not fail
            # the fit, so fall back to an empty cached text.
            self.summary_text_ = ""

    def _extract_model_statistics(self) -> Dict[str, float]:
        """
        Extract finite model-level summary statistics.

        Returns
        -------
        Dict[str, float]
            Subset of ``aic``, ``bic``, ``log_likelihood``, ``pseudo_r2`` and
            ``nobs``. Attributes that are missing on the result, or that are
            not finite numbers, are omitted. Empty when no result is attached.
        """
        if self.result_ is None:
            return {}

        # statsmodels attribute name -> key used in the exported payload
        mapping = {
            "aic": "aic",
            "bic": "bic",
            "llf": "log_likelihood",
            "prsquared": "pseudo_r2",
            "nobs": "nobs",
        }
        output: Dict[str, float] = {}
        for attr_name, output_name in mapping.items():
            value = getattr(self.result_, attr_name, None)
            numeric = self._as_finite_float(value)
            if numeric is None:
                continue
            output[output_name] = numeric
        return output

    def _intercept(self) -> float:
        """Return intercept coefficient (const) if present, otherwise 0.0."""
        if not self.fit_intercept or self.coefficients_.empty:
            return 0.0

        const_row = self.coefficients_[self.coefficients_["feature"] == "const"]
        if const_row.empty:
            return 0.0
        return float(const_row["coefficient"].iloc[0])

    def _coefficient_map(self) -> Dict[str, float]:
        """Return feature coefficient mapping excluding intercept."""
        if self.coefficients_.empty:
            return {}
        coef_frame = self.coefficients_[self.coefficients_["feature"] != "const"]
        return {
            str(name): float(value)
            for name, value in zip(coef_frame["feature"], coef_frame["coefficient"])
        }

    def _feature_statistics(self) -> Dict[str, Dict[str, float]]:
        """
        Return finite feature-level statistics from the coefficient table.

        Returns
        -------
        Dict[str, Dict[str, float]]
            Feature name -> statistics. The intercept row is excluded,
            non-finite or missing values are dropped, and a feature with no
            usable statistic is left out entirely.
        """
        if self.coefficients_.empty:
            return {}

        fields = [
            "coefficient",
            "std_error",
            "z_value",
            "p_value",
            "ci_lower",
            "ci_upper",
            "odds_ratio",
        ]
        output: Dict[str, Dict[str, float]] = {}
        coef_frame = self.coefficients_[self.coefficients_["feature"] != "const"]
        for _, row in coef_frame.iterrows():
            feature = str(row["feature"])
            stats: Dict[str, float] = {}
            for field in fields:
                if field not in row:
                    continue
                numeric = self._as_finite_float(row[field])
                if numeric is None:
                    continue
                stats[field] = numeric
            if stats:
                output[feature] = stats
        return output

    @staticmethod
    def _as_finite_float(value: Any) -> Optional[float]:
        """Convert value to a finite float; return None if that is not possible."""
        if value is None:
            return None
        try:
            numeric = float(value)
        except (TypeError, ValueError):
            return None
        if not np.isfinite(numeric):
            return None
        return numeric

    @classmethod
    def _float_or_nan(cls, value: Any) -> float:
        """Convert value to a finite float, using NaN when that is not possible."""
        numeric = cls._as_finite_float(value)
        return np.nan if numeric is None else numeric

    @staticmethod
    def _normalize_model_statistics(raw: Any) -> Dict[str, float]:
        """Normalize persisted model statistics, dropping non-finite entries."""
        if not isinstance(raw, dict):
            return {}
        normalized: Dict[str, float] = {}
        for key, value in raw.items():
            numeric = LogisticModel._as_finite_float(value)
            if numeric is None:
                continue
            normalized[str(key)] = numeric
        return normalized

    @staticmethod
    def _serialize_extra_kwargs(raw: Any) -> Dict[str, Any]:
        """
        Keep only scalar fit kwargs that are safe to serialize.

        Values of type bool, int and str are kept as-is, finite floats are
        kept as float, and everything else is dropped.
        """
        if not isinstance(raw, dict):
            return {}
        output: Dict[str, Any] = {}
        for key, value in raw.items():
            if isinstance(value, (bool, int, str)):
                output[str(key)] = value
                continue
            if isinstance(value, float) and np.isfinite(value):
                output[str(key)] = float(value)
        return output

    @staticmethod
    def _resolve_newt_version() -> str:
        """Resolve installed package version, or "unknown" if it is not found."""
        try:
            return version("newt")
        except PackageNotFoundError:
            return "unknown"

    @classmethod
    def _build_coefficients_frame(
        cls,
        intercept: float,
        coefficients: Dict[str, Any],
        feature_names: List[str],
        feature_statistics: Any,
        fit_intercept: bool,
    ) -> pd.DataFrame:
        """
        Build coefficient frame for lightweight restored model.

        Parameters
        ----------
        intercept : float
            Intercept value; only used when ``fit_intercept`` is True.
        coefficients : Dict[str, Any]
            Feature name -> coefficient. Missing or non-finite values are
            treated as 0.0.
        feature_names : List[str]
            Preferred feature order. Features that appear only in
            ``coefficients`` are appended afterwards.
        feature_statistics : Any
            Optional feature name -> statistics mapping. Anything that is
            not a dict is ignored.
        fit_intercept : bool
            Whether to emit a leading ``const`` row.

        Returns
        -------
        pd.DataFrame
            Frame with the same columns as a freshly fitted coefficient
            table. Statistics that are unavailable are NaN.
        """
        stats_by_feature = (
            feature_statistics if isinstance(feature_statistics, dict) else {}
        )
        ordered_features: List[str] = [str(feature) for feature in feature_names]
        for feature in coefficients:
            feature_name = str(feature)
            if feature_name not in ordered_features:
                ordered_features.append(feature_name)

        records: List[Dict[str, Any]] = []
        if fit_intercept:
            # Intercept statistics are not part of the serialized payload.
            records.append(
                {
                    "feature": "const",
                    "coefficient": float(intercept),
                    "std_error": np.nan,
                    "z_value": np.nan,
                    "p_value": np.nan,
                    "ci_lower": np.nan,
                    "ci_upper": np.nan,
                    "odds_ratio": float(np.exp(intercept)),
                }
            )

        for feature in ordered_features:
            coefficient = cls._as_finite_float(coefficients.get(feature))
            if coefficient is None:
                coefficient = 0.0
            stats = stats_by_feature.get(feature, {})
            if not isinstance(stats, dict):
                stats = {}

            odds_ratio = cls._as_finite_float(stats.get("odds_ratio"))
            if odds_ratio is None:
                # Not persisted: derive it from the coefficient instead.
                odds_ratio = float(np.exp(coefficient))

            # Missing statistics are NaN, as in the intercept row, so the
            # numeric columns stay float instead of holding None objects.
            records.append(
                {
                    "feature": feature,
                    "coefficient": float(coefficient),
                    "std_error": cls._float_or_nan(stats.get("std_error")),
                    "z_value": cls._float_or_nan(stats.get("z_value")),
                    "p_value": cls._float_or_nan(stats.get("p_value")),
                    "ci_lower": cls._float_or_nan(stats.get("ci_lower")),
                    "ci_upper": cls._float_or_nan(stats.get("ci_upper")),
                    "odds_ratio": odds_ratio,
                }
            )

        return pd.DataFrame.from_records(records)

    @requires_fit()
    def predict_proba(self, X: pd.DataFrame) -> np.ndarray:
        """
        Predict probability of positive class.

        Parameters
        ----------
        X : pd.DataFrame
            Feature data. Must contain every column seen during fitting;
            extra columns are ignored.

        Returns
        -------
        np.ndarray
            Predicted probabilities for positive class.

        Raises
        ------
        ImportError
            If a statsmodels result is attached but statsmodels cannot be
            imported.
        """
        # Ensure same columns (and order) as training. Selecting with a list
        # already yields a new frame, so the caller's data is not modified.
        X = X[self.feature_names_]

        if self.result_ is not None:
            try:
                import statsmodels.api as sm
            except ImportError as exc:
                raise ImportError("statsmodels is required.") from exc

            if self.fit_intercept:
                X = sm.add_constant(X, has_constant="add")

            return np.asarray(self.result_.predict(X), dtype=float)

        # Restored model: evaluate the logistic function directly from the
        # cached coefficients.
        coefficients = self._coefficient_map()
        coef_vector = np.asarray(
            [coefficients.get(feature, 0.0) for feature in self.feature_names_],
            dtype=float,
        )
        linear_part = X.to_numpy(dtype=float) @ coef_vector + self._intercept()
        # Clip the linear predictor so np.exp cannot overflow.
        stabilized = np.clip(linear_part, -500.0, 500.0)
        return 1.0 / (1.0 + np.exp(-stabilized))

    def predict(
        self,
        X: pd.DataFrame,
        threshold: float = MODELING.DEFAULT_CLASSIFICATION_THRESHOLD,
    ) -> np.ndarray:
        """
        Predict class labels.

        Parameters
        ----------
        X : pd.DataFrame
            Feature data.
        threshold : float
            Classification threshold. Defaults to
            ``MODELING.DEFAULT_CLASSIFICATION_THRESHOLD``.

        Returns
        -------
        np.ndarray
            Predicted class labels (0 or 1); 1 where the predicted
            probability is greater than or equal to ``threshold``.
        """
        proba = self.predict_proba(X)
        return (proba >= threshold).astype(int)

    @requires_fit()
    def summary(self) -> str:
        """
        Get statsmodels summary.

        Returns
        -------
        str
            Model summary as string. For a restored model this is the cached
            summary text, or a short notice when no text was stored.
        """
        if self.result_ is not None:
            return self.result_.summary().as_text()
        if self.summary_text_:
            return self.summary_text_
        return (
            "Model restored from serialized coefficients; "
            "statsmodels summary output is unavailable."
        )

    @requires_fit()
    def get_coefficients(self) -> pd.DataFrame:
        """
        Get coefficients DataFrame.

        Returns
        -------
        pd.DataFrame
            Copy of the coefficient table, so callers cannot alter the
            model's internal state.
        """
        return self.coefficients_.copy()

    def get_significant_features(
        self,
        p_threshold: float = MODELING.DEFAULT_P_ENTER,
    ) -> pd.DataFrame:
        """
        Get features with p-value below threshold.

        Parameters
        ----------
        p_threshold : float
            P-value threshold. Defaults to ``MODELING.DEFAULT_P_ENTER``.

        Returns
        -------
        pd.DataFrame
            Rows of the coefficient table whose ``p_value`` is strictly
            below ``p_threshold``.
        """
        coef = self.get_coefficients()
        return coef[coef["p_value"] < p_threshold]

    @requires_fit()
    def to_dict(self) -> Dict[str, Any]:
        """
        Export model parameters as dictionary.

        Returns
        -------
        Dict
            Model parameters including coefficients, feature order,
            per-feature statistics, model-level statistics and the cached
            summary text.
        """
        coefficients = self._coefficient_map()
        # Training feature order first; a feature without a stored
        # coefficient is exported as 0.0.
        ordered_coefficients = {
            feature: float(coefficients.get(feature, 0.0))
            for feature in self.feature_names_
        }
        for feature, coefficient in coefficients.items():
            if feature in ordered_coefficients:
                continue
            ordered_coefficients[feature] = float(coefficient)

        # Prefer statistics taken from the live result; a restored model
        # only has the cached copy.
        model_statistics = (
            self._extract_model_statistics()
            if self.result_ is not None
            else dict(self.model_statistics_)
        )
        summary_text = self.summary_text_
        if self.result_ is not None and not summary_text:
            try:
                summary_text = str(self.result_.summary().as_text())
            except Exception:
                # Best effort: export without summary text.
                summary_text = ""

        return {
            "schema_version": self.SERIALIZATION_VERSION,
            "newt_version": self._resolve_newt_version(),
            "fit_intercept": bool(self.fit_intercept),
            "method": str(self.method),
            "maxiter": int(self.maxiter),
            "regularization": self.regularization,
            "alpha": float(self.alpha),
            "extra_kwargs": self._serialize_extra_kwargs(self.extra_kwargs),
            "intercept": float(self._intercept()),
            "coefficients": ordered_coefficients,
            "feature_names": list(self.feature_names_),
            "feature_statistics": self._feature_statistics(),
            "model_statistics": model_statistics,
            "summary_text": summary_text,
        }

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "LogisticModel":
        """
        Restore a fitted LogisticModel from serialized payload.

        The restored model has no statsmodels result attached; predictions
        are computed from the stored coefficients.

        Parameters
        ----------
        payload : Dict[str, Any]
            Dictionary exported by ``to_dict``.

        Returns
        -------
        LogisticModel
            Restored fitted model instance.

        Raises
        ------
        ValueError
            If ``payload`` or ``payload['coefficients']`` is not a
            dictionary, or ``payload['feature_names']`` is not a list.
        """
        if not isinstance(payload, dict):
            raise ValueError("payload must be a dictionary.")
        raw_coefficients = payload.get("coefficients", {})
        if not isinstance(raw_coefficients, dict):
            raise ValueError("payload['coefficients'] must be a dictionary.")

        # Without an explicit flag, the presence of an "intercept" entry
        # decides whether an intercept is assumed.
        fit_intercept = bool(payload.get("fit_intercept", "intercept" in payload))
        method = str(payload.get("method", "bfgs"))
        maxiter = int(payload.get("maxiter", 100))
        regularization = payload.get("regularization")
        alpha = float(payload.get("alpha", 0.0))
        extra_kwargs = cls._serialize_extra_kwargs(payload.get("extra_kwargs", {}))

        model = cls(
            fit_intercept=fit_intercept,
            method=method,
            maxiter=maxiter,
            regularization=regularization,
            alpha=alpha,
            **extra_kwargs,
        )

        features = payload.get("feature_names", list(raw_coefficients.keys()))
        if not isinstance(features, list):
            raise ValueError("payload['feature_names'] must be a list if provided.")
        feature_names = [str(feature) for feature in features]
        # Features that only appear in the coefficient mapping are appended.
        for feature in raw_coefficients:
            feature_name = str(feature)
            if feature_name not in feature_names:
                feature_names.append(feature_name)

        intercept = cls._as_finite_float(payload.get("intercept"))
        if intercept is None:
            intercept = 0.0

        # Non-numeric or non-finite coefficients are restored as 0.0.
        coefficients: Dict[str, float] = {}
        for feature, value in raw_coefficients.items():
            numeric = cls._as_finite_float(value)
            coefficients[str(feature)] = float(numeric) if numeric is not None else 0.0

        model.feature_names_ = feature_names
        model.model_ = None
        model.result_ = None
        model.coefficients_ = cls._build_coefficients_frame(
            intercept=intercept,
            coefficients=coefficients,
            feature_names=feature_names,
            feature_statistics=payload.get("feature_statistics", {}),
            fit_intercept=fit_intercept,
        )
        model.model_statistics_ = cls._normalize_model_statistics(
            payload.get("model_statistics", {})
        )
        model.summary_text_ = str(payload.get("summary_text", "") or "")
        model.is_fitted_ = True
        return model

    @requires_fit()
    def dump(self, path: Union[str, Path]) -> None:
        """
        Dump the model payload to a JSON file.

        The payload is serialized before the target file is opened, so a
        failure while building or encoding it leaves an existing file
        untouched. Missing parent directories are created.

        Parameters
        ----------
        path : Union[str, Path]
            Output JSON path.
        """
        serialized = json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

        target = Path(path)
        if not target.parent.exists():
            target.parent.mkdir(parents=True, exist_ok=True)
        with target.open("w", encoding="utf-8") as file:
            file.write(serialized)

    @classmethod
    def load(cls, path: Union[str, Path]) -> "LogisticModel":
        """
        Load a model payload from JSON file.

        Parameters
        ----------
        path : Union[str, Path]
            Input JSON path.

        Returns
        -------
        LogisticModel
            Restored fitted model instance.
        """
        with Path(path).open("r", encoding="utf-8") as file:
            payload = json.load(file)
        return cls.from_dict(payload)
Functions
__init__(fit_intercept=True, method='bfgs', maxiter=100, regularization=None, alpha=0.0, **kwargs)

Initialize LogisticModel.

Parameters

fit_intercept : bool
    Whether to fit an intercept term. Default True.
method : str
    Optimization method for statsmodels. Default 'bfgs'. Options: 'newton', 'bfgs', 'lbfgs', 'powell', 'cg', 'ncg'.
maxiter : int
    Maximum iterations for optimization. Default 100.
regularization : str, optional
    Regularization type: 'l1' or 'l2'. Default None (no regularization).
alpha : float
    Regularization strength. Default 0.0.
**kwargs
    Additional arguments passed to statsmodels fit method.

Source code in src/newt/modeling/logistic.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(
    self,
    fit_intercept: bool = True,
    method: str = "bfgs",
    maxiter: int = 100,
    regularization: Optional[str] = None,
    alpha: float = 0.0,
    **kwargs,
):
    """
    Create an unfitted LogisticModel and record its configuration.

    Parameters
    ----------
    fit_intercept : bool
        Whether to fit an intercept term. Default True.
    method : str
        Optimization method for statsmodels. Default 'bfgs'.
        Options: 'newton', 'bfgs', 'lbfgs', 'powell', 'cg', 'ncg'.
    maxiter : int
        Maximum iterations for optimization. Default 100.
    regularization : str, optional
        Regularization type: 'l1' or 'l2'. Default None (no regularization).
    alpha : float
        Regularization strength. Default 0.0.
    **kwargs
        Additional arguments passed to statsmodels fit method; kept
        verbatim in ``extra_kwargs``.
    """
    # Fitted state starts out empty and unfitted.
    self.is_fitted_: bool = False
    self.model_ = self.result_ = None
    self.feature_names_: List[str] = []
    self.coefficients_: pd.DataFrame = pd.DataFrame()
    self.summary_text_: str = ""
    self.model_statistics_: Dict[str, float] = {}

    # Configuration, stored exactly as given (no validation happens here).
    self.fit_intercept, self.method, self.maxiter = fit_intercept, method, maxiter
    self.regularization, self.alpha = regularization, alpha
    self.extra_kwargs = kwargs
dump(path)

Dump the model payload to a JSON file.

Parameters

path : Union[str, Path] Output JSON path.

Source code in src/newt/modeling/logistic.py
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
@requires_fit()
def dump(self, path: Union[str, Path]) -> None:
    """
    Dump the model payload to a JSON file.

    The payload is serialized before the target file is opened, so a
    failure while building or encoding it leaves an existing file
    untouched. Missing parent directories are created.

    Parameters
    ----------
    path : Union[str, Path]
        Output JSON path.
    """
    # Serialize first: opening with "w" truncates the target immediately.
    serialized = json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

    target = Path(path)
    if not target.parent.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as file:
        file.write(serialized)
fit(X, y, sample_weight=None)

Fit the logistic regression model.

Parameters

X : pd.DataFrame
    Feature data (typically WOE transformed).
y : pd.Series
    Binary target variable (0/1).
sample_weight : np.ndarray, optional
    Sample weights. Not directly supported by statsmodels Logit, but can be approximated using frequency weights.

Returns

LogisticModel Fitted instance.

Source code in src/newt/modeling/logistic.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def fit(
    self,
    X: pd.DataFrame,
    y: pd.Series,
    sample_weight: Optional[np.ndarray] = None,
) -> "LogisticModel":
    """
    Fit the logistic regression model.

    Parameters
    ----------
    X : pd.DataFrame
        Feature data (typically WOE transformed).
    y : pd.Series
        Binary target variable (0/1).
    sample_weight : np.ndarray, optional
        Sample weights. Forwarded to ``sm.Logit`` as ``freq_weights``.

    Returns
    -------
    LogisticModel
        Fitted instance.

    Raises
    ------
    ImportError
        If statsmodels is not installed. The original import failure is
        attached as the cause.
    """
    try:
        import statsmodels.api as sm
    except ImportError as exc:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "statsmodels is required for LogisticModel. "
            "Install it with: pip install statsmodels"
        ) from exc

    X = X.copy()
    y = y.copy()

    # Store feature names (before the constant column is added)
    self.feature_names_ = X.columns.tolist()

    # Add constant if fitting intercept
    if self.fit_intercept:
        X = sm.add_constant(X, has_constant="add")

    # Build model
    if sample_weight is not None:
        # NOTE(review): the weights are only passed through as a keyword
        # argument; confirm the installed statsmodels Logit honours
        # ``freq_weights`` rather than ignoring it.
        self.model_ = sm.Logit(y, X, freq_weights=sample_weight)
    else:
        self.model_ = sm.Logit(y, X)

    # Fit model; entries in extra_kwargs override the three defaults.
    fit_kwargs = {
        "method": self.method,
        "maxiter": self.maxiter,
        "disp": False,
        **self.extra_kwargs,
    }

    if self.regularization == "l1":
        # NOTE(review): this path does not use fit_kwargs, so ``method``,
        # ``maxiter`` and the extra kwargs are not forwarded here.
        self.result_ = self.model_.fit_regularized(
            method="l1",
            alpha=self.alpha,
            disp=False,
        )
    elif self.regularization == "l2":
        # NOTE(review): no L2 penalty is applied in this branch. It is an
        # ordinary fit that additionally requests HC0 robust standard
        # errors; ``alpha`` is not used.
        fit_kwargs["cov_type"] = "HC0"  # Robust standard errors
        self.result_ = self.model_.fit(**fit_kwargs)
    else:
        self.result_ = self.model_.fit(**fit_kwargs)

    # Extract coefficients and cache diagnostics for serialization
    self._extract_coefficients()
    self._cache_fit_diagnostics()

    self.is_fitted_ = True
    return self
from_dict(payload) classmethod

Restore a fitted LogisticModel from serialized payload.

Parameters

payload : Dict[str, Any] Dictionary exported by to_dict.

Returns

LogisticModel Restored fitted model instance.

Source code in src/newt/modeling/logistic.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
@classmethod
def from_dict(cls, payload: Dict[str, Any]) -> "LogisticModel":
    """
    Restore a fitted LogisticModel from serialized payload.

    The restored model carries no statsmodels objects (``model_`` and
    ``result_`` are ``None``); it is rebuilt purely from the stored
    coefficients and statistics.

    Parameters
    ----------
    payload : Dict[str, Any]
        Dictionary exported by ``to_dict``. ``method``, ``maxiter`` and
        ``alpha`` may be missing or ``None`` (as in the lightweight LR
        snapshots written by ``Scorecard``); both cases fall back to the
        defaults ``"bfgs"``, ``100`` and ``0.0``.

    Returns
    -------
    LogisticModel
        Restored fitted model instance.

    Raises
    ------
    ValueError
        If ``payload`` or ``payload['coefficients']`` is not a dictionary,
        or ``payload['feature_names']`` is provided but is not a list.
    """
    if not isinstance(payload, dict):
        raise ValueError("payload must be a dictionary.")
    raw_coefficients = payload.get("coefficients", {})
    if not isinstance(raw_coefficients, dict):
        raise ValueError("payload['coefficients'] must be a dictionary.")

    def _value_or_default(key: str, default: Any) -> Any:
        # A stored null means "not recorded"; treat it like a missing key
        # instead of feeding None into int()/float()/str().
        stored = payload.get(key)
        return default if stored is None else stored

    fit_intercept = bool(payload.get("fit_intercept", "intercept" in payload))
    method = str(_value_or_default("method", "bfgs"))
    maxiter = int(_value_or_default("maxiter", 100))
    regularization = payload.get("regularization")
    alpha = float(_value_or_default("alpha", 0.0))
    extra_kwargs = cls._serialize_extra_kwargs(payload.get("extra_kwargs", {}))

    model = cls(
        fit_intercept=fit_intercept,
        method=method,
        maxiter=maxiter,
        regularization=regularization,
        alpha=alpha,
        **extra_kwargs,
    )

    features = payload.get("feature_names", list(raw_coefficients.keys()))
    if not isinstance(features, list):
        raise ValueError("payload['feature_names'] must be a list if provided.")
    feature_names = [str(feature) for feature in features]
    # Append coefficient keys that the feature list does not mention,
    # preserving order; the set keeps the membership test O(1).
    known_names = set(feature_names)
    for feature in raw_coefficients:
        feature_name = str(feature)
        if feature_name not in known_names:
            known_names.add(feature_name)
            feature_names.append(feature_name)

    intercept = cls._as_finite_float(payload.get("intercept"))
    if intercept is None:
        intercept = 0.0

    # Non-numeric / non-finite coefficients are stored as 0.0.
    coefficients: Dict[str, float] = {}
    for feature, value in raw_coefficients.items():
        numeric = cls._as_finite_float(value)
        coefficients[str(feature)] = float(numeric) if numeric is not None else 0.0

    model.feature_names_ = feature_names
    model.model_ = None
    model.result_ = None
    model.coefficients_ = cls._build_coefficients_frame(
        intercept=intercept,
        coefficients=coefficients,
        feature_names=feature_names,
        feature_statistics=payload.get("feature_statistics", {}),
        fit_intercept=fit_intercept,
    )
    model.model_statistics_ = cls._normalize_model_statistics(
        payload.get("model_statistics", {})
    )
    model.summary_text_ = str(payload.get("summary_text", "") or "")
    model.is_fitted_ = True
    return model
get_coefficients()

Get coefficients DataFrame.

Returns

pd.DataFrame DataFrame with coefficient details.

Source code in src/newt/modeling/logistic.py
461
462
463
464
465
466
467
468
469
470
471
@requires_fit()
def get_coefficients(self) -> pd.DataFrame:
    """
    Return the coefficient table of the fitted model.

    Returns
    -------
    pd.DataFrame
        A copy of ``coefficients_``, so callers can modify the result
        without touching the model's stored table.
    """
    coefficient_table = self.coefficients_
    return coefficient_table.copy()
get_significant_features(p_threshold=MODELING.DEFAULT_P_ENTER)

Get features with p-value below threshold.

Parameters

p_threshold : float P-value threshold. Default 0.05.

Returns

pd.DataFrame Significant coefficients.

Source code in src/newt/modeling/logistic.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
def get_significant_features(
    self,
    p_threshold: float = MODELING.DEFAULT_P_ENTER,
) -> pd.DataFrame:
    """
    Select coefficient rows whose p-value is strictly below a threshold.

    Parameters
    ----------
    p_threshold : float
        Exclusive upper bound for the ``p_value`` column. Defaults to
        ``MODELING.DEFAULT_P_ENTER``.

    Returns
    -------
    pd.DataFrame
        Rows of the coefficient table with ``p_value < p_threshold``.
    """
    table = self.get_coefficients()
    below_threshold = table["p_value"] < p_threshold
    return table[below_threshold]
load(path) classmethod

Load a model payload from JSON file.

Parameters

path : Union[str, Path] Input JSON path.

Returns

LogisticModel Restored fitted model instance.

Source code in src/newt/modeling/logistic.py
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
@classmethod
def load(cls, path: Union[str, Path]) -> "LogisticModel":
    """
    Read a JSON model payload from disk and rebuild the model from it.

    Parameters
    ----------
    path : Union[str, Path]
        Location of the UTF-8 encoded JSON file.

    Returns
    -------
    LogisticModel
        Model produced by ``from_dict`` for the decoded payload.
    """
    source = Path(path)
    with source.open("r", encoding="utf-8") as handle:
        decoded = json.load(handle)
    restored = cls.from_dict(decoded)
    return restored
predict(X, threshold=MODELING.DEFAULT_CLASSIFICATION_THRESHOLD)

Predict class labels.

Parameters

X : pd.DataFrame Feature data. threshold : float Classification threshold. Default 0.5.

Returns

np.ndarray Predicted class labels (0 or 1).

Source code in src/newt/modeling/logistic.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def predict(
    self,
    X: pd.DataFrame,
    threshold: float = MODELING.DEFAULT_CLASSIFICATION_THRESHOLD,
) -> np.ndarray:
    """
    Turn predicted probabilities into hard class labels.

    Parameters
    ----------
    X : pd.DataFrame
        Feature data, forwarded unchanged to ``predict_proba``.
    threshold : float
        Probabilities greater than or equal to this value map to 1.
        Defaults to ``MODELING.DEFAULT_CLASSIFICATION_THRESHOLD``.

    Returns
    -------
    np.ndarray
        Integer labels: 1 where the probability reaches the threshold,
        0 otherwise.
    """
    probabilities = self.predict_proba(X)
    reaches_threshold = probabilities >= threshold
    return reaches_threshold.astype(int)
predict_proba(X)

Predict probability of positive class.

Parameters

X : pd.DataFrame Feature data.

Returns

np.ndarray Predicted probabilities for positive class.

Source code in src/newt/modeling/logistic.py
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
@requires_fit()
def predict_proba(self, X: pd.DataFrame) -> np.ndarray:
    """
    Predict probability of positive class.

    When a live statsmodels result is attached it is used directly.
    Otherwise (``result_`` is ``None``, e.g. a model restored via
    ``from_dict``) the probability is computed from the stored
    coefficients and intercept with a logistic transform.

    Parameters
    ----------
    X : pd.DataFrame
        Feature data. Must contain every column in ``feature_names_``;
        extra columns are ignored.

    Returns
    -------
    np.ndarray
        Predicted probabilities for positive class.

    Raises
    ------
    ImportError
        If a statsmodels result is attached but statsmodels cannot be
        imported.
    """
    # Selecting by a list of labels builds a new frame in training column
    # order, so the caller's data is never modified and no prior full-frame
    # copy is needed.
    X = X[self.feature_names_]

    if self.result_ is not None:
        try:
            import statsmodels.api as sm
        except ImportError as exc:
            raise ImportError("statsmodels is required.") from exc

        if self.fit_intercept:
            X = sm.add_constant(X, has_constant="add")

        return np.asarray(self.result_.predict(X), dtype=float)

    coefficients = self._coefficient_map()
    # Features without a stored coefficient contribute nothing.
    coef_vector = np.asarray(
        [coefficients.get(feature, 0.0) for feature in self.feature_names_],
        dtype=float,
    )
    linear_part = X.to_numpy(dtype=float) @ coef_vector + self._intercept()
    # Clip the linear predictor so np.exp cannot overflow.
    stabilized = np.clip(linear_part, -500.0, 500.0)
    return 1.0 / (1.0 + np.exp(-stabilized))
summary()

Get statsmodels summary.

Returns

str Model summary as string.

Source code in src/newt/modeling/logistic.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
@requires_fit()
def summary(self) -> str:
    """
    Return a textual model summary.

    Returns
    -------
    str
        The statsmodels summary text when a fit result is attached;
        otherwise the stored ``summary_text_`` if it is non-empty, or a
        fixed notice that no summary is available.
    """
    fit_result = self.result_
    if fit_result is None:
        # No live result: fall back to the stored text, then to the notice.
        return self.summary_text_ or (
            "Model restored from serialized coefficients; "
            "statsmodels summary output is unavailable."
        )
    return fit_result.summary().as_text()
to_dict()

Export model parameters as dictionary.

Returns

Dict Model parameters including coefficients.

Source code in src/newt/modeling/logistic.py
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
@requires_fit()
def to_dict(self) -> Dict[str, Any]:
    """
    Serialize the model configuration and fitted values to a dictionary.

    Returns
    -------
    Dict
        Payload with schema/version markers, constructor settings,
        intercept, per-feature coefficients, feature names, feature and
        model statistics, and the summary text.
    """
    coef_lookup = self._coefficient_map()

    # Known features come first, in training order (0.0 when no coefficient
    # is stored); any remaining coefficient keys follow.
    exported_coefficients = {}
    for name in self.feature_names_:
        exported_coefficients[name] = float(coef_lookup.get(name, 0.0))
    for name, value in coef_lookup.items():
        if name not in exported_coefficients:
            exported_coefficients[name] = float(value)

    has_live_result = self.result_ is not None
    if has_live_result:
        statistics = self._extract_model_statistics()
    else:
        statistics = dict(self.model_statistics_)

    text = self.summary_text_
    if has_live_result and not text:
        # Best effort: a failing summary render must not block export.
        try:
            text = str(self.result_.summary().as_text())
        except Exception:
            text = ""

    return {
        "schema_version": self.SERIALIZATION_VERSION,
        "newt_version": self._resolve_newt_version(),
        "fit_intercept": bool(self.fit_intercept),
        "method": str(self.method),
        "maxiter": int(self.maxiter),
        "regularization": self.regularization,
        "alpha": float(self.alpha),
        "extra_kwargs": self._serialize_extra_kwargs(self.extra_kwargs),
        "intercept": float(self._intercept()),
        "coefficients": exported_coefficients,
        "feature_names": list(self.feature_names_),
        "feature_statistics": self._feature_statistics(),
        "model_statistics": statistics,
        "summary_text": text,
    }

newt.modeling.scorecard

Scorecard facade that builds and scores reusable specifications.

Classes

Scorecard

Scorecard generator from logistic regression model coefficients.

The Scorecard class converts the continuous probability output of a logistic regression model into an additive point-based scoring system. It manages scaling parameters (base score, PDO) and provides methods for scoring new data, exporting definitions, and summarizing findings.

Attributes:

Name Type Description
base_score int

The target score at 'base_odds'.

pdo int

Points to Double the Odds.

base_odds float

The odds (Good:Bad) at 'base_score'.

factor float

Calculated scaling factor.

offset float

Calculated scaling offset.

Source code in src/newt/modeling/scorecard.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
class Scorecard:
    """Scorecard generator from logistic regression model coefficients.

    The Scorecard class converts the continuous probability output of a logistic
    regression model into an additive point-based scoring system. It manages
    scaling parameters (base score, PDO) and provides methods for scoring new data,
    exporting definitions, and summarizing findings.

    Attributes:
        base_score (int): The target score at 'base_odds'.
        pdo (int): Points to Double the Odds.
        base_odds (float): The odds (Good:Bad) at 'base_score'.
        factor (float): Calculated scaling factor.
        offset (float): Calculated scaling offset.
    """

    SERIALIZATION_VERSION = 1

    def __init__(
        self,
        base_score: int = SCORECARD.DEFAULT_BASE_SCORE,
        pdo: int = SCORECARD.DEFAULT_PDO,
        base_odds: float = SCORECARD.DEFAULT_BASE_ODDS,
        points_decimals: Optional[int] = None,
    ):
        """Initialize the Scorecard instance.

        Args:
            base_score: Target score at the given base_odds.
            pdo: Points to Double the Odds (PDO).
            base_odds: Target odds at the given base_score.
            points_decimals: Optional decimal precision for scorecard points.

        Raises:
            ValueError: If points_decimals is neither None nor a
                non-negative integer.
        """
        self.points_decimals = self._validate_points_decimals(points_decimals)
        self.base_score = base_score
        self.pdo = pdo
        self.base_odds = base_odds

        # factor = pdo / ln(2); offset = base_score - factor * ln(base_odds)
        self.factor = pdo / np.log(2)
        self.offset = base_score - self.factor * np.log(base_odds)

        self.scorecard_: Dict[str, pd.DataFrame] = {}
        self.intercept_points_: float = 0.0
        self.feature_names_: List[str] = []
        self.is_built_: bool = False

        self.spec_: Optional[ScorecardSpec] = None
        self.scorer_: Optional[ScorecardScorer] = None
        self._binner = None
        self._model_coefs: Dict[str, float] = {}
        self.feature_statistics_: pd.DataFrame = pd.DataFrame()
        self.model_statistics_: Dict[str, float] = {}
        self.lr_model_: Optional[object] = None
        self.lr_parameters_: Dict[str, object] = {}
        self.lr_snapshot_: Dict[str, object] = {}

    def from_model(
        self,
        model: ScorecardModelInput,
        binner: BinnerArtifact,
        *,
        keep_training_artifacts: bool = False,
    ) -> "Scorecard":
        """Build a scorecard from a fitted model and its binning/encoding artifacts.

        Args:
            model: A fitted model object (scikit-learn, statsmodels, or dict).
            binner: A fitted Binner instance.
            keep_training_artifacts: Whether to keep direct runtime references
                to the original model and binner objects.

        Returns:
            Scorecard: The built Scorecard instance.

        Examples:
            >>> scorecard = Scorecard(base_score=600, pdo=20)
            >>> scorecard.from_model(lr_model, binner)
        """
        builder = ScorecardBuilder(
            base_score=self.base_score,
            pdo=self.pdo,
            base_odds=self.base_odds,
        )
        (
            spec,
            model_coefs,
            feature_statistics,
            model_statistics,
            lr_parameters,
        ) = builder.build(model, binner)

        if keep_training_artifacts:
            self._binner = binner
            # A dict input is a payload, not a live model object.
            self.lr_model_ = model if not isinstance(model, dict) else None
        else:
            self._binner = None
            self.lr_model_ = None

        self._model_coefs = dict(model_coefs)
        spec.lr_parameters = self._build_enriched_lr_parameters(
            lr_parameters=lr_parameters,
            model_coefs=model_coefs,
            summary_text=self._extract_model_summary_text(model),
            intercept=self._estimate_intercept(spec, model),
        )
        spec.points_decimals = self.points_decimals
        self._normalize_scorecard_spec(spec)
        scorecard = self._load_spec(spec)
        # _load_spec resets lr_snapshot_ and fills lr_parameters_, which the
        # snapshot builder reads, so the snapshot must be built afterwards.
        scorecard.lr_snapshot_ = self._build_lr_snapshot(
            spec=spec,
            model_coefs=model_coefs,
            feature_statistics=feature_statistics,
            model_statistics=model_statistics,
        )
        return scorecard

    def from_dict(self, payload: Dict[str, object]) -> "Scorecard":
        """Restore a scorecard from a serialized specification.

        Args:
            payload: A dictionary representing a serialized ScorecardSpec.

        Returns:
            Scorecard: The restored Scorecard instance.
        """
        spec = ScorecardSpec.from_dict(payload)
        self.lr_model_ = None
        self._binner = None
        self.points_decimals = self._validate_points_decimals(spec.points_decimals)
        self._normalize_scorecard_spec(spec)
        scorecard = self._load_spec(spec)
        scorecard.lr_snapshot_ = self._normalize_lr_snapshot(payload.get("lr_snapshot"))
        return scorecard

    def _load_spec(self, spec: ScorecardSpec) -> "Scorecard":
        """Internal helper to load a specification into the facade properties."""
        self.spec_ = spec
        self.scorer_ = ScorecardScorer(spec)
        self.base_score = spec.base_score
        self.pdo = spec.pdo
        self.base_odds = spec.base_odds
        self.factor = spec.factor
        self.offset = spec.offset
        self.intercept_points_ = spec.intercept_points
        self.points_decimals = self._validate_points_decimals(spec.points_decimals)
        self.feature_names_ = list(spec.feature_names)
        self.scorecard_ = {
            feature: feature_spec.to_frame()
            for feature, feature_spec in spec.feature_scores.items()
        }
        if spec.feature_statistics:
            # One row per feature, with the feature name as a regular column.
            self.feature_statistics_ = (
                pd.DataFrame.from_dict(spec.feature_statistics, orient="index")
                .reset_index()
                .rename(columns={"index": "feature"})
            )
        else:
            self.feature_statistics_ = pd.DataFrame()
        self.model_statistics_ = dict(spec.model_statistics)
        self.lr_parameters_ = dict(spec.lr_parameters)
        self.lr_snapshot_ = {}
        self.is_built_ = True
        return self

    def _require_spec(self) -> ScorecardSpec:
        """Return the loaded specification or raise if nothing has been built.

        Raises:
            ValueError: If the scorecard has not been built.
        """
        if not self.is_built_ or self.spec_ is None:
            raise ValueError("Scorecard is not built. Call from_model() first.")
        return self.spec_

    def score(self, X: pd.DataFrame) -> pd.Series:
        """Calculate scores for input raw data.

        Args:
            X: Input DataFrame containing raw (un-binned) features.

        Returns:
            pd.Series: Calculated scores for each row, rounded to
                points_decimals when that is set.

        Raises:
            ValueError: If the scorecard has not been built.
        """
        if not self.is_built_ or self.scorer_ is None:
            raise ValueError("Scorecard is not built. Call from_model() first.")
        scores = self.scorer_.score(X)
        if self.points_decimals is not None:
            rounded = np.round(scores.to_numpy(dtype=float), self.points_decimals)
            return pd.Series(rounded, index=scores.index, name=scores.name)
        return scores

    def export(self) -> pd.DataFrame:
        """Export the scorecard as a single flat DataFrame.

        Returns:
            pd.DataFrame: A DataFrame containing bin ranges and corresponding points
                for all features.

        Raises:
            ValueError: If the scorecard has not been built.
        """
        return self._require_spec().export()

    def to_dict(self) -> Dict[str, object]:
        """Export the scorecard specification as a serializable dictionary.

        Returns:
            Dict[str, object]: The scorecard definition payload. Includes an
                "lr_snapshot" entry when a non-empty snapshot is attached.

        Raises:
            ValueError: If the scorecard has not been built.
        """
        payload = self._require_spec().to_dict()
        if self.lr_snapshot_:
            payload["lr_snapshot"] = self._normalize_lr_snapshot(self.lr_snapshot_)
        return payload

    def dump(self, path: Union[str, Path]) -> None:
        """Dump scorecard payload to a JSON file.

        Missing parent directories are created.

        Args:
            path: Destination file path.

        Raises:
            ValueError: If the scorecard has not been built.
        """
        # Serialize before touching the filesystem: if to_dict() or JSON
        # encoding fails, an existing file at ``path`` must not be truncated.
        serialized = json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
        target = Path(path)
        if not target.parent.exists():
            target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(serialized, encoding="utf-8")

    @classmethod
    def load(cls, path: Union[str, Path]) -> "Scorecard":
        """Load scorecard from a JSON file."""
        with Path(path).open("r", encoding="utf-8") as file:
            payload = json.load(file)
        return cls().from_dict(payload)

    def to_sql(
        self,
        table_name: str = "input_table",
        score_alias: str = "score",
        include_breakdown: bool = False,
    ) -> str:
        """Render the scorecard as an ANSI SQL scoring query.

        Args:
            table_name: Source table name used in the FROM clause.
            score_alias: Alias of the output score column.
            include_breakdown: Whether to include per-feature points columns.

        Returns:
            str: ANSI SQL query for score calculation.

        Raises:
            ValueError: If the scorecard has not been built.
        """
        return ScorecardSQLBuilder(self._require_spec()).build(
            table_name=table_name,
            score_alias=score_alias,
            include_breakdown=include_breakdown,
        )

    def summary(self) -> str:
        """Generate a human-readable summary of the scorecard configuration and points.

        Returns:
            str: The summary text.

        Raises:
            ValueError: If the scorecard has not been built.
        """
        self._require_spec()

        lines = [
            "=" * 50,
            "Scorecard Summary",
            "=" * 50,
            f"Base Score: {self.base_score}",
            f"PDO: {self.pdo}",
            f"Base Odds: {self.base_odds:.4f}",
            f"Factor: {self.factor:.4f}",
            f"Offset: {self.offset:.4f}",
            f"Intercept Points: {self.intercept_points_:.2f}",
            f"Number of Features: {len(self.feature_names_)}",
            "-" * 50,
            "Features:",
        ]

        for feature in self.feature_names_:
            if feature in self.scorecard_:
                n_bins = len(self.scorecard_[feature])
                min_pts = self.scorecard_[feature]["points"].min()
                max_pts = self.scorecard_[feature]["points"].max()
                pts_range = f"[{min_pts:.1f}, {max_pts:.1f}]"
                lines.append(f"  {feature}: {n_bins} bins, points range {pts_range}")

        lines.append("=" * 50)
        return "\n".join(lines)

    def _build_enriched_lr_parameters(
        self,
        lr_parameters: Dict[str, object],
        model_coefs: Dict[str, float],
        summary_text: str,
        intercept: float,
    ) -> Dict[str, object]:
        """Build compact scalar LR metadata for ScorecardSpec persistence.

        Keeps only scalar parameters, then adds the intercept, the summary
        text (when non-empty) and one ``coef__<feature>`` entry per finite
        coefficient.
        """
        enriched: Dict[str, object] = {}
        for key, value in dict(lr_parameters).items():
            normalized = self._as_supported_lr_scalar(value)
            if normalized is None:
                continue
            enriched[str(key)] = normalized

        intercept_value = self._as_finite_float(intercept)
        if intercept_value is not None:
            enriched["intercept"] = intercept_value

        if summary_text:
            enriched["summary_text"] = summary_text

        for feature, coefficient in dict(model_coefs).items():
            numeric = self._as_finite_float(coefficient)
            if numeric is None:
                continue
            enriched[f"coef__{feature}"] = numeric
        return enriched

    def _extract_model_summary_text(self, model: object) -> str:
        """Extract summary text from a fitted model when available."""
        if isinstance(model, dict):
            value = model.get("summary_text")
            return value if isinstance(value, str) else ""
        if hasattr(model, "summary") and callable(model.summary):
            # Best effort: the summary is optional metadata, so a failing
            # summary() must not abort scorecard construction.
            try:
                value = model.summary()
            except Exception:
                return ""
            return value if isinstance(value, str) else str(value)
        return ""

    def _estimate_intercept(self, spec: ScorecardSpec, model: object) -> float:
        """Estimate intercept from model payload or score scaling parameters."""
        if isinstance(model, dict):
            numeric = self._as_finite_float(model.get("intercept"))
            if numeric is not None:
                return numeric
        elif hasattr(model, "to_dict") and callable(model.to_dict):
            try:
                payload = model.to_dict()
            except Exception:
                payload = {}
            if isinstance(payload, dict):
                numeric = self._as_finite_float(payload.get("intercept"))
                if numeric is not None:
                    return numeric

        # Fall back to recovering the intercept from the scaled points.
        if spec.factor == 0:
            return 0.0
        return float((spec.offset - spec.intercept_points) / spec.factor)

    def _build_lr_snapshot(
        self,
        spec: ScorecardSpec,
        model_coefs: Dict[str, float],
        feature_statistics: Dict[str, Dict[str, float]],
        model_statistics: Dict[str, float],
    ) -> Dict[str, object]:
        """Build lightweight LR snapshot without training samples.

        Reads the model settings from ``self.lr_parameters_``, so it must be
        called after the spec has been loaded.
        """
        snapshot: Dict[str, object] = {
            "schema_version": self.SERIALIZATION_VERSION,
            "fit_intercept": bool(self.lr_parameters_.get("fit_intercept", True)),
            "method": self.lr_parameters_.get("method"),
            "maxiter": self.lr_parameters_.get("maxiter"),
            "regularization": self.lr_parameters_.get("regularization"),
            "alpha": self.lr_parameters_.get("alpha"),
            "intercept": self._as_finite_float(self.lr_parameters_.get("intercept")),
            "coefficients": {
                str(feature): float(coef)
                for feature, coef in dict(model_coefs).items()
                if self._as_finite_float(coef) is not None
            },
            "feature_names": list(spec.feature_names),
            "feature_statistics": self._normalize_feature_statistics(
                feature_statistics
            ),
            "model_statistics": self._normalize_model_statistics(model_statistics),
            "summary_text": str(self.lr_parameters_.get("summary_text", "") or ""),
        }
        if snapshot["intercept"] is None:
            # An empty dict carries no intercept, which forces the
            # scaling-based estimate.
            snapshot["intercept"] = self._estimate_intercept(spec, {})
        return self._normalize_lr_snapshot(snapshot)

    def _normalize_feature_statistics(self, raw: object) -> Dict[str, Dict[str, float]]:
        """Normalize nested feature statistics dictionary.

        Non-dict entries, non-finite metrics and features left without any
        metric are dropped.
        """
        if not isinstance(raw, dict):
            return {}
        output: Dict[str, Dict[str, float]] = {}
        for feature, stats in raw.items():
            if not isinstance(stats, dict):
                continue
            normalized_stats: Dict[str, float] = {}
            for metric, value in stats.items():
                numeric = self._as_finite_float(value)
                if numeric is None:
                    continue
                normalized_stats[str(metric)] = numeric
            if normalized_stats:
                output[str(feature)] = normalized_stats
        return output

    def _normalize_model_statistics(self, raw: object) -> Dict[str, float]:
        """Normalize model-level statistics dictionary."""
        if not isinstance(raw, dict):
            return {}
        output: Dict[str, float] = {}
        for metric, value in raw.items():
            numeric = self._as_finite_float(value)
            if numeric is None:
                continue
            output[str(metric)] = numeric
        return output

    def _normalize_lr_snapshot(self, raw: object) -> Dict[str, object]:
        """Normalize persisted LR snapshot payload.

        Returns an empty dict for non-dict input. Malformed fields fall back
        to neutral values instead of raising.
        """
        if not isinstance(raw, dict):
            return {}

        coefficients = raw.get("coefficients", {})
        if not isinstance(coefficients, dict):
            coefficients = {}
        feature_names = raw.get("feature_names", [])
        if not isinstance(feature_names, list):
            feature_names = []
        summary_text = raw.get("summary_text", "")
        if not isinstance(summary_text, str):
            summary_text = str(summary_text)

        # A null or non-numeric schema_version falls back to the current
        # version rather than aborting the load of an optional snapshot.
        try:
            schema_version = int(
                raw.get("schema_version", self.SERIALIZATION_VERSION)
            )
        except (TypeError, ValueError, OverflowError):
            schema_version = int(self.SERIALIZATION_VERSION)

        normalized = {
            "schema_version": schema_version,
            "fit_intercept": bool(raw.get("fit_intercept", True)),
            "method": self._as_supported_lr_scalar(raw.get("method")),
            "maxiter": self._as_supported_lr_scalar(raw.get("maxiter")),
            "regularization": self._as_supported_lr_scalar(raw.get("regularization")),
            "alpha": self._as_supported_lr_scalar(raw.get("alpha")),
            "intercept": self._as_finite_float(raw.get("intercept")),
            "coefficients": {
                str(feature): float(value)
                for feature, value in coefficients.items()
                if self._as_finite_float(value) is not None
            },
            "feature_names": [str(feature) for feature in feature_names],
            "feature_statistics": self._normalize_feature_statistics(
                raw.get("feature_statistics", {})
            ),
            "model_statistics": self._normalize_model_statistics(
                raw.get("model_statistics", {})
            ),
            "summary_text": summary_text,
        }
        return normalized

    def _as_supported_lr_scalar(self, value: object) -> Optional[object]:
        """Keep scalar values that are safe to persist in ScorecardSpec.

        Python and NumPy bool/int/float scalars are converted to the plain
        Python type; strings pass through. Non-finite floats and every other
        type yield None.
        """
        # bool is tested first because bool is a subclass of int.
        if isinstance(value, (bool, np.bool_)):
            return bool(value)
        if isinstance(value, (int, np.integer)):
            return int(value)
        if isinstance(value, (float, np.floating)):
            if not np.isfinite(value):
                return None
            return float(value)
        if isinstance(value, str):
            return value
        return None

    def _as_finite_float(self, value: object) -> Optional[float]:
        """Convert value to finite float when possible."""
        if value is None:
            return None
        try:
            numeric = float(value)
        except (TypeError, ValueError):
            return None
        if not np.isfinite(numeric):
            return None
        return numeric

    def _validate_points_decimals(self, value: Optional[int]) -> Optional[int]:
        """Validate optional score decimal precision.

        Raises:
            ValueError: If value is not None and is a bool, a non-integer,
                or negative.
        """
        if value is None:
            return None
        # bool is rejected explicitly because it is a subclass of int.
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError("points_decimals must be a non-negative integer or None")
        if value < 0:
            raise ValueError("points_decimals must be a non-negative integer or None")
        return int(value)

    def _normalize_scorecard_spec(self, spec: ScorecardSpec) -> None:
        """Normalize scorecard rows for stable ordering and optional precision."""
        spec.points_decimals = self.points_decimals
        spec.normalize_feature_row_order()
        spec.normalize_points_precision()
Functions
__init__(base_score=SCORECARD.DEFAULT_BASE_SCORE, pdo=SCORECARD.DEFAULT_PDO, base_odds=SCORECARD.DEFAULT_BASE_ODDS, points_decimals=None)

Initialize the Scorecard instance.

Parameters:

Name Type Description Default
base_score int

Target score at the given base_odds.

DEFAULT_BASE_SCORE
pdo int

Points to Double the Odds (PDO).

DEFAULT_PDO
base_odds float

Target odds at the given base_score.

DEFAULT_BASE_ODDS
points_decimals Optional[int]

Optional decimal precision for scorecard points.

None
Source code in src/newt/modeling/scorecard.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __init__(
    self,
    base_score: int = SCORECARD.DEFAULT_BASE_SCORE,
    pdo: int = SCORECARD.DEFAULT_PDO,
    base_odds: float = SCORECARD.DEFAULT_BASE_ODDS,
    points_decimals: Optional[int] = None,
):
    """Initialize the Scorecard instance.

    Stores the scaling parameters, derives ``factor`` and ``offset`` from
    them, and sets every other attribute to an empty initial value.

    Args:
        base_score: Target score at the given base_odds.
        pdo: Points to Double the Odds (PDO).
        base_odds: Target odds at the given base_score.
        points_decimals: Optional decimal precision for scorecard points.

    Raises:
        ValueError: If ``points_decimals`` is rejected by
            ``_validate_points_decimals``.
    """
    # Validated first, so an invalid precision fails before anything is set.
    self.points_decimals = self._validate_points_decimals(points_decimals)
    self.base_score = base_score
    self.pdo = pdo
    self.base_odds = base_odds

    # Scaling constants: factor = pdo / ln(2), and the offset is chosen so
    # that offset + factor * ln(base_odds) equals base_score.
    self.factor = pdo / np.log(2)
    self.offset = base_score - self.factor * np.log(base_odds)

    # Initial (empty) values; is_built_ starts as False.
    self.scorecard_: Dict[str, pd.DataFrame] = {}
    self.intercept_points_: float = 0.0
    self.feature_names_: List[str] = []
    self.is_built_: bool = False

    # Specification / scorer objects and model-related artifacts, all unset
    # or empty at construction time.
    self.spec_: Optional[ScorecardSpec] = None
    self.scorer_: Optional[ScorecardScorer] = None
    self._binner = None
    self._model_coefs: Dict[str, float] = {}
    self.feature_statistics_: pd.DataFrame = pd.DataFrame()
    self.model_statistics_: Dict[str, float] = {}
    self.lr_model_: Optional[object] = None
    self.lr_parameters_: Dict[str, object] = {}
    self.lr_snapshot_: Dict[str, object] = {}
dump(path)

Dump scorecard payload to a JSON file.

Source code in src/newt/modeling/scorecard.py
229
230
231
232
233
234
235
def dump(self, path: Union[str, Path]) -> None:
    """Dump scorecard payload to a JSON file.

    The payload is built and serialized before the target file is opened, so
    a failure while producing it (for example an unbuilt scorecard) leaves
    any existing file at ``path`` untouched instead of truncating it.

    Args:
        path: Destination file path. Missing parent directories are created.

    Raises:
        ValueError: Propagated from ``to_dict()`` when the scorecard is not
            built.
        TypeError: Propagated from ``json.dumps`` when the payload contains
            a value that is not JSON serializable.
    """
    # Serialize first: opening with "w" truncates the file immediately.
    text = json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as file:
        file.write(text)
export()

Export the scorecard as a single flat DataFrame.

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame containing bin ranges and corresponding points for all features.

Source code in src/newt/modeling/scorecard.py
205
206
207
208
209
210
211
212
213
214
def export(self) -> pd.DataFrame:
    """Return the built scorecard as one flat DataFrame.

    Returns:
        pd.DataFrame: Whatever ``spec_.export()`` produces for the current
            specification.

    Raises:
        ValueError: If the scorecard has not been built or has no spec.
    """
    if self.is_built_ and self.spec_ is not None:
        return self.spec_.export()
    raise ValueError("Scorecard is not built. Call from_model() first.")
from_dict(payload)

Restore a scorecard from a serialized specification.

Parameters:

Name Type Description Default
payload Dict[str, object]

A dictionary representing a serialized ScorecardSpec.

required

Returns:

Name Type Description
Scorecard Scorecard

The restored Scorecard instance.

Source code in src/newt/modeling/scorecard.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def from_dict(self, payload: Dict[str, object]) -> "Scorecard":
    """Rebuild this scorecard from a serialized specification.

    The instance drops its model and binner references, adopts the
    ``points_decimals`` stored in the parsed spec, normalizes the spec and
    loads it.

    Args:
        payload: Dictionary form of a ScorecardSpec. Its optional
            ``"lr_snapshot"`` entry is read as well.

    Returns:
        Scorecard: The object returned by ``_load_spec`` for the parsed spec.
    """
    parsed = ScorecardSpec.from_dict(payload)

    # A scorecard restored from a payload holds no model or binner objects.
    self._binner = None
    self.lr_model_ = None

    # The precision stored in the payload replaces this instance's setting.
    self.points_decimals = self._validate_points_decimals(parsed.points_decimals)
    self._normalize_scorecard_spec(parsed)

    restored = self._load_spec(parsed)
    snapshot = payload.get("lr_snapshot")
    restored.lr_snapshot_ = self._normalize_lr_snapshot(snapshot)
    return restored
from_model(model, binner, *, keep_training_artifacts=False)

Build a scorecard from a fitted model and its binning/encoding artifacts.

Parameters:

Name Type Description Default
model ScorecardModelInput

A fitted model object (scikit-learn, statsmodels, or dict).

required
binner BinnerArtifact

A fitted Binner instance.

required
keep_training_artifacts bool

Whether to keep direct runtime references to the original model and binner objects.

False

Returns:

Name Type Description
Scorecard Scorecard

The built Scorecard instance.

Examples:

>>> scorecard = Scorecard(base_score=600, pdo=20)
>>> scorecard.from_model(lr_model, binner)
Source code in src/newt/modeling/scorecard.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def from_model(
    self,
    model: ScorecardModelInput,
    binner: BinnerArtifact,
    *,
    keep_training_artifacts: bool = False,
) -> "Scorecard":
    """Build a scorecard from a fitted model and a fitted binner.

    Args:
        model: A fitted model object (scikit-learn, statsmodels, or dict).
        binner: A fitted Binner instance.
        keep_training_artifacts: When true, keep references to ``binner``
            and (unless it is a dict) ``model`` on the instance; otherwise
            both references are cleared.

    Returns:
        Scorecard: The object returned by ``_load_spec`` for the built spec.

    Examples:
        >>> scorecard = Scorecard(base_score=600, pdo=20)
        >>> scorecard.from_model(lr_model, binner)
    """
    build_result = ScorecardBuilder(
        base_score=self.base_score,
        pdo=self.pdo,
        base_odds=self.base_odds,
    ).build(model, binner)
    (spec, model_coefs, feature_statistics,
     model_statistics, lr_parameters) = build_result

    # Keep the caller's objects only on request; a dict "model" is never
    # stored as lr_model_.
    self._binner = binner if keep_training_artifacts else None
    keep_model = keep_training_artifacts and not isinstance(model, dict)
    self.lr_model_ = model if keep_model else None

    self._model_coefs = dict(model_coefs)

    summary_text = self._extract_model_summary_text(model)
    intercept = self._estimate_intercept(spec, model)
    spec.lr_parameters = self._build_enriched_lr_parameters(
        lr_parameters=lr_parameters,
        model_coefs=model_coefs,
        summary_text=summary_text,
        intercept=intercept,
    )
    spec.points_decimals = self.points_decimals
    self._normalize_scorecard_spec(spec)

    built = self._load_spec(spec)
    built.lr_snapshot_ = self._build_lr_snapshot(
        spec=spec,
        model_coefs=model_coefs,
        feature_statistics=feature_statistics,
        model_statistics=model_statistics,
    )
    return built
load(path) classmethod

Load scorecard from a JSON file.

Source code in src/newt/modeling/scorecard.py
237
238
239
240
241
242
@classmethod
def load(cls, path: Union[str, Path]) -> "Scorecard":
    """Read a JSON scorecard payload from disk and restore a scorecard.

    Args:
        path: Location of a UTF-8 encoded JSON file.

    Returns:
        Scorecard: Result of ``from_dict`` called on a default-constructed
            instance of ``cls`` with the parsed payload.
    """
    raw_text = Path(path).read_text(encoding="utf-8")
    return cls().from_dict(json.loads(raw_text))
score(X)

Calculate scores for input raw data.

Parameters:

Name Type Description Default
X DataFrame

Input DataFrame containing raw (un-binned) features.

required

Returns:

Type Description
Series

pd.Series: Calculated scores for each row.

Raises:

Type Description
ValueError

If the scorecard has not been built.

Source code in src/newt/modeling/scorecard.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def score(self, X: pd.DataFrame) -> pd.Series:
    """Score raw input rows with the built scorecard.

    Args:
        X: Input DataFrame containing raw (un-binned) features.

    Returns:
        pd.Series: The scorer's output. When ``points_decimals`` is set, a
            new float Series rounded to that many decimals, with the same
            index and name.

    Raises:
        ValueError: If the scorecard has not been built.
    """
    ready = self.is_built_ and self.scorer_ is not None
    if not ready:
        raise ValueError("Scorecard is not built. Call from_model() first.")

    raw_scores = self.scorer_.score(X)
    decimals = self.points_decimals
    if decimals is None:
        # No precision configured: return the scorer's Series unchanged.
        return raw_scores

    values = raw_scores.to_numpy(dtype=float)
    return pd.Series(
        np.round(values, decimals),
        index=raw_scores.index,
        name=raw_scores.name,
    )
summary()

Generate a human-readable summary of the scorecard configuration and points.

Returns:

Name Type Description
str str

The summary text.

Source code in src/newt/modeling/scorecard.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def summary(self) -> str:
    """Render the scorecard configuration and per-feature point ranges as text.

    Features listed in ``feature_names_`` but absent from ``scorecard_`` are
    left out of the feature lines.

    Returns:
        str: The summary text, one item per line.

    Raises:
        ValueError: If the scorecard has not been built.
    """
    if not self.is_built_ or self.spec_ is None:
        raise ValueError("Scorecard is not built. Call from_model() first.")

    rule = "=" * 50
    report = [
        rule,
        "Scorecard Summary",
        rule,
        f"Base Score: {self.base_score}",
        f"PDO: {self.pdo}",
        f"Base Odds: {self.base_odds:.4f}",
        f"Factor: {self.factor:.4f}",
        f"Offset: {self.offset:.4f}",
        f"Intercept Points: {self.intercept_points_:.2f}",
        f"Number of Features: {len(self.feature_names_)}",
        "-" * 50,
        "Features:",
    ]

    for name in self.feature_names_:
        if name not in self.scorecard_:
            continue
        count = len(self.scorecard_[name])
        points = self.scorecard_[name]["points"]
        low, high = points.min(), points.max()
        span = f"[{low:.1f}, {high:.1f}]"
        report.append(f"  {name}: {count} bins, points range {span}")

    report.append(rule)
    return "\n".join(report)
to_dict()

Export the scorecard specification as a serializable dictionary.

Returns:

Type Description
Dict[str, object]

Dict[str, object]: The scorecard definition payload.

Source code in src/newt/modeling/scorecard.py
216
217
218
219
220
221
222
223
224
225
226
227
def to_dict(self) -> Dict[str, object]:
    """Serialize the scorecard specification to a dictionary.

    Returns:
        Dict[str, object]: The spec's own dictionary, plus an
            ``"lr_snapshot"`` entry when the instance holds a non-empty
            snapshot.

    Raises:
        ValueError: If the scorecard has not been built.
    """
    if not (self.is_built_ and self.spec_ is not None):
        raise ValueError("Scorecard is not built. Call from_model() first.")

    serialized = self.spec_.to_dict()
    snapshot = self.lr_snapshot_
    if snapshot:
        serialized["lr_snapshot"] = self._normalize_lr_snapshot(snapshot)
    return serialized
to_sql(table_name='input_table', score_alias='score', include_breakdown=False)

Render the scorecard as an ANSI SQL scoring query.

Parameters:

Name Type Description Default
table_name str

Source table name used in the FROM clause.

'input_table'
score_alias str

Alias of the output score column.

'score'
include_breakdown bool

Whether to include per-feature points columns.

False

Returns:

Name Type Description
str str

ANSI SQL query for score calculation.

Source code in src/newt/modeling/scorecard.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def to_sql(
    self,
    table_name: str = "input_table",
    score_alias: str = "score",
    include_breakdown: bool = False,
) -> str:
    """Produce a SQL scoring query for the built scorecard.

    Args:
        table_name: Source table name used in the FROM clause.
        score_alias: Alias of the output score column.
        include_breakdown: Whether to include per-feature points columns.

    Returns:
        str: The query text returned by ``ScorecardSQLBuilder.build``.

    Raises:
        ValueError: If the scorecard has not been built.
    """
    if not (self.is_built_ and self.spec_ is not None):
        raise ValueError("Scorecard is not built. Call from_model() first.")

    sql_builder = ScorecardSQLBuilder(self.spec_)
    query = sql_builder.build(
        table_name=table_name,
        score_alias=score_alias,
        include_breakdown=include_breakdown,
    )
    return query

Pipeline

newt.pipeline.pipeline

Scorecard pipeline implemented as a thin coordinator around step objects.

Classes

ScorecardPipeline

Chainable pipeline for end-to-end credit scorecard development.

The ScorecardPipeline provides a fluent, high-level API to orchestrate the entire modeling workflow—from initial feature filtering to final scorecard generation. It manages internal state transitions and provides access to intermediate artifacts (e.g., binning results, WOE encoders) at each step.

Examples:

>>> from newt.pipeline import ScorecardPipeline
>>> pipeline = (
...     ScorecardPipeline(X_train, y_train, X_test, y_test)
...     .prefilter(iv_threshold=0.02)
...     .bin(method='chi', n_bins=5)
...     .woe_transform()
...     .postfilter(psi_threshold=0.1)
...     .build_model()
...     .generate_scorecard(base_score=600, pdo=20)
... )
>>> scores = pipeline.score(X_val)
Source code in src/newt/pipeline/pipeline.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
class ScorecardPipeline:
    """Chainable pipeline for end-to-end credit scorecard development.

    The ScorecardPipeline provides a fluent, high-level API to orchestrate the entire
    modeling workflow—from initial feature filtering to final scorecard generation.
    It manages internal state transitions and provides access to intermediate artifacts
    (e.g., binning results, WOE encoders) at each step.

    Every step method builds a step object, runs it against the current
    pipeline state, stores the returned state, and returns ``self``.

    Examples:
        >>> from newt.pipeline import ScorecardPipeline
        >>> pipeline = (
        ...     ScorecardPipeline(X_train, y_train, X_test, y_test)
        ...     .prefilter(iv_threshold=0.02)
        ...     .bin(method='chi', n_bins=5)
        ...     .woe_transform()
        ...     .postfilter(psi_threshold=0.1)
        ...     .build_model()
        ...     .generate_scorecard(base_score=600, pdo=20)
        ... )
        >>> scores = pipeline.score(X_val)
    """

    def __init__(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        X_test: Optional[pd.DataFrame] = None,
        y_test: Optional[pd.Series] = None,
    ):
        """Initialize the pipeline with training and optional testing data.

        Args:
            X: Training feature DataFrame.
            y: Training target Series (binary 0/1).
            X_test: Optional testing feature DataFrame for validation and
                PSI calculation.
            y_test: Optional testing target Series.
        """
        # All data and artifacts are kept on this single state object; the
        # properties below read from and write to it.
        self._state = PipelineState(X, y, X_test, y_test)

    def prefilter(
        self,
        iv_threshold: float = FILTERING.DEFAULT_IV_THRESHOLD,
        missing_threshold: float = FILTERING.DEFAULT_MISSING_THRESHOLD,
        corr_threshold: float = FILTERING.DEFAULT_CORR_THRESHOLD,
        iv_bins: int = BINNING.DEFAULT_BUCKETS,
        **kwargs,
    ) -> "ScorecardPipeline":
        """Apply pre-modeling filters based on EDA metrics.

        Filters features using Information Value (IV), missing rate, and
        feature-to-feature correlation. This step is typically the first in
        the pipeline to reduce dimensionality before expensive operations
        like binning.

        Args:
            iv_threshold: Minimum IV required to keep a feature.
            missing_threshold: Maximum allowed missing rate (0.0 to 1.0).
            corr_threshold: Maximum allowed correlation between feature pairs.
            iv_bins: Number of buckets used for temporary auto-binning
                during IV compute.
            **kwargs: Additional arguments passed to FeatureSelector.

        Returns:
            ScorecardPipeline: The pipeline instance (self) for chaining.

        Examples:
            >>> pipeline.prefilter(iv_threshold=0.05, corr_threshold=0.7)
        """
        step = PrefilterStep(
            iv_threshold=iv_threshold,
            missing_threshold=missing_threshold,
            corr_threshold=corr_threshold,
            iv_bins=iv_bins,
            **kwargs,
        )
        # The step returns the state to continue with; it replaces the
        # pipeline's current state.
        self._state = step.run(self._state)
        return self

    def bin(
        self,
        method: str = "chi",
        n_bins: int = BINNING.DEFAULT_N_BINS,
        cols: Optional[List[str]] = None,
        **kwargs,
    ) -> "ScorecardPipeline":
        """Discretize continuous variables into discrete bins.

        Supported methods include 'chi' (ChiMerge), 'dt' (Decision Tree),
        'opt' (Optimal), 'quantile' (Equal Frequency), 'step' (Equal Width),
        and 'kmean'.

        Args:
            method: Binning algorithm name. Defaults to 'chi'.
            n_bins: Target number of bins for each feature.
            cols: Optional list of features to bin. If None, all numeric
                features are used.
            **kwargs: Additional parameters for the chosen binner (e.g.,
                monotonic=True).

        Returns:
            ScorecardPipeline: The pipeline instance (self) for chaining.

        Examples:
            >>> pipeline.bin(method='opt', n_bins=5, monotonic='auto')
        """
        step = BinningStep(method=method, n_bins=n_bins, cols=cols, **kwargs)
        self._state = step.run(self._state)
        return self

    def woe_transform(
        self,
        epsilon: float = BINNING.DEFAULT_EPSILON,
        **kwargs,
    ) -> "ScorecardPipeline":
        """Apply Weight of Evidence (WOE) encoding to binned features.

        Converts binned categorical/ordinal values into numeric WOE values based on the
        distribution of good and bad labels in each bin.

        Args:
            epsilon: Small constant to prevent log(0) or division by zero.
            **kwargs: Additional arguments passed to WOEEncoder.

        Returns:
            ScorecardPipeline: The pipeline instance (self) for chaining.

        Examples:
            >>> pipeline.woe_transform(epsilon=1e-10)
        """
        step = WoeTransformStep(epsilon=epsilon, **kwargs)
        self._state = step.run(self._state)
        return self

    def postfilter(
        self,
        psi_threshold: float = FILTERING.DEFAULT_PSI_THRESHOLD,
        vif_threshold: float = FILTERING.DEFAULT_VIF_THRESHOLD,
        X_test: Optional[pd.DataFrame] = None,
        **kwargs,
    ) -> "ScorecardPipeline":
        """Apply post-transformation filters like PSI stability and VIF
        multicollinearity.

        Typically run after WOE transformation to ensure the selected features are
        stable over time (PSI) and not redundant (VIF).

        Args:
            psi_threshold: Maximum allowed Population Stability Index
                between train/test.
            vif_threshold: Maximum allowed Variance Inflation Factor.
            X_test: Optional override for the test set used for PSI compute.
            **kwargs: Additional parameters passed to PostFilter.

        Returns:
            ScorecardPipeline: The pipeline instance (self) for chaining.

        Examples:
            >>> pipeline.postfilter(psi_threshold=0.1, vif_threshold=5.0)
        """
        step = PostfilterStep(
            psi_threshold=psi_threshold,
            vif_threshold=vif_threshold,
            X_test=X_test,
            **kwargs,
        )
        self._state = step.run(self._state)
        return self

    def stepwise(
        self,
        direction: str = "both",
        criterion: str = "aic",
        p_enter: float = MODELING.DEFAULT_P_ENTER,
        p_remove: float = MODELING.DEFAULT_P_REMOVE,
        exclude: Optional[List[str]] = None,
        **kwargs,
    ) -> "ScorecardPipeline":
        """Perform automated feature selection via stepwise regression.

        Successively adds or removes features based on statistical significance or
        information criteria (AIC/BIC).

        Args:
            direction: Search direction: 'forward', 'backward', or 'both'.
            criterion: Selection criterion: 'p-value', 'aic', or 'bic'.
            p_enter: P-value threshold to enter the model (if using 'p-value').
            p_remove: P-value threshold to be removed from the model.
            exclude: Optional list of features to always keep in the model.
            **kwargs: Additional parameters passed to StepwiseSelector.

        Returns:
            ScorecardPipeline: The pipeline instance (self) for chaining.

        Examples:
            >>> pipeline.stepwise(direction='both', criterion='aic')
        """
        step = StepwiseStep(
            direction=direction,
            criterion=criterion,
            p_enter=p_enter,
            p_remove=p_remove,
            exclude=exclude,
            **kwargs,
        )
        self._state = step.run(self._state)
        return self

    def build_model(
        self,
        fit_intercept: bool = True,
        **kwargs,
    ) -> "ScorecardPipeline":
        """Train the final logistic regression model on selected WOE features.

        Args:
            fit_intercept: Whether to calculate the intercept for this model.
            **kwargs: Additional parameters passed to LogisticModel.

        Returns:
            ScorecardPipeline: The pipeline instance (self) for chaining.

        Examples:
            >>> pipeline.build_model(method='bfgs')
        """
        step = ModelingStep(fit_intercept=fit_intercept, **kwargs)
        self._state = step.run(self._state)
        return self

    def generate_scorecard(
        self,
        base_score: int = SCORECARD.DEFAULT_BASE_SCORE,
        pdo: int = SCORECARD.DEFAULT_PDO,
        base_odds: float = SCORECARD.DEFAULT_BASE_ODDS,
        points_decimals: Optional[int] = None,
        **kwargs,
    ) -> "ScorecardPipeline":
        """Convert the fitted logistic model into a point-based scorecard.

        Args:
            base_score: The target score at 'base_odds'.
            pdo: Points to Double the Odds.
            base_odds: The odds (Good:Bad) at 'base_score'.
            points_decimals: Optional decimal precision for scorecard points.
            **kwargs: Additional parameters passed to Scorecard.

        Returns:
            ScorecardPipeline: The pipeline instance (self) for chaining.

        Examples:
            >>> pipeline.generate_scorecard(base_score=600, pdo=20)
        """
        step = ScorecardStep(
            base_score=base_score,
            pdo=pdo,
            base_odds=base_odds,
            points_decimals=points_decimals,
            **kwargs,
        )
        self._state = step.run(self._state)
        return self

    def score(self, X: pd.DataFrame) -> pd.Series:
        """Apply the finished scorecard to new raw data to produce scores.

        Args:
            X: Raw feature DataFrame (un-binned, un-encoded).

        Returns:
            pd.Series: Calculated scores for each row.

        Raises:
            ValueError: If the scorecard has not been generated yet.
        """
        if self.scorecard_ is None:
            raise ValueError("Scorecard not built. Call generate_scorecard() first.")
        # Scoring is delegated to the Scorecard stored on the pipeline state.
        return self.scorecard_.score(X)

    @property
    def X_train(self) -> pd.DataFrame:
        """Get the training feature DataFrame held by the pipeline state."""
        return self._state.X_train

    @property
    def y_train(self) -> pd.Series:
        """Get the training target series."""
        return self._state.y_train

    @property
    def X_test(self) -> Optional[pd.DataFrame]:
        """Get the test feature DataFrame held by the pipeline state, if any."""
        return self._state.X_test

    @property
    def y_test(self) -> Optional[pd.Series]:
        """Get the test target series."""
        return self._state.y_test

    @property
    def X_current(self) -> pd.DataFrame:
        """Get the current training feature set (after transformations)."""
        return self._state.X_current

    @property
    def X_test_current(self) -> Optional[pd.DataFrame]:
        """Get the current test feature set (after transformations)."""
        return self._state.X_test_current

    @property
    def steps_(self) -> List[str]:
        """List of step names that have been executed."""
        return self._state.steps

    @property
    def prefilter_(self) -> Optional["FeatureSelector"]:
        """The FeatureSelector instance from the prefilter step."""
        return self._state.prefilter

    @prefilter_.setter
    def prefilter_(self, value: Optional["FeatureSelector"]) -> None:
        self._state.prefilter = value

    @property
    def binner_(self) -> Optional["Binner"]:
        """The Binner instance from the bin step."""
        return self._state.binner

    @binner_.setter
    def binner_(self, value: Optional["Binner"]) -> None:
        self._state.binner = value

    @property
    def woe_encoders_(self) -> Dict[str, "WOEEncoder"]:
        """Dictionary mapping feature names to WOEEncoder instances."""
        return self._state.woe_encoders

    @woe_encoders_.setter
    def woe_encoders_(self, value: Dict[str, "WOEEncoder"]) -> None:
        self._state.woe_encoders = value

    @property
    def postfilter_(self) -> Optional["PostFilter"]:
        """The PostFilter instance from the postfilter step."""
        return self._state.postfilter

    @postfilter_.setter
    def postfilter_(self, value: Optional["PostFilter"]) -> None:
        self._state.postfilter = value

    @property
    def stepwise_(self) -> Optional["StepwiseSelector"]:
        """The StepwiseSelector instance from the stepwise step."""
        return self._state.stepwise

    @stepwise_.setter
    def stepwise_(self, value: Optional["StepwiseSelector"]) -> None:
        self._state.stepwise = value

    @property
    def model_(self) -> Optional["LogisticModel"]:
        """The fitted LogisticModel instance."""
        return self._state.model

    @model_.setter
    def model_(self, value: Optional["LogisticModel"]) -> None:
        self._state.model = value

    @property
    def scorecard_(self) -> Optional["Scorecard"]:
        """The generated Scorecard instance."""
        return self._state.scorecard

    @scorecard_.setter
    def scorecard_(self, value: Optional["Scorecard"]) -> None:
        self._state.scorecard = value

    @property
    def X_binned_(self) -> Optional[pd.DataFrame]:
        """Training data after binning transformation."""
        return self._state.X_binned

    @X_binned_.setter
    def X_binned_(self, value: Optional[pd.DataFrame]) -> None:
        self._state.X_binned = value

    @property
    def X_woe_(self) -> Optional[pd.DataFrame]:
        """Training data after WOE transformation."""
        return self._state.X_woe

    @X_woe_.setter
    def X_woe_(self, value: Optional[pd.DataFrame]) -> None:
        self._state.X_woe = value

    @property
    def X_test_binned_(self) -> Optional[pd.DataFrame]:
        """``X_test_binned`` from the pipeline state (presumably the binned test data)."""
        return self._state.X_test_binned

    @X_test_binned_.setter
    def X_test_binned_(self, value: Optional[pd.DataFrame]) -> None:
        self._state.X_test_binned = value

    @property
    def X_test_woe_(self) -> Optional[pd.DataFrame]:
        """``X_test_woe`` from the pipeline state (presumably the WOE-encoded test data)."""
        return self._state.X_test_woe

    @X_test_woe_.setter
    def X_test_woe_(self, value: Optional[pd.DataFrame]) -> None:
        self._state.X_test_woe = value

    @property
    def prefilter_result(self) -> Optional["FeatureSelector"]:
        """Alias for prefilter_."""
        return self.prefilter_

    @property
    def binner(self) -> Optional["Binner"]:
        """Alias for binner_."""
        return self.binner_

    @property
    def woe_encoders(self) -> Dict[str, "WOEEncoder"]:
        """Alias for woe_encoders_."""
        return self.woe_encoders_

    @property
    def postfilter_result(self) -> Optional["PostFilter"]:
        """Alias for postfilter_."""
        return self.postfilter_

    @property
    def model(self) -> Optional["LogisticModel"]:
        """Alias for model_."""
        return self.model_

    @property
    def scorecard(self) -> Optional["Scorecard"]:
        """Alias for scorecard_."""
        return self.scorecard_

    @property
    def selected_features(self) -> List[str]:
        """Get the list of features currently selected in the pipeline."""
        return self._state.selected_features

    def summary(self) -> Dict[str, object]:
        """Get pipeline summary.

        Returns:
            Dict[str, object]: Always contains ``steps``,
            ``n_features_initial`` (column count of ``X_train``),
            ``n_features_final`` (column count of ``X_current``) and
            ``selected_features``. Selected/removed counts are added for the
            prefilter and postfilter when those objects are set, and
            ``model_fitted`` / ``scorecard_built`` are added as ``True`` when
            a model / scorecard is set.
        """
        summary = {
            "steps": self.steps_,
            "n_features_initial": len(self.X_train.columns),
            "n_features_final": len(self.X_current.columns),
            "selected_features": self.selected_features,
        }

        if self.prefilter_ is not None:
            summary["prefilter_selected"] = len(self.prefilter_.selected_features_)
            summary["prefilter_removed"] = len(self.prefilter_.removed_features_)

        if self.postfilter_ is not None:
            summary["postfilter_selected"] = len(self.postfilter_.selected_features_)
            summary["postfilter_removed"] = len(self.postfilter_.removed_features_)

        # These two keys are only present when true; they are never False.
        if self.model_ is not None:
            summary["model_fitted"] = True

        if self.scorecard_ is not None:
            summary["scorecard_built"] = True

        return summary
Attributes
X_binned_ property writable

Training data after binning transformation.

X_current property

Get the current training feature set (after transformations).

X_test_current property

Get the current test feature set (after transformations).

X_woe_ property writable

Training data after WOE transformation.

binner property

Alias for binner_.

binner_ property writable

The Binner instance from the bin step.

model property

Alias for model_.

model_ property writable

The fitted LogisticModel instance.

postfilter_ property writable

The PostFilter instance from the postfilter step.

postfilter_result property

Alias for postfilter_.

prefilter_ property writable

The FeatureSelector instance from the prefilter step.

prefilter_result property

Alias for prefilter_.

scorecard property

Alias for scorecard_.

scorecard_ property writable

The generated Scorecard instance.

selected_features property

Get the list of features currently selected in the pipeline.

steps_ property

List of step names that have been executed.

stepwise_ property writable

The StepwiseSelector instance from the stepwise step.

woe_encoders property

Alias for woe_encoders_.

woe_encoders_ property writable

Dictionary mapping feature names to WOEEncoder instances.

y_test property

Get the test target series.

Functions
__init__(X, y, X_test=None, y_test=None)

Initialize the pipeline with training and optional testing data.

Parameters:

Name Type Description Default
X DataFrame

Training feature DataFrame.

required
y Series

Training target Series (binary 0/1).

required
X_test Optional[DataFrame]

Optional testing feature DataFrame for validation and PSI calculation.

None
y_test Optional[Series]

Optional testing target Series.

None
Source code in src/newt/pipeline/pipeline.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    X: pd.DataFrame,
    y: pd.Series,
    X_test: Optional[pd.DataFrame] = None,
    y_test: Optional[pd.Series] = None,
):
    """Create a pipeline around a training set and an optional test set.

    All four inputs are handed to a new ``PipelineState``, which becomes
    the pipeline's internal state object.

    Args:
        X: Training feature DataFrame.
        y: Training target Series (binary 0/1).
        X_test: Optional testing feature DataFrame for validation and
            PSI calculation.
        y_test: Optional testing target Series.
    """
    initial_state = PipelineState(X, y, X_test, y_test)
    self._state = initial_state
bin(method='chi', n_bins=BINNING.DEFAULT_N_BINS, cols=None, **kwargs)

Discretize continuous variables into discrete bins.

Supported methods include 'chi' (ChiMerge), 'dt' (Decision Tree), 'opt' (Optimal), 'quantile' (Equal Frequency), 'step' (Equal Width), and 'kmean'.

Parameters:

Name Type Description Default
method str

Binning algorithm name. Defaults to 'chi'.

'chi'
n_bins int

Target number of bins for each feature.

DEFAULT_N_BINS
cols Optional[List[str]]

Optional list of features to bin. If None, all numeric features are used.

None
**kwargs

Additional parameters for the chosen binner (e.g., monotonic=True).

{}

Returns:

Name Type Description
ScorecardPipeline 'ScorecardPipeline'

The pipeline instance (self) for chaining.

Examples:

>>> pipeline.bin(method='opt', n_bins=5, monotonic='auto')
Source code in src/newt/pipeline/pipeline.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def bin(
    self,
    method: str = "chi",
    n_bins: int = BINNING.DEFAULT_N_BINS,
    cols: Optional[List[str]] = None,
    **kwargs,
) -> "ScorecardPipeline":
    """Bin continuous features and advance the pipeline state.

    A ``BinningStep`` is configured from the arguments and run against the
    current state; the state it returns replaces the previous one.

    Supported methods include 'chi' (ChiMerge), 'dt' (Decision Tree),
    'opt' (Optimal), 'quantile' (Equal Frequency), 'step' (Equal Width),
    and 'kmean'.

    Args:
        method: Binning algorithm name. Defaults to 'chi'.
        n_bins: Target number of bins for each feature.
        cols: Optional list of features to bin. If None, all numeric
            features are used.
        **kwargs: Additional parameters for the chosen binner (e.g.,
            monotonic=True).

    Returns:
        ScorecardPipeline: The pipeline instance (self) for chaining.

    Examples:
        >>> pipeline.bin(method='opt', n_bins=5, monotonic='auto')
    """
    self._state = BinningStep(
        method=method, n_bins=n_bins, cols=cols, **kwargs
    ).run(self._state)
    return self
build_model(fit_intercept=True, **kwargs)

Train the final logistic regression model on selected WOE features.

Parameters:

Name Type Description Default
fit_intercept bool

Whether to calculate the intercept for this model.

True
**kwargs

Additional parameters passed to LogisticModel.

{}

Returns:

Name Type Description
ScorecardPipeline 'ScorecardPipeline'

The pipeline instance (self) for chaining.

Examples:

>>> pipeline.build_model(method='bfgs')
Source code in src/newt/pipeline/pipeline.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def build_model(
    self,
    fit_intercept: bool = True,
    **kwargs,
) -> "ScorecardPipeline":
    """Fit the final logistic regression model on selected WOE features.

    A ``ModelingStep`` is configured from the arguments and run against
    the current state; the state it returns replaces the previous one.

    Args:
        fit_intercept: Whether to calculate the intercept for this model.
        **kwargs: Additional parameters passed to LogisticModel.

    Returns:
        ScorecardPipeline: The pipeline instance (self) for chaining.

    Examples:
        >>> pipeline.build_model(method='bfgs')
    """
    modeling_step = ModelingStep(fit_intercept=fit_intercept, **kwargs)
    updated_state = modeling_step.run(self._state)
    self._state = updated_state
    return self
generate_scorecard(base_score=SCORECARD.DEFAULT_BASE_SCORE, pdo=SCORECARD.DEFAULT_PDO, base_odds=SCORECARD.DEFAULT_BASE_ODDS, points_decimals=None, **kwargs)

Convert the fitted logistic model into a point-based scorecard.

Parameters:

Name Type Description Default
base_score int

The target score at 'base_odds'.

DEFAULT_BASE_SCORE
pdo int

Points to Double the Odds.

DEFAULT_PDO
base_odds float

The odds (Good:Bad) at 'base_score'.

DEFAULT_BASE_ODDS
points_decimals Optional[int]

Optional decimal precision for scorecard points.

None
**kwargs

Additional parameters passed to Scorecard.

{}

Returns:

Name Type Description
ScorecardPipeline 'ScorecardPipeline'

The pipeline instance (self) for chaining.

Examples:

>>> pipeline.generate_scorecard(base_score=600, pdo=20)
Source code in src/newt/pipeline/pipeline.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def generate_scorecard(
    self,
    base_score: int = SCORECARD.DEFAULT_BASE_SCORE,
    pdo: int = SCORECARD.DEFAULT_PDO,
    base_odds: float = SCORECARD.DEFAULT_BASE_ODDS,
    points_decimals: Optional[int] = None,
    **kwargs,
) -> "ScorecardPipeline":
    """Turn the fitted logistic model into a point-based scorecard.

    A ``ScorecardStep`` is configured from the arguments and run against
    the current state; the state it returns replaces the previous one.

    Args:
        base_score: The target score at 'base_odds'.
        pdo: Points to Double the Odds.
        base_odds: The odds (Good:Bad) at 'base_score'.
        points_decimals: Optional decimal precision for scorecard points.
        **kwargs: Additional parameters passed to Scorecard.

    Returns:
        ScorecardPipeline: The pipeline instance (self) for chaining.

    Examples:
        >>> pipeline.generate_scorecard(base_score=600, pdo=20)
    """
    self._state = ScorecardStep(
        base_score=base_score,
        pdo=pdo,
        base_odds=base_odds,
        points_decimals=points_decimals,
        **kwargs,
    ).run(self._state)
    return self
postfilter(psi_threshold=FILTERING.DEFAULT_PSI_THRESHOLD, vif_threshold=FILTERING.DEFAULT_VIF_THRESHOLD, X_test=None, **kwargs)

Apply post-transformation filters like PSI stability and VIF multicollinearity.

Typically run after WOE transformation to ensure the selected features are stable over time (PSI) and not redundant (VIF).

Parameters:

Name Type Description Default
psi_threshold float

Maximum allowed Population Stability Index between train/test.

DEFAULT_PSI_THRESHOLD
vif_threshold float

Maximum allowed Variance Inflation Factor.

DEFAULT_VIF_THRESHOLD
X_test Optional[DataFrame]

Optional override for the test set used to compute PSI.

None
**kwargs

Additional parameters passed to PostFilter.

{}

Returns:

Name Type Description
ScorecardPipeline 'ScorecardPipeline'

The pipeline instance (self) for chaining.

Examples:

>>> pipeline.postfilter(psi_threshold=0.1, vif_threshold=5.0)
Source code in src/newt/pipeline/pipeline.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def postfilter(
    self,
    psi_threshold: float = FILTERING.DEFAULT_PSI_THRESHOLD,
    vif_threshold: float = FILTERING.DEFAULT_VIF_THRESHOLD,
    X_test: Optional[pd.DataFrame] = None,
    **kwargs,
) -> "ScorecardPipeline":
    """Run post-transformation filtering (PSI stability, VIF collinearity).

    Typically run after WOE transformation to ensure the selected features
    are stable over time (PSI) and not redundant (VIF). A
    ``PostfilterStep`` is configured from the arguments and run against the
    current state; the state it returns replaces the previous one.

    Args:
        psi_threshold: Maximum allowed Population Stability Index
            between train/test.
        vif_threshold: Maximum allowed Variance Inflation Factor.
        X_test: Optional override for the test set used for PSI compute.
        **kwargs: Additional parameters passed to PostFilter.

    Returns:
        ScorecardPipeline: The pipeline instance (self) for chaining.

    Examples:
        >>> pipeline.postfilter(psi_threshold=0.1, vif_threshold=5.0)
    """
    postfilter_step = PostfilterStep(
        psi_threshold=psi_threshold,
        vif_threshold=vif_threshold,
        X_test=X_test,
        **kwargs,
    )
    updated_state = postfilter_step.run(self._state)
    self._state = updated_state
    return self
prefilter(iv_threshold=FILTERING.DEFAULT_IV_THRESHOLD, missing_threshold=FILTERING.DEFAULT_MISSING_THRESHOLD, corr_threshold=FILTERING.DEFAULT_CORR_THRESHOLD, iv_bins=BINNING.DEFAULT_BUCKETS, **kwargs)

Apply pre-modeling filters based on EDA metrics.

Filters features using Information Value (IV), missing rate, and feature-to-feature correlation. This step is typically the first in the pipeline to reduce dimensionality before expensive operations like binning.

Parameters:

Name Type Description Default
iv_threshold float

Minimum IV required to keep a feature.

DEFAULT_IV_THRESHOLD
missing_threshold float

Maximum allowed missing rate (0.0 to 1.0).

DEFAULT_MISSING_THRESHOLD
corr_threshold float

Maximum allowed correlation between feature pairs.

DEFAULT_CORR_THRESHOLD
iv_bins int

Number of buckets used for temporary auto-binning while computing IV.

DEFAULT_BUCKETS
**kwargs

Additional arguments passed to FeatureSelector.

{}

Returns:

Name Type Description
ScorecardPipeline 'ScorecardPipeline'

The pipeline instance (self) for chaining.

Examples:

>>> pipeline.prefilter(iv_threshold=0.05, corr_threshold=0.7)
Source code in src/newt/pipeline/pipeline.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def prefilter(
    self,
    iv_threshold: float = FILTERING.DEFAULT_IV_THRESHOLD,
    missing_threshold: float = FILTERING.DEFAULT_MISSING_THRESHOLD,
    corr_threshold: float = FILTERING.DEFAULT_CORR_THRESHOLD,
    iv_bins: int = BINNING.DEFAULT_BUCKETS,
    **kwargs,
) -> "ScorecardPipeline":
    """Run pre-modeling feature filtering based on EDA metrics.

    Features are filtered using Information Value (IV), missing rate, and
    feature-to-feature correlation. This step is typically the first in
    the pipeline, reducing dimensionality before expensive operations
    like binning. A ``PrefilterStep`` is configured from the arguments and
    run against the current state; the state it returns replaces the
    previous one.

    Args:
        iv_threshold: Minimum IV required to keep a feature.
        missing_threshold: Maximum allowed missing rate (0.0 to 1.0).
        corr_threshold: Maximum allowed correlation between feature pairs.
        iv_bins: Number of buckets used for temporary auto-binning
            during IV compute.
        **kwargs: Additional arguments passed to FeatureSelector.

    Returns:
        ScorecardPipeline: The pipeline instance (self) for chaining.

    Examples:
        >>> pipeline.prefilter(iv_threshold=0.05, corr_threshold=0.7)
    """
    self._state = PrefilterStep(
        iv_threshold=iv_threshold,
        missing_threshold=missing_threshold,
        corr_threshold=corr_threshold,
        iv_bins=iv_bins,
        **kwargs,
    ).run(self._state)
    return self
score(X)

Apply the finished scorecard to new raw data to produce scores.

Parameters:

Name Type Description Default
X DataFrame

Raw feature DataFrame (un-binned, un-encoded).

required

Returns:

Type Description
Series

pd.Series: Calculated scores for each row.

Raises:

Type Description
ValueError

If the scorecard has not been generated yet.

Source code in src/newt/pipeline/pipeline.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def score(self, X: pd.DataFrame) -> pd.Series:
    """Score new raw data with the generated scorecard.

    The call is delegated to ``self.scorecard_.score``.

    Args:
        X: Raw feature DataFrame (un-binned, un-encoded).

    Returns:
        pd.Series: Calculated scores for each row.

    Raises:
        ValueError: If the scorecard has not been generated yet.
    """
    if self.scorecard_ is not None:
        return self.scorecard_.score(X)
    # No scorecard available: tell the caller which step is missing.
    raise ValueError("Scorecard not built. Call generate_scorecard() first.")
stepwise(direction='both', criterion='aic', p_enter=MODELING.DEFAULT_P_ENTER, p_remove=MODELING.DEFAULT_P_REMOVE, exclude=None, **kwargs)

Perform automated feature selection via stepwise regression.

Successively adds or removes features based on statistical significance or information criteria (AIC/BIC).

Parameters:

Name Type Description Default
direction str

Search direction: 'forward', 'backward', or 'both'.

'both'
criterion str

Selection criterion: 'p-value', 'aic', or 'bic'.

'aic'
p_enter float

P-value threshold to enter the model (if using 'p-value').

DEFAULT_P_ENTER
p_remove float

P-value threshold to be removed from the model.

DEFAULT_P_REMOVE
exclude Optional[List[str]]

Optional list of features to always keep in the model.

None
**kwargs

Additional parameters passed to StepwiseSelector.

{}

Returns:

Name Type Description
ScorecardPipeline 'ScorecardPipeline'

The pipeline instance (self) for chaining.

Examples:

>>> pipeline.stepwise(direction='both', criterion='aic')
Source code in src/newt/pipeline/pipeline.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def stepwise(
    self,
    direction: str = "both",
    criterion: str = "aic",
    p_enter: float = MODELING.DEFAULT_P_ENTER,
    p_remove: float = MODELING.DEFAULT_P_REMOVE,
    exclude: Optional[List[str]] = None,
    **kwargs,
) -> "ScorecardPipeline":
    """Run automated feature selection via stepwise regression.

    Features are successively added or removed based on statistical
    significance or information criteria (AIC/BIC). A ``StepwiseStep`` is
    configured from the arguments and run against the current state; the
    state it returns replaces the previous one.

    Args:
        direction: Search direction: 'forward', 'backward', or 'both'.
        criterion: Selection criterion: 'p-value', 'aic', or 'bic'.
        p_enter: P-value threshold to enter the model (if using 'p-value').
        p_remove: P-value threshold to be removed from the model.
        exclude: Optional list of features to always keep in the model.
        **kwargs: Additional parameters passed to StepwiseSelector.

    Returns:
        ScorecardPipeline: The pipeline instance (self) for chaining.

    Examples:
        >>> pipeline.stepwise(direction='both', criterion='aic')
    """
    stepwise_step = StepwiseStep(
        direction=direction,
        criterion=criterion,
        p_enter=p_enter,
        p_remove=p_remove,
        exclude=exclude,
        **kwargs,
    )
    updated_state = stepwise_step.run(self._state)
    self._state = updated_state
    return self
summary()

Get pipeline summary.

Source code in src/newt/pipeline/pipeline.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def summary(self) -> Dict[str, object]:
    """Return a dictionary describing the pipeline's progress so far.

    The result always contains the executed step names, the initial and
    current feature counts, and the selected feature list. Selected/removed
    counts for the prefilter and postfilter, and the ``model_fitted`` /
    ``scorecard_built`` flags, are added only when the corresponding
    artifact is not None.
    """
    info: Dict[str, object] = {
        "steps": self.steps_,
        "n_features_initial": len(self.X_train.columns),
        "n_features_final": len(self.X_current.columns),
        "selected_features": self.selected_features,
    }

    if self.prefilter_ is not None:
        info.update(
            {
                "prefilter_selected": len(self.prefilter_.selected_features_),
                "prefilter_removed": len(self.prefilter_.removed_features_),
            }
        )

    if self.postfilter_ is not None:
        info.update(
            {
                "postfilter_selected": len(self.postfilter_.selected_features_),
                "postfilter_removed": len(self.postfilter_.removed_features_),
            }
        )

    if self.model_ is not None:
        info["model_fitted"] = True

    if self.scorecard_ is not None:
        info["scorecard_built"] = True

    return info
woe_transform(epsilon=BINNING.DEFAULT_EPSILON, **kwargs)

Apply Weight of Evidence (WOE) encoding to binned features.

Converts binned categorical/ordinal values into numeric WOE values based on the distribution of good and bad labels in each bin.

Parameters:

Name Type Description Default
epsilon float

Small constant to prevent log(0) or division by zero.

DEFAULT_EPSILON
**kwargs

Additional arguments passed to WOEEncoder.

{}

Returns:

Name Type Description
ScorecardPipeline 'ScorecardPipeline'

The pipeline instance (self) for chaining.

Examples:

>>> pipeline.woe_transform(epsilon=1e-10)
Source code in src/newt/pipeline/pipeline.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def woe_transform(
    self,
    epsilon: float = BINNING.DEFAULT_EPSILON,
    **kwargs,
) -> "ScorecardPipeline":
    """Encode binned features with Weight of Evidence (WOE) values.

    Binned categorical/ordinal values are converted into numeric WOE values
    based on the distribution of good and bad labels in each bin. A
    ``WoeTransformStep`` is configured from the arguments and run against
    the current state; the state it returns replaces the previous one.

    Args:
        epsilon: Small constant to prevent log(0) or division by zero.
        **kwargs: Additional arguments passed to WOEEncoder.

    Returns:
        ScorecardPipeline: The pipeline instance (self) for chaining.

    Examples:
        >>> pipeline.woe_transform(epsilon=1e-10)
    """
    self._state = WoeTransformStep(epsilon=epsilon, **kwargs).run(self._state)
    return self

Reporting

newt.reporting.report

Public report orchestration API.

Classes

Report dataclass

Orchestrator for generating multi-sheet Excel model reports.

The Report class serves as the primary entry point for creating professional, styled Excel workbooks that summarize model performance, variable distributions, and dimensional comparisons.

Attributes:

Name Type Description
data DataFrame

The input dataset containing scores, labels, and features.

model object

A fitted model object (scikit-learn, LightGBM, XGBoost, etc.) used to extract feature importance and parameters.

tag str

Column name identifying sample segments (e.g., 'train', 'oot').

score_col str

Column name for the primary model score to be analyzed.

date_col str

Column name for the observation date (used for monthly trends).

label_list Sequence[str]

List of target column names (binary 0/1).

score_list Sequence[str]

Optional list of secondary/benchmark scores.

dim_list Sequence[str]

Optional list of columns for dimensional comparison.

var_list Sequence[str]

Optional list of columns for portrait/feature analysis.

sheet_list Sequence[object]

Optional list of sheets to include (names or indices).

feature_df DataFrame

Feature dictionary DataFrame used for variable metadata mapping.

report_out_path str

File path where the Excel workbook will be saved.

engine str

Calculation engine: 'auto' (default), 'rust', or 'python'.

max_workers int

Maximum parallel workers for computation.

parallel_sheets bool

Whether to calculate different sheets in parallel.

memory_mode str

Memory usage strategy: 'compact' (default) or 'standard'.

metrics_mode str

Calculation mode: 'exact' (default) or 'binned' (approximate).

prin_bal_amount_col str

Column name for principal-balance amount used by optional amount-based report metrics.

loan_amount_col str

Column name for loan amount used by optional amount-based report metrics.

Examples:

>>> from newt import Report
>>> report = Report(
...     data=df,
...     model=fitted_model,
...     tag="segment",
...     score_col="new_score",
...     date_col="report_date",
...     label_list=["target"],
...     report_out_path="./final_report.xlsx"
... )
>>> report.generate()
Source code in src/newt/reporting/report.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@dataclass
class Report:
    """Orchestrator for generating multi-sheet Excel model reports.

    The Report class serves as the primary entry point for creating professional,
    styled Excel workbooks that summarize model performance, variable distributions,
    and dimensional comparisons.

    Attributes:
        data (pd.DataFrame): The input dataset containing scores, labels, and features.
        model (object): A fitted model object (scikit-learn, LightGBM, XGBoost, etc.)
            used to extract feature importance and parameters.
        tag (str): Column name identifying sample segments (e.g., 'train', 'oot').
        score_col (str): Column name for the primary model score to be analyzed.
        date_col (str): Column name for the observation date (used for monthly trends).
        label_list (Sequence[str]): List of target column names (binary 0/1).
        score_list (Sequence[str]): Optional list of secondary/benchmark scores.
        dim_list (Sequence[str]): Optional list of columns for dimensional comparison.
        var_list (Sequence[str]): Optional list of columns for portrait/feature
            analysis.
        sheet_list (Sequence[object]): Optional list of sheets to include
            (names or indices).
        feature_df (pd.DataFrame, optional): Feature dictionary DataFrame used
            for variable metadata mapping.
        report_out_path (str): File path where the Excel workbook will be saved.
        engine (str): Calculation engine: 'auto' (default), 'rust', or 'python'.
        max_workers (int, optional): Maximum parallel workers for computation.
        parallel_sheets (bool): Whether to calculate different sheets in parallel.
        memory_mode (str): Memory usage strategy: 'compact' (default) or 'standard'.
        metrics_mode (str): Calculation mode: 'exact' (default) or
            'binned' (approximate).
        prin_bal_amount_col (str, optional): Column name for principal-balance
            amount used by optional amount-based report metrics.
        loan_amount_col (str, optional): Column name for loan amount used by
            optional amount-based report metrics.

    Examples:
        >>> from newt import Report
        >>> report = Report(
        ...     data=df,
        ...     model=fitted_model,
        ...     tag="segment",
        ...     score_col="new_score",
        ...     date_col="report_date",
        ...     label_list=["target"],
        ...     report_out_path="./final_report.xlsx"
        ... )
        >>> report.generate()
    """

    data: pd.DataFrame
    model: object
    tag: str
    score_col: str
    date_col: str
    label_list: Sequence[str]
    score_list: Sequence[str] = field(default_factory=list)
    dim_list: Sequence[str] = field(default_factory=list)
    var_list: Sequence[str] = field(default_factory=list)
    sheet_list: Sequence[object] = field(default_factory=list)
    feature_df: Optional[pd.DataFrame] = None
    report_out_path: str = "./out/model_report.xlsx"
    engine: str = "auto"
    max_workers: Optional[int] = None
    parallel_sheets: bool = True
    memory_mode: str = "compact"
    metrics_mode: str = "exact"
    prin_bal_amount_col: Optional[str] = None
    loan_amount_col: Optional[str] = None

    # Not a constructor argument (init=False); set by generate() to the
    # result object it built.
    result_: Optional[ModelReportResult] = field(default=None, init=False)

    def generate(self) -> str:
        """Generate the report and return the output path.

        Stages, in order: validate runtime options, resolve worker count and
        engine, prepare the data, prepare report scores, resolve the sheet
        selection, wrap the model in a ``ModelAdapter``, build the report
        result, and write it with ``ExcelReportWriter``. Each stage is timed
        with ``time.perf_counter`` and recorded through ``_log_stage``.

        Returns:
            The path returned by ``ExcelReportWriter.write``.

        Raises:
            ValueError: From ``_validate_runtime_options`` or
                ``_validate_columns`` when an option or column is invalid.
        """
        _configure_report_logger()
        self._validate_runtime_options()
        resolved_workers = self._resolve_max_workers()
        resolved_engine = self._resolve_engine()
        build_options = ReportBuildOptions(
            engine=resolved_engine,
            max_workers=resolved_workers,
            parallel_sheets=bool(self.parallel_sheets),
            memory_mode=self.memory_mode,
            metrics_mode=self.metrics_mode,
        )
        # (stage name, elapsed seconds) pairs; handed to _log_stage for every
        # stage and to _log_top_slowest_steps at the end.
        stage_timings: List[Tuple[str, float]] = []
        total_start = time.perf_counter()
        LOGGER.debug(
            "Report generation started | rows=%d cols=%d primary_score=%s labels=%s "
            "output=%s engine=%s workers=%d parallel_sheets=%s memory_mode=%s "
            "metrics_mode=%s "
            "peak_rss_mb=%s",
            len(self.data),
            len(self.data.columns),
            self.score_col,
            list(self.label_list),
            self.report_out_path,
            build_options.engine,
            build_options.max_workers,
            build_options.parallel_sheets,
            build_options.memory_mode,
            build_options.metrics_mode,
            _format_peak_rss(),
        )

        # Stage 1: column validation plus tag / month column preparation.
        step_start = time.perf_counter()
        prepared = self._prepare_data()
        _log_stage(
            stage_timings,
            "prepare_data",
            time.perf_counter() - step_start,
            extra=f"rows={len(prepared)} peak_rss_mb={_format_peak_rss()}",
        )

        # Stage 2: score preparation. Only the first entry of label_list is
        # passed as the label column here; the primary score is listed ahead
        # of the secondary scores.
        step_start = time.perf_counter()
        prepared, report_score_columns, score_direction_summary = prepare_report_scores(
            data=prepared,
            tag_col=self.tag,
            label_col=self.label_list[0],
            score_names=[self.score_col, *self.score_list],
        )
        if build_options.memory_mode == "compact":
            # Return value is not captured, so this presumably modifies
            # `prepared` in place -- TODO confirm against the helper.
            _downcast_float_columns(prepared, report_score_columns.values())
        _log_stage(
            stage_timings,
            "prepare_report_scores",
            time.perf_counter() - step_start,
            extra=(
                "report_scores="
                f"{sorted(report_score_columns.keys())} "
                f"peak_rss_mb={_format_peak_rss()}"
            ),
        )

        # Stage 3: translate the user-supplied sheet_list into sheet keys.
        step_start = time.perf_counter()
        selected_sheets = resolve_sheet_keys(self.sheet_list)
        _log_stage(
            stage_timings,
            "resolve_sheet_keys",
            time.perf_counter() - step_start,
            extra=f"selected_sheet_keys={selected_sheets}",
        )

        # Stage 4: wrap the user's model object in a ModelAdapter.
        step_start = time.perf_counter()
        adapter = ModelAdapter(self.model)
        _log_stage(
            stage_timings,
            "model_adapter_init",
            time.perf_counter() - step_start,
            extra=f"model_family={adapter.model_family}",
        )

        # Stage 5: build the report result. "_report_month" is the column
        # added by _prepare_data.
        step_start = time.perf_counter()
        result = build_report_result(
            data=prepared,
            model_adapter=adapter,
            tag_col=self.tag,
            month_col="_report_month",
            raw_date_col=self.date_col,
            label_list=self.label_list,
            score_list=self.score_list,
            primary_score_name=self.score_col,
            report_score_columns=report_score_columns,
            score_direction_summary=score_direction_summary,
            dim_list=self.dim_list,
            var_list=self.var_list,
            feature_df=self.feature_df,
            selected_sheets=selected_sheets,
            prin_bal_amount_col=self.prin_bal_amount_col,
            loan_amount_col=self.loan_amount_col,
            options=build_options,
        )
        _log_stage(
            stage_timings,
            "build_report_result",
            time.perf_counter() - step_start,
            extra=(
                f"sheet_count={len(result.sheet_names)} "
                f"peak_rss_mb={_format_peak_rss()}"
            ),
        )

        # Stage 6: write the result to report_out_path.
        step_start = time.perf_counter()
        writer = ExcelReportWriter()
        output_path = writer.write(result, self.report_out_path)
        _log_stage(
            stage_timings,
            "write_excel",
            time.perf_counter() - step_start,
            extra=f"output={output_path} peak_rss_mb={_format_peak_rss()}",
        )

        # Keep the built result on the instance; it is assigned only after
        # the write call has returned.
        self.result_ = result
        total_elapsed = time.perf_counter() - total_start
        _log_stage(stage_timings, "total", total_elapsed)
        _log_top_slowest_steps(stage_timings)
        LOGGER.debug(
            "Report generation completed | total_elapsed=%.3fs output=%s "
            "peak_rss_mb=%s",
            total_elapsed,
            output_path,
            _format_peak_rss(),
        )
        return output_path

    def _prepare_data(self) -> pd.DataFrame:
        """Validate columns and return a frame with tag and month columns set.

        The returned frame has the tag column replaced by the output of
        ``_normalize_report_tag_values`` and a ``_report_month`` column
        computed from ``date_col`` by ``_vectorized_normalize_month``.

        Raises:
            ValueError: Propagated from ``_validate_columns``.
        """
        self._validate_columns()
        normalized_tag = _normalize_report_tag_values(self.data[self.tag])
        # Shallow copy (deep=False); assign() below returns a new frame, so
        # the new columns are not written onto self.data.
        prepared = self.data.copy(deep=False)
        prepared = prepared.assign(
            **{
                self.tag: normalized_tag,
                "_report_month": _vectorized_normalize_month(self.data[self.date_col]),
            }
        )
        return prepared

    def _validate_columns(self) -> None:
        """Check that every configured column name exists in ``self.data``.

        Raises:
            ValueError: If exactly one of ``prin_bal_amount_col`` and
                ``loan_amount_col`` is set, or if any configured column is
                absent from ``self.data``.
        """
        required = [self.tag, self.score_col, self.date_col, *self.label_list]
        optional = [
            *self.score_list,
            *self.dim_list,
            *self.var_list,
            self.prin_bal_amount_col,
            self.loan_amount_col,
        ]
        # XOR: true when exactly one of the two amount columns is None.
        if (self.prin_bal_amount_col is None) ^ (self.loan_amount_col is None):
            raise ValueError(
                "prin_bal_amount_col and loan_amount_col must be provided together"
            )
        # `if column` skips falsy entries (None, empty string), so unset
        # optional columns are not reported. Required and optional names are
        # checked together and reported under the same message.
        missing = [
            column
            for column in [*required, *optional]
            if column and column not in self.data.columns
        ]
        if missing:
            raise ValueError(f"Missing required columns: {sorted(set(missing))}")

    def _validate_runtime_options(self) -> None:
        """Validate engine, memory/metrics modes, max_workers and feature_df.

        Raises:
            ValueError: If ``engine``, ``memory_mode`` or ``metrics_mode`` is
                not one of its allowed values, if ``max_workers`` is set and
                ``int(max_workers) < 1``, or if ``feature_df`` is set and is
                not a pandas DataFrame.
        """
        if self.engine not in {"auto", "rust", "python"}:
            raise ValueError("engine must be 'auto', 'rust' or 'python'")
        if self.memory_mode not in {"compact", "standard"}:
            raise ValueError("memory_mode must be 'compact' or 'standard'")
        if self.metrics_mode not in {"exact", "binned"}:
            raise ValueError("metrics_mode must be 'exact' or 'binned'")
        if self.max_workers is not None and int(self.max_workers) < 1:
            raise ValueError("max_workers must be >= 1")
        if self.feature_df is not None and not isinstance(
            self.feature_df, pd.DataFrame
        ):
            raise ValueError("feature_df must be a pandas DataFrame when provided")

    def _resolve_max_workers(self) -> int:
        """Return the worker count to use.

        An explicit ``max_workers`` is converted with ``int`` and floored at
        1. Otherwise the CPU count is used (1 when ``os.cpu_count()`` returns
        a falsy value), capped at 8.
        """
        if self.max_workers is not None:
            return max(1, int(self.max_workers))
        cpu_total = os.cpu_count() or 1
        return max(1, min(8, cpu_total))

    def _resolve_engine(self) -> str:
        """Resolve user-facing engine option to concrete runtime engine."""
        # Delegates to resolve_engine, passing load_native_module as loader.
        return resolve_engine(self.engine, loader=load_native_module)
Functions
generate()

Generate the report and return the output path.

Source code in src/newt/reporting/report.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def generate(self) -> str:
    """Generate the report and return the output path.

    Runs the report pipeline as a fixed sequence of stages: validate the
    runtime options, prepare the data, prepare the report scores, resolve the
    sheet keys, wrap the model in a ``ModelAdapter``, build the report result
    and write it with ``ExcelReportWriter``. The elapsed time of each stage is
    recorded through ``_log_stage``, and the built result is stored on
    ``self.result_`` once writing has finished.

    Returns:
        The path returned by ``ExcelReportWriter.write``.

    Raises:
        ValueError: Propagated from ``_validate_runtime_options`` when a
            runtime option is invalid.
    """
    _configure_report_logger()
    # Validate first so invalid options fail before any data work is done.
    self._validate_runtime_options()
    resolved_workers = self._resolve_max_workers()
    resolved_engine = self._resolve_engine()
    build_options = ReportBuildOptions(
        engine=resolved_engine,
        max_workers=resolved_workers,
        parallel_sheets=bool(self.parallel_sheets),
        memory_mode=self.memory_mode,
        metrics_mode=self.metrics_mode,
    )
    # (stage name, elapsed seconds) pairs, filled by each _log_stage call.
    stage_timings: List[Tuple[str, float]] = []
    total_start = time.perf_counter()
    LOGGER.debug(
        "Report generation started | rows=%d cols=%d primary_score=%s labels=%s "
        "output=%s engine=%s workers=%d parallel_sheets=%s memory_mode=%s "
        "metrics_mode=%s "
        "peak_rss_mb=%s",
        len(self.data),
        len(self.data.columns),
        self.score_col,
        list(self.label_list),
        self.report_out_path,
        build_options.engine,
        build_options.max_workers,
        build_options.parallel_sheets,
        build_options.memory_mode,
        build_options.metrics_mode,
        _format_peak_rss(),
    )

    # Stage: prepare_data.
    step_start = time.perf_counter()
    prepared = self._prepare_data()
    _log_stage(
        stage_timings,
        "prepare_data",
        time.perf_counter() - step_start,
        extra=f"rows={len(prepared)} peak_rss_mb={_format_peak_rss()}",
    )

    # Stage: prepare_report_scores. Only the first label is passed here, and
    # the primary score is listed ahead of the additional scores.
    step_start = time.perf_counter()
    prepared, report_score_columns, score_direction_summary = prepare_report_scores(
        data=prepared,
        tag_col=self.tag,
        label_col=self.label_list[0],
        score_names=[self.score_col, *self.score_list],
    )
    if build_options.memory_mode == "compact":
        # The return value is unused, so this presumably downcasts the report
        # score columns of `prepared` in place -- TODO confirm.
        _downcast_float_columns(prepared, report_score_columns.values())
    _log_stage(
        stage_timings,
        "prepare_report_scores",
        time.perf_counter() - step_start,
        extra=(
            "report_scores="
            f"{sorted(report_score_columns.keys())} "
            f"peak_rss_mb={_format_peak_rss()}"
        ),
    )

    # Stage: resolve_sheet_keys.
    step_start = time.perf_counter()
    selected_sheets = resolve_sheet_keys(self.sheet_list)
    _log_stage(
        stage_timings,
        "resolve_sheet_keys",
        time.perf_counter() - step_start,
        extra=f"selected_sheet_keys={selected_sheets}",
    )

    # Stage: model_adapter_init.
    step_start = time.perf_counter()
    adapter = ModelAdapter(self.model)
    _log_stage(
        stage_timings,
        "model_adapter_init",
        time.perf_counter() - step_start,
        extra=f"model_family={adapter.model_family}",
    )

    # Stage: build_report_result.
    # NOTE(review): the "_report_month" column is presumably created by
    # _prepare_data -- confirm against that method.
    step_start = time.perf_counter()
    result = build_report_result(
        data=prepared,
        model_adapter=adapter,
        tag_col=self.tag,
        month_col="_report_month",
        raw_date_col=self.date_col,
        label_list=self.label_list,
        score_list=self.score_list,
        primary_score_name=self.score_col,
        report_score_columns=report_score_columns,
        score_direction_summary=score_direction_summary,
        dim_list=self.dim_list,
        var_list=self.var_list,
        feature_df=self.feature_df,
        selected_sheets=selected_sheets,
        prin_bal_amount_col=self.prin_bal_amount_col,
        loan_amount_col=self.loan_amount_col,
        options=build_options,
    )
    _log_stage(
        stage_timings,
        "build_report_result",
        time.perf_counter() - step_start,
        extra=(
            f"sheet_count={len(result.sheet_names)} "
            f"peak_rss_mb={_format_peak_rss()}"
        ),
    )

    # Stage: write_excel.
    step_start = time.perf_counter()
    writer = ExcelReportWriter()
    output_path = writer.write(result, self.report_out_path)
    _log_stage(
        stage_timings,
        "write_excel",
        time.perf_counter() - step_start,
        extra=f"output={output_path} peak_rss_mb={_format_peak_rss()}",
    )

    # The result is exposed on the instance only after the write call returned.
    self.result_ = result
    total_elapsed = time.perf_counter() - total_start
    # The overall duration is logged as a stage named "total", so it is part of
    # the timings handed to _log_top_slowest_steps below.
    _log_stage(stage_timings, "total", total_elapsed)
    _log_top_slowest_steps(stage_timings)
    LOGGER.debug(
        "Report generation completed | total_elapsed=%.3fs output=%s "
        "peak_rss_mb=%s",
        total_elapsed,
        output_path,
        _format_peak_rss(),
    )
    return output_path

newt.reporting.interactive

Interactive reporting wrappers for use in Jupyter Notebooks.

Functions

calculate_tag_metrics(data, tag_col, date_col, label_list, score_model_columns, metrics_mode='exact', score_type='auto', prin_bal_amount_col=None, loan_amount_col=None)

Calculate split performance metrics by tag.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame containing the predictions.

required
tag_col str

Column name indicating sample set (e.g., 'train', 'oot').

required
date_col str

Date column name; used to build tag observation windows.

required
label_list Sequence[str]

List of label column names.

required
score_model_columns Sequence[Tuple[str, str]]

List of (model_name, score_column) tuples.

required
metrics_mode str

'exact' or 'binned'. Default is 'exact'.

'exact'
score_type str

Score semantics: 'auto', 'probability' (higher=more risky), or 'score' (higher=less risky).

'auto'
prin_bal_amount_col Optional[str]

Optional principal-balance amount column.

None
loan_amount_col Optional[str]

Optional total-loan amount column.

None

Returns:

Type Description
DataFrame

DataFrame containing metrics grouped by tag.

Source code in src/newt/reporting/interactive.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def calculate_tag_metrics(
    data: pd.DataFrame,
    tag_col: str,
    date_col: str,
    label_list: Sequence[str],
    score_model_columns: Sequence[Tuple[str, str]],
    metrics_mode: str = "exact",
    score_type: str = "auto",
    prin_bal_amount_col: Optional[str] = None,
    loan_amount_col: Optional[str] = None,
) -> pd.DataFrame:
    """Compute performance metrics split by tag.

    Args:
        data: Input DataFrame containing the predictions.
        tag_col: Column name indicating sample set (e.g., 'train', 'oot').
        date_col: Date column name; used to build tag observation windows.
        label_list: List of label column names.
        score_model_columns: List of (model_name, score_column) tuples.
        metrics_mode: 'exact' or 'binned'. Default is 'exact'.
        score_type: Score semantics: 'auto', 'probability' (higher=more risky),
            or 'score' (higher=less risky).
        prin_bal_amount_col: Optional principal-balance amount column.
        loan_amount_col: Optional total-loan amount column.

    Returns:
        DataFrame containing metrics grouped by tag.
    """
    model_pairs = _resolve_score_model_columns(score_model_columns)

    # Arguments common to the direction lookup and the metric builder.
    shared_kwargs = dict(
        data=data,
        label_list=label_list,
        score_model_columns=model_pairs,
        tag_col=tag_col,
    )
    direction_options = _build_score_direction_options(
        score_type=score_type,
        **shared_kwargs,
    )

    # The builder returns a two-element result; only the first (tag-level)
    # frame is handed back to the caller.
    per_tag_metrics, _unused_second = _build_tag_month_metrics_for_models(
        date_col=date_col,
        score_direction_options=direction_options,
        metrics_mode=metrics_mode,
        prin_bal_amount_col=prin_bal_amount_col,
        loan_amount_col=loan_amount_col,
        **shared_kwargs,
    )
    return per_tag_metrics

calculate_month_metrics(data, date_col, label_list, score_model_columns, metrics_mode='exact', score_type='auto', prin_bal_amount_col=None, loan_amount_col=None)

Calculate performance metrics by month using the first month as PSI base.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame containing the predictions.

required
date_col str

Date column name; used to automatically generate month column.

required
label_list Sequence[str]

List of label column names.

required
score_model_columns Sequence[Tuple[str, str]]

List of (model_name, score_column) tuples.

required
metrics_mode str

'exact' or 'binned'. Default is 'exact'.

'exact'
score_type str

Score semantics: 'auto', 'probability' (higher=more risky), or 'score' (higher=less risky).

'auto'
prin_bal_amount_col Optional[str]

Optional principal-balance amount column.

None
loan_amount_col Optional[str]

Optional total-loan amount column.

None

Returns:

Type Description
DataFrame

DataFrame containing metrics grouped by auto-derived month.

Source code in src/newt/reporting/interactive.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
def calculate_month_metrics(
    data: pd.DataFrame,
    date_col: str,
    label_list: Sequence[str],
    score_model_columns: Sequence[Tuple[str, str]],
    metrics_mode: str = "exact",
    score_type: str = "auto",
    prin_bal_amount_col: Optional[str] = None,
    loan_amount_col: Optional[str] = None,
) -> pd.DataFrame:
    """Compute monthly performance metrics with the first month as PSI base.

    The metrics without amount columns are always built. When both amount
    columns come back non-``None`` from ``_validate_amount_metric_columns``, a
    second pass with ``metric_basis="amount"`` is built and merged in through
    ``_merge_amount_extension_columns``.

    Args:
        data: Input DataFrame containing the predictions.
        date_col: Date column name; used to automatically generate month column.
        label_list: List of label column names.
        score_model_columns: List of (model_name, score_column) tuples.
        metrics_mode: 'exact' or 'binned'. Default is 'exact'.
        score_type: Score semantics: 'auto', 'probability' (higher=more risky),
            or 'score' (higher=less risky).
        prin_bal_amount_col: Optional principal-balance amount column.
        loan_amount_col: Optional total-loan amount column.

    Returns:
        DataFrame containing metrics grouped by auto-derived month.
    """
    model_pairs = _resolve_score_model_columns(score_model_columns)
    direction_options = _build_score_direction_options(
        data=data,
        label_list=label_list,
        score_model_columns=model_pairs,
        score_type=score_type,
    )
    prin_col, loan_col = _validate_amount_metric_columns(
        data=data,
        prin_bal_amount_col=prin_bal_amount_col,
        loan_amount_col=loan_amount_col,
    )

    # Everything the two builder passes have in common.
    builder_kwargs = dict(
        data=data,
        date_col=date_col,
        label_list=label_list,
        score_model_columns=model_pairs,
        score_direction_options=direction_options,
        metrics_mode=metrics_mode,
    )

    base_month_metrics = _build_first_month_metrics_for_models(
        **builder_kwargs,
        prin_bal_amount_col=None,
        loan_amount_col=None,
    )
    if prin_col is None or loan_col is None:
        return base_month_metrics

    amount_month_metrics = _build_first_month_metrics_for_models(
        **builder_kwargs,
        metric_basis="amount",
        prin_bal_amount_col=prin_col,
        loan_amount_col=loan_col,
    )
    join_keys = ["样本标签", "模型", "样本集", "观察点月"]
    return _merge_amount_extension_columns(
        base_frame=base_month_metrics,
        amount_metrics_frame=amount_month_metrics,
        key_columns=join_keys,
        leading_columns=join_keys,
    )

calculate_split_metrics(data, tag_col, date_col, label_list, score_col=None, model_name=None, metrics_mode='exact', score_type='auto', prin_bal_amount_col=None, loan_amount_col=None, *, score_model_columns=None)

Calculate split performance metrics by tag and month.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame containing the predictions.

required
tag_col str

Column name indicating sample set (e.g., 'train', 'oot').

required
date_col str

Date column name; used to automatically generate month column.

required
label_list Sequence[str]

List of label column names.

required
score_col Optional[str]

Legacy score column name. Use score_model_columns for new code.

None
model_name Optional[str]

Legacy model name. Use score_model_columns for new code.

None
score_model_columns Optional[Sequence[Tuple[str, str]]]

List of (model_name, score_column) tuples.

None
metrics_mode str

'exact' or 'binned'. Default is 'exact'.

'exact'
score_type str

Score semantics: 'auto', 'probability' (higher=more risky), or 'score' (higher=less risky).

'auto'
prin_bal_amount_col Optional[str]

Optional principal-balance amount column.

None
loan_amount_col Optional[str]

Optional total-loan amount column.

None

Returns:

Type Description
Tuple[DataFrame, DataFrame]

Tuple of two DataFrames: tag_df (metrics grouped by tag) and month_df (metrics grouped by auto-derived month).

Source code in src/newt/reporting/interactive.py
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
def calculate_split_metrics(
    data: pd.DataFrame,
    tag_col: str,
    date_col: str,
    label_list: Sequence[str],
    score_col: Optional[str] = None,
    model_name: Optional[str] = None,
    metrics_mode: str = "exact",
    score_type: str = "auto",
    prin_bal_amount_col: Optional[str] = None,
    loan_amount_col: Optional[str] = None,
    *,
    score_model_columns: Optional[Sequence[Tuple[str, str]]] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Compute performance metrics split by tag and by month.

    Args:
        data: Input DataFrame containing the predictions.
        tag_col: Column name indicating sample set (e.g., 'train', 'oot').
        date_col: Date column name; used to automatically generate month column.
        label_list: List of label column names.
        score_col: Legacy score column name. Use score_model_columns for new code.
        model_name: Legacy model name. Use score_model_columns for new code.
        score_model_columns: List of (model_name, score_column) tuples.
        metrics_mode: 'exact' or 'binned'. Default is 'exact'.
        score_type: Score semantics: 'auto', 'probability' (higher=more risky),
            or 'score' (higher=less risky).
        prin_bal_amount_col: Optional principal-balance amount column.
        loan_amount_col: Optional total-loan amount column.

    Returns:
        Tuple of two DataFrames:
            - tag_df: Metrics grouped by tag.
            - month_df: Metrics grouped by auto-derived month.
    """
    # Both the legacy (score_col / model_name) and the tuple-based inputs are
    # forwarded to the resolver, which produces the model/score pairs used below.
    model_pairs = _resolve_score_model_columns(
        score_model_columns=score_model_columns,
        score_col=score_col,
        model_name=model_name,
    )

    # Arguments common to the direction lookup and the metric builder.
    shared_kwargs = dict(
        data=data,
        label_list=label_list,
        score_model_columns=model_pairs,
        tag_col=tag_col,
    )
    direction_options = _build_score_direction_options(
        score_type=score_type,
        **shared_kwargs,
    )
    tag_and_month_frames = _build_tag_month_metrics_for_models(
        date_col=date_col,
        score_direction_options=direction_options,
        metrics_mode=metrics_mode,
        prin_bal_amount_col=prin_bal_amount_col,
        loan_amount_col=loan_amount_col,
        **shared_kwargs,
    )
    return tag_and_month_frames

calculate_dimensional_comparison(data, dim_list, label_list, score_model_columns, metrics_mode='exact', score_type='auto', prin_bal_amount_col=None, loan_amount_col=None)

Calculate dimensional comparison metrics.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame.

required
dim_list Sequence[str]

List of dimension column names to split by.

required
label_list Sequence[str]

List of label column names.

required
score_model_columns Sequence[Tuple[str, str]]

List of (model_name, score_column) tuples.

required
metrics_mode str

'exact' or 'binned'. Default is 'exact'.

'exact'
score_type str

Score semantics: 'auto', 'probability' (higher=more risky), or 'score' (higher=less risky).

'auto'
prin_bal_amount_col Optional[str]

Optional principal-balance amount column.

None
loan_amount_col Optional[str]

Optional total-loan amount column.

None

Returns:

Type Description
DataFrame

DataFrame containing metrics grouped by dimensions.

Source code in src/newt/reporting/interactive.py
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
def calculate_dimensional_comparison(
    data: pd.DataFrame,
    dim_list: Sequence[str],
    label_list: Sequence[str],
    score_model_columns: Sequence[Tuple[str, str]],
    metrics_mode: str = "exact",
    score_type: str = "auto",
    prin_bal_amount_col: Optional[str] = None,
    loan_amount_col: Optional[str] = None,
) -> pd.DataFrame:
    """Compute comparison metrics split by dimension columns.

    One comparison frame is built per label and the frames are stacked. When
    both amount columns come back non-``None`` from
    ``_validate_amount_metric_columns``, the same per-label build is repeated
    with ``metric_basis="amount"`` and merged in through
    ``_merge_amount_extension_columns``.

    Args:
        data: Input DataFrame.
        dim_list: List of dimension column names to split by.
        label_list: List of label column names.
        score_model_columns: List of (model_name, score_column) tuples.
        metrics_mode: 'exact' or 'binned'. Default is 'exact'.
        score_type: Score semantics: 'auto', 'probability' (higher=more risky),
            or 'score' (higher=less risky).
        prin_bal_amount_col: Optional principal-balance amount column.
        loan_amount_col: Optional total-loan amount column.

    Returns:
        DataFrame containing metrics grouped by dimensions.
    """
    direction_options = _build_score_direction_options(
        data=data,
        label_list=label_list,
        score_model_columns=score_model_columns,
        score_type=score_type,
    )
    prin_col, loan_col = _validate_amount_metric_columns(
        data=data,
        prin_bal_amount_col=prin_bal_amount_col,
        loan_amount_col=loan_amount_col,
    )

    def _stack_label_frames(**basis_kwargs: Optional[str]) -> pd.DataFrame:
        # Build one frame per label, in label order, and stack them; an empty
        # label list yields an empty DataFrame.
        per_label = [
            _build_dimensional_comparison(
                data=data,
                dim_list=dim_list,
                label_list=[label_name],
                score_model_columns=score_model_columns,
                score_metric_options=_build_model_score_metric_options_for_label(
                    direction_options,
                    label_col=label_name,
                    score_model_columns=score_model_columns,
                ),
                metrics_mode=metrics_mode,
                **basis_kwargs,
            )
            for label_name in label_list
        ]
        if not per_label:
            return pd.DataFrame()
        return pd.concat(per_label, ignore_index=True)

    base_metrics = _stack_label_frames(
        prin_bal_amount_col=None,
        loan_amount_col=None,
    )
    if prin_col is None or loan_col is None:
        return base_metrics

    amount_metrics = _stack_label_frames(
        metric_basis="amount",
        prin_bal_amount_col=prin_col,
        loan_amount_col=loan_col,
    )
    join_keys = ["维度列", "维度值", "样本标签", "模型"]
    return _merge_amount_extension_columns(
        base_frame=base_metrics,
        amount_metrics_frame=amount_metrics,
        key_columns=join_keys,
        leading_columns=join_keys,
    )

calculate_model_comparison(data, tag_col, date_col, label_list, model_columns, group_mode='month', metrics_mode='exact', score_type='auto', prin_bal_amount_col=None, loan_amount_col=None)

Compare multiple models directly.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame.

required
tag_col str

Column name indicating sample set (e.g., 'train', 'oot').

required
date_col str

Date column name; used to generate month column.

required
label_list Sequence[str]

List of label column names.

required
model_columns Sequence[Tuple[str, str]]

List of (model_name, score_column) tuples.

required
group_mode str

Mode to group by, either 'month' or 'tag'. Default is 'month'.

'month'
metrics_mode str

'exact' or 'binned'. Default is 'exact'.

'exact'
score_type str

Score semantics: 'auto', 'probability' (higher=more risky), or 'score' (higher=less risky).

'auto'
prin_bal_amount_col Optional[str]

Optional principal-balance amount column.

None
loan_amount_col Optional[str]

Optional total-loan amount column.

None

Returns:

Type Description
DataFrame

DataFrame containing model comparison metrics.

Source code in src/newt/reporting/interactive.py
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
def calculate_model_comparison(
    data: pd.DataFrame,
    tag_col: str,
    date_col: str,
    label_list: Sequence[str],
    model_columns: Sequence[Tuple[str, str]],
    group_mode: str = "month",
    metrics_mode: str = "exact",
    score_type: str = "auto",
    prin_bal_amount_col: Optional[str] = None,
    loan_amount_col: Optional[str] = None,
) -> pd.DataFrame:
    """Compare several models against each other.

    Works on a copy of ``data`` that gets an extra ``_report_month`` column, so
    the caller's frame is not modified by this function itself. One comparison
    frame is built per label and the frames are stacked. When both amount
    columns come back non-``None`` from ``_validate_amount_metric_columns``,
    the per-label build is repeated with ``metric_basis="amount"`` and merged
    in through ``_merge_amount_extension_columns``.

    Args:
        data: Input DataFrame.
        tag_col: Column name indicating sample set (e.g., 'train', 'oot').
        date_col: Date column name; used to generate month column.
        label_list: List of label column names.
        model_columns: List of (model_name, score_column) tuples.
        group_mode: Mode to group by, either 'month' or 'tag'. Default is 'month'.
        metrics_mode: 'exact' or 'binned'. Default is 'exact'.
        score_type: Score semantics: 'auto', 'probability' (higher=more risky),
            or 'score' (higher=less risky).
        prin_bal_amount_col: Optional principal-balance amount column.
        loan_amount_col: Optional total-loan amount column.

    Returns:
        DataFrame containing model comparison metrics.
    """
    prin_col, loan_col = _validate_amount_metric_columns(
        data=data,
        prin_bal_amount_col=prin_bal_amount_col,
        loan_amount_col=loan_amount_col,
    )

    month_column = "_report_month"
    frame = data.copy()
    frame[month_column] = _vectorized_normalize_month(frame[date_col])
    direction_options = _build_score_direction_options(
        data=frame,
        label_list=label_list,
        score_model_columns=model_columns,
        score_type=score_type,
        tag_col=tag_col,
    )

    def _stack_label_frames(**basis_kwargs: Optional[str]) -> pd.DataFrame:
        # Build one pair-comparison frame per label, in label order, and stack
        # them; an empty label list yields an empty DataFrame.
        per_label = [
            _build_model_pair_comparison(
                data=frame,
                group_mode=group_mode,
                label_list=[label_name],
                model_columns=model_columns,
                tag_col=tag_col,
                month_col=month_column,
                raw_date_col=date_col,
                score_metric_options=_build_model_score_metric_options_for_label(
                    direction_options,
                    label_col=label_name,
                    score_model_columns=model_columns,
                ),
                metrics_mode=metrics_mode,
                build_context=None,
                **basis_kwargs,
            )
            for label_name in label_list
        ]
        if not per_label:
            return pd.DataFrame()
        return pd.concat(per_label, ignore_index=True)

    base_comparison = _stack_label_frames(
        prin_bal_amount_col=None,
        loan_amount_col=None,
    )
    if prin_col is None or loan_col is None:
        return base_comparison

    amount_comparison = _stack_label_frames(
        metric_basis="amount",
        prin_bal_amount_col=prin_col,
        loan_amount_col=loan_col,
    )
    join_keys = ["样本标签", "模型", "样本集", "观察点月"]
    return _merge_amount_extension_columns(
        base_frame=base_comparison,
        amount_metrics_frame=amount_comparison,
        key_columns=join_keys,
        leading_columns=join_keys,
    )

calculate_bin_metrics(data, label_col, score_col, q=10, bins=None, prin_bal_amount_col=None, loan_amount_col=None)

Calculate bin-level sample and optional amount metrics.

Parameters:

Name Type Description Default
data DataFrame

Input DataFrame.

required
label_col str

Binary label column name.

required
score_col str

Score column name.

required
q int

Number of quantile bins when bins is not provided.

10
bins Optional[Sequence[float]]

Optional custom split edges.

None
prin_bal_amount_col Optional[str]

Optional principal-balance amount column.

None
loan_amount_col Optional[str]

Optional total-loan amount column.

None

Returns:

Type Description
DataFrame

DataFrame containing per-bin sample metrics and optional amount metrics.

Source code in src/newt/reporting/interactive.py
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
def calculate_bin_metrics(
    data: pd.DataFrame,
    label_col: str,
    score_col: str,
    q: int = 10,
    bins: Optional[Sequence[float]] = None,
    prin_bal_amount_col: Optional[str] = None,
    loan_amount_col: Optional[str] = None,
) -> pd.DataFrame:
    """Compute per-bin sample metrics, optionally extended with amount metrics.

    Args:
        data: Input DataFrame.
        label_col: Binary label column name.
        score_col: Score column name.
        q: Number of quantile bins when ``bins`` is not provided.
        bins: Optional custom split edges.
        prin_bal_amount_col: Optional principal-balance amount column.
        loan_amount_col: Optional total-loan amount column.

    Returns:
        DataFrame containing per-bin sample metrics and optional amount metrics.

    Raises:
        ValueError: If ``q`` is below 2 (when ``bins`` is not given), or if
            ``bins`` is not one-dimensional with at least two strictly
            increasing edges.
    """
    prin_col, loan_col = _validate_amount_metric_columns(
        data=data,
        prin_bal_amount_col=prin_bal_amount_col,
        loan_amount_col=loan_amount_col,
    )

    if bins is not None:
        edges = np.asarray(list(bins), dtype=float)
        if edges.ndim != 1 or edges.size < 2:
            raise ValueError("bins must contain at least two edges")
        # Written as "not all(> 0)" so that a NaN edge is rejected as well.
        if not np.all(np.diff(edges) > 0):
            raise ValueError("bins must be strictly increasing")
    else:
        if int(q) < 2:
            raise ValueError("q must be >= 2")
        edges = build_reference_quantile_bins(data[score_col], bins=int(q))

    sample_table = calculate_bin_performance_table(
        data=data,
        label_col=label_col,
        score_col=score_col,
        edges=edges,
    )
    if sample_table.empty or prin_col is None or loan_col is None:
        return sample_table

    # Amount metrics only use rows whose label is 0 or 1.
    labelled_rows = data[label_col].isin([0, 1])
    amounts = data.loc[labelled_rows, [score_col, prin_col, loan_col]].copy()
    if amounts.empty:
        return sample_table

    assigned_bins = assign_reference_bins(amounts[score_col], edges)
    amounts["bin"] = assigned_bins.astype(str)
    amounts["_逾期本金"] = pd.to_numeric(amounts[prin_col], errors="coerce")
    amounts["_放款金额"] = pd.to_numeric(amounts[loan_col], errors="coerce")

    # Per-bin totals of the two amount columns, renamed to their output names.
    bin_groups = amounts.groupby("bin", dropna=False, sort=False)
    per_bin_totals = bin_groups[["_逾期本金", "_放款金额"]].sum()
    per_bin_totals = per_bin_totals.reset_index()
    per_bin_totals = per_bin_totals.rename(
        columns={"_逾期本金": "逾期本金", "_放款金额": "放款金额"}
    )
    combined = sample_table.merge(per_bin_totals, on="bin", how="left")

    prin_total = float(amounts["_逾期本金"].sum())
    loan_total = float(amounts["_放款金额"].sum())
    overall_rate = _safe_divide(prin_total, loan_total)

    combined["金额坏占比"] = _safe_divide_series(
        combined["逾期本金"], combined["放款金额"]
    )
    combined["放款金额占比"] = _safe_divide_scalar_series(
        combined["放款金额"], loan_total
    )
    combined["逾期本金占比"] = _safe_divide_scalar_series(
        combined["逾期本金"], prin_total
    )

    # Lift is left undefined when the overall amount rate is missing or zero.
    lift_undefined = pd.isna(overall_rate) or overall_rate == 0
    combined["金额lift"] = (
        np.nan if lift_undefined else combined["金额坏占比"] / overall_rate
    )
    combined["金额lift"] = combined["金额lift"].replace([np.inf, -np.inf], np.nan)

    amount_columns = [
        "逾期本金",
        "放款金额",
        "金额坏占比",
        "放款金额占比",
        "逾期本金占比",
        "金额lift",
    ]
    # Sample-metric columns keep their order; amount columns are appended.
    return combined.reindex(
        columns=[*sample_table.columns.tolist(), *amount_columns]
    )