import posixpath
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
import torch
import torch.utils.data as torchdata
from deprecate_kwargs import deprecate_kwargs
from sklearn.datasets import load_svmlight_file
from ..models import nn as mnn
from ..models.utils import top_n_accuracy
from ..utils._download_data import http_get
from ..utils.const import CACHED_DATA_DIR
from ..utils.misc import set_seed
from .fed_dataset import FedDataset
__all__ = [
"FedLibSVMDataset",
"libsvmread",
]
_libsvm_domain = "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/"
# NOT fully listed
_libsvm_datasets = {f"a{i}a": [f"binary/a{i}a", f"binary/a{i}a.t"] for i in range(1, 10)}
_libsvm_datasets.update({f"w{i}a": [f"binary/w{i}a", f"binary/w{i}a.t"] for i in range(1, 9)})
_libsvm_datasets = {k: [posixpath.join(_libsvm_domain, item) for item in v] for k, v in _libsvm_datasets.items()}
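# e.g. ``_libsvm_datasets["a1a"]`` now resolves to
# ["https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a1a",
#  "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a1a.t"]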
class FedLibSVMDataset(FedDataset):
__name__ = "FedLibSVMDataset"
@deprecate_kwargs([["criterion_name", "criterion"]])
def __init__(
self,
dataset_name: str,
num_clients: int,
iid: bool = True,
criterion: str = "svm",
seed: int = 0,
) -> None:
self.dataset_name = dataset_name
assert self.dataset_name in _libsvm_datasets, (
f"dataset {self.dataset_name} not supported, " f"supported datasets: {list(_libsvm_datasets.keys())}"
)
self.num_clients = num_clients
self.iid = iid
if not self.iid:
# ``non_iid_partition_with_dirichlet_distribution`` is too slow
raise NotImplementedError("non-iid not implemented yet")
self.criterion_name = criterion.lower()
self.datadir = CACHED_DATA_DIR / "libsvm_datasets" / dataset_name
self.criterion = None
self._data = None
self.__num_features, self.__num_classes = None, None
self._preload(seed=seed)
def _preload(self, seed: int = 0) -> None:
"""Preload the dataset.
Parameters
----------
seed : int, default 0
Random seed for data partitioning.
Returns
-------
None
"""
self.seed = seed
set_seed(self.seed)
rng = np.random.default_rng(self.seed)
self.datadir.mkdir(parents=True, exist_ok=True)
self.download_if_needed()
self.criterion = self.criteria_mapping[self.criterion_name]
train_X, train_y = libsvmread(self.datadir / self.dataset_name, toarray=True)
shuffled_indices = np.arange(len(train_y))
rng.shuffle(shuffled_indices)
train_X = train_X[shuffled_indices]
train_y = train_y[shuffled_indices]
test_X, test_y = libsvmread(self.datadir / f"{self.dataset_name}.t", toarray=True)
shuffled_indices = np.arange(len(test_y))
rng.shuffle(shuffled_indices)
test_X = test_X[shuffled_indices]
test_y = test_y[shuffled_indices]
self.__num_features = train_X.shape[1]
self.__num_classes = len(np.unique(train_y))
# do partition
min_gap = int(np.ceil(self.num_clients * test_X.shape[0] / train_X.shape[0]) + 1)
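        # ``rng.choice`` draws ``num_clients`` distinct offsets and the
        # ``min_gap * arange`` term spreads them out, so consecutive split
        # points are at least ``min_gap`` training samples apart.  ``min_gap``
        # is sized from the test/train ratio, presumably so that the
        # proportionally scaled test split points below stay distinct as well.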
train_split_indices = np.sort(
rng.choice(
train_X.shape[0] - self.num_clients * min_gap,
self.num_clients,
replace=False,
)
) + min_gap * np.arange(self.num_clients)
train_split_indices = np.append(train_split_indices, train_X.shape[0])
test_split_indices = (train_split_indices / train_X.shape[0] * test_X.shape[0]).astype(int)
test_split_indices[-1] = test_X.shape[0]
self._data = [
{
"train_X": train_X[train_split_indices[i] : train_split_indices[i + 1]],
"train_y": train_y[train_split_indices[i] : train_split_indices[i + 1]],
"test_X": test_X[test_split_indices[i] : test_split_indices[i + 1]],
"test_y": test_y[test_split_indices[i] : test_split_indices[i + 1]],
}
for i in range(self.num_clients)
]
        self.DEFAULT_BATCH_SIZE = -1  # -1 means full-batch: use the whole split as a single batch
self.DEFAULT_TRAIN_CLIENTS_NUM = self.num_clients
self.DEFAULT_TEST_CLIENTS_NUM = self.num_clients
    def reset_seed(self, seed: int) -> None:
"""Reset the seed and re-partition the data.
Parameters
----------
seed : int
Random seed for data partitioning.
Returns
-------
None
"""
self._preload(seed)
    def get_dataloader(
self,
train_bs: Optional[int] = None,
test_bs: Optional[int] = None,
client_idx: Optional[int] = None,
) -> Tuple[torchdata.DataLoader, torchdata.DataLoader]:
"""Get local dataloader at client `client_idx` or get the global dataloader.
Parameters
----------
train_bs : int, optional
Batch size for training dataloader.
If ``None``, use default batch size.
test_bs : int, optional
Batch size for testing dataloader.
If ``None``, use default batch size.
client_idx : int, optional
Index of the client to get dataloader.
If ``None``, get the dataloader containing all data.
Usually used for centralized training.
Returns
-------
train_dl : :class:`torch.utils.data.DataLoader`
Training dataloader.
test_dl : :class:`torch.utils.data.DataLoader`
Testing dataloader.
"""
assert client_idx is None or 0 <= client_idx < self.num_clients
if client_idx is None:
train_X = np.concatenate([self._data[i]["train_X"] for i in range(self.num_clients)])
train_y = np.concatenate([self._data[i]["train_y"] for i in range(self.num_clients)])
test_X = np.concatenate([self._data[i]["test_X"] for i in range(self.num_clients)])
test_y = np.concatenate([self._data[i]["test_y"] for i in range(self.num_clients)])
else:
train_X = self._data[client_idx]["train_X"]
train_y = self._data[client_idx]["train_y"]
test_X = self._data[client_idx]["test_X"]
test_y = self._data[client_idx]["test_y"]
train_bs = train_bs or self.DEFAULT_BATCH_SIZE
if train_bs == -1:
train_bs = len(train_X)
train_dl = torchdata.DataLoader(
torchdata.TensorDataset(
torch.from_numpy(train_X).float(),
torch.from_numpy(train_y).long(),
),
batch_size=train_bs,
shuffle=True,
)
test_bs = test_bs or self.DEFAULT_BATCH_SIZE
if test_bs == -1:
test_bs = len(test_X)
test_dl = torchdata.DataLoader(
torchdata.TensorDataset(
torch.from_numpy(test_X).float(),
torch.from_numpy(test_y).long(),
),
batch_size=test_bs,
shuffle=False,
)
return train_dl, test_dl
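    # Example (a minimal sketch; ``"a1a"`` is one of the supported dataset
    # names, the other arguments are illustrative):
    #
    #     ds = FedLibSVMDataset("a1a", num_clients=10)
    #     train_dl, test_dl = ds.get_dataloader(train_bs=32, client_idx=0)
    #     X, y = next(iter(train_dl))  # float32 features, int64 labels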
    def load_partition_data_distributed(self, process_id: int, batch_size: Optional[int] = None) -> tuple:
"""Get local dataloader at client `process_id` or get global dataloader.
Parameters
----------
process_id : int
Index of the client to get dataloader.
If ``None``, get the dataloader containing all data,
usually used for centralized training.
batch_size : int, optional
Batch size for dataloader.
If ``None``, use default batch size.
Returns
-------
tuple
- train_clients_num: :obj:`int`
Number of training clients.
- train_data_num: :obj:`int`
Number of training data.
- train_data_global: :class:`torch.utils.data.DataLoader` or None
Global training dataloader.
- test_data_global: :class:`torch.utils.data.DataLoader` or None
Global testing dataloader.
- local_data_num: :obj:`int`
Number of local training data.
- train_data_local: :class:`torch.utils.data.DataLoader` or None
Local training dataloader.
- test_data_local: :class:`torch.utils.data.DataLoader` or None
Local testing dataloader.
- n_class: :obj:`int`
Number of classes.
"""
_batch_size = batch_size or self.DEFAULT_BATCH_SIZE
if process_id == 0:
# get global dataset
train_data_global, test_data_global = self.get_dataloader(_batch_size, _batch_size)
train_data_num = len(train_data_global.dataset)
test_data_num = len(test_data_global.dataset)
train_data_local = None
test_data_local = None
local_data_num = 0
else:
# get local dataset
train_data_local, test_data_local = self.get_dataloader(_batch_size, _batch_size, process_id - 1)
train_data_num = local_data_num = len(train_data_local.dataset)
train_data_global = None
test_data_global = None
retval = (
self.num_clients,
train_data_num,
train_data_global,
test_data_global,
local_data_num,
train_data_local,
test_data_local,
self.num_classes,
)
return retval
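    # Example (sketch, ``ds`` as constructed above): ``process_id == 0`` is the
    # server and only builds the global loaders, while ``process_id == i``
    # (``i >= 1``) builds the local loaders of client ``i - 1``:
    #
    #     _, _, train_glob, test_glob, *_ = ds.load_partition_data_distributed(0)
    #     *_, train_loc, test_loc, _ = ds.load_partition_data_distributed(1)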
    def load_partition_data(self, batch_size: Optional[int] = None) -> tuple:
"""Partition data into all local clients.
Parameters
----------
batch_size : int, optional
Batch size for dataloader.
If ``None``, use default batch size.
Returns
-------
tuple
- train_clients_num: :obj:`int`
Number of training clients.
- train_data_num: :obj:`int`
Number of training data.
- test_data_num: :obj:`int`
Number of testing data.
- train_data_global: :class:`torch.utils.data.DataLoader`
Global training dataloader.
- test_data_global: :class:`torch.utils.data.DataLoader`
Global testing dataloader.
- data_local_num_dict: :obj:`dict`
Number of local training data for each client.
- train_data_local_dict: :obj:`dict`
Local training dataloader for each client.
- test_data_local_dict: :obj:`dict`
Local testing dataloader for each client.
- n_class: :obj:`int`
Number of classes.
"""
_batch_size = batch_size or self.DEFAULT_BATCH_SIZE
# get local dataset
data_local_num_dict = dict()
train_data_local_dict = dict()
test_data_local_dict = dict()
for client_idx in range(self.num_clients):
train_data_local, test_data_local = self.get_dataloader(_batch_size, _batch_size, client_idx)
local_data_num = len(train_data_local.dataset)
data_local_num_dict[client_idx] = local_data_num
train_data_local_dict[client_idx] = train_data_local
test_data_local_dict[client_idx] = test_data_local
# global dataset
train_data_global = torchdata.DataLoader(
            torchdata.ConcatDataset([dl.dataset for dl in train_data_local_dict.values()]),
batch_size=_batch_size,
shuffle=True,
)
train_data_num = len(train_data_global.dataset)
test_data_global = torchdata.DataLoader(
            torchdata.ConcatDataset([dl.dataset for dl in test_data_local_dict.values() if dl is not None]),
batch_size=_batch_size,
shuffle=True,
)
test_data_num = len(test_data_global.dataset)
retval = (
self.num_clients,
train_data_num,
test_data_num,
train_data_global,
test_data_global,
data_local_num_dict,
train_data_local_dict,
test_data_local_dict,
self.num_classes,
)
return retval
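    # Example (sketch, ``ds`` as above): the returned 9-tuple unpacks in the
    # order documented in the docstring:
    #
    #     (n_clients, n_train, n_test, train_glob, test_glob,
    #      n_local_dict, train_local_dict, test_local_dict, n_class) = ds.load_partition_data(batch_size=32)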
    def evaluate(self, probs: torch.Tensor, truths: torch.Tensor) -> Dict[str, float]:
"""Evaluation using predictions and ground truth.
Parameters
----------
probs : torch.Tensor
Predicted probabilities.
truths : torch.Tensor
Ground truth labels.
Returns
-------
Dict[str, float]
Evaluation results.
"""
return {
"acc": top_n_accuracy(probs, truths, 1),
"loss": self.criterion(probs, truths).item(),
"num_samples": probs.shape[0],
}
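    # Example (sketch, ``ds`` and batch ``(X, y)`` as above): ``probs`` is
    # expected to have shape ``(n_samples, num_classes)`` and ``truths`` to
    # hold class indices of shape ``(n_samples,)``; ``model`` stands for any
    # of ``candidate_models``:
    #
    #     model = ds.candidate_models["svm"]
    #     metrics = ds.evaluate(model(X), y)  # {"acc": ..., "loss": ..., "num_samples": ...}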
@property
def url(self) -> str:
"""URL for downloading the dataset."""
        return posixpath.dirname(_libsvm_datasets[self.dataset_name][0]) + ".html"
    def download_if_needed(self) -> None:
"""Download data if needed."""
for url in _libsvm_datasets[self.dataset_name]:
if (self.datadir / posixpath.basename(url)).exists():
continue
http_get(url, self.datadir, extract=False)
@property
def candidate_models(self) -> Dict[str, torch.nn.Module]:
"""A set of candidate models."""
return {
"svm": mnn.SVC(self.num_features, self.num_classes),
"lr": mnn.LogisticRegression(self.num_features, self.num_classes),
}
    @classmethod
def list_datasets(cls) -> List[str]:
"""List all available LibSVM datasets."""
return list(_libsvm_datasets.keys())
    @classmethod
def list_all_libsvm_datasets(cls) -> pd.DataFrame:
"""List all LibSVM datasets."""
return pd.read_html(_libsvm_domain)[0]
@property
def criteria_mapping(self) -> Dict[str, torch.nn.Module]:
"""Mapping from criterion name to criterion."""
return {
"svm": torch.nn.MultiMarginLoss(),
"svr": torch.nn.MSELoss(),
"lr": torch.nn.CrossEntropyLoss(),
"logistic_regression": torch.nn.CrossEntropyLoss(),
}
@property
def num_features(self) -> int:
"""Number of features."""
return self.__num_features
@property
def num_classes(self) -> int:
"""Number of classes."""
return self.__num_classes
@property
def doi(self) -> List[str]:
"""DOI(s) related to the dataset."""
return ["10.1145/1961189.1961199"]
def libsvmread(fp: Union[str, Path], multilabel: bool = False, toarray: bool = True) -> Tuple[np.ndarray, np.ndarray]:
"""Read data file in libsvm format.
Parameters
----------
fp : Union[str, pathlib.Path]
Path to the file.
multilabel : bool, default False
Whether the labels are multilabel.
toarray : bool, default True
Whether to convert the features to dense array.
Returns
-------
features : numpy.ndarray
Features in numpy array.
labels : numpy.ndarray
Labels in numpy array.
"""
features, labels = load_svmlight_file(str(fp), multilabel=multilabel, dtype=np.float32)
if toarray:
features = features.toarray()
return features, labels
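# Example (a minimal sketch): read a cached LibSVM file into dense arrays,
# assuming the ``a1a`` training split has already been downloaded (e.g. via
# ``FedLibSVMDataset.download_if_needed``):
#
#     X, y = libsvmread(CACHED_DATA_DIR / "libsvm_datasets" / "a1a" / "a1a")
#     X.shape  # (n_samples, n_features), dense float32
#     y.shape  # (n_samples,)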