Source code for sc_dr.datasets

#!/usr/bin/env python3
###########################################
# This module implements a few classes to
# help load data into PyTorch-compatible
# datasets.
#
# Although the aim is PyTorch compatibility,
# there is nothing preventing these datasets
# from being used for general computation.
###########################################
from typing import Any, Callable, Optional
from time import time

import numpy as np

#import torch
#from torch.utils.data import Dataset

from tqdm import tqdm
import h5py
import sharedmem as sm

import pickle

from sklearn.decomposition import PCA

from sklearn.preprocessing import LabelEncoder

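# Minimal stand-in base class: the torch.utils.data.Dataset import above is
# commented out, so this keeps the class hierarchy intact without a hard
# dependency on PyTorch.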
class Dataset:
    pass

class DuoBenchmark(Dataset):
    def __init__(self,path,log_trans=False,log1p=False,split_head=True):
        """
        Load a dataset from a given path.

        The supplied file should be ',' separated with the first row
        consisting of cell identifiers in one of two formats indicated by
        the split_head parameter.
            -- split_head=True : identifiers are in the format 'id - type'
            -- split_head=False : identifiers are just the cell type

        Each remaining row contains the counts for a particular gene
            ** note that there are no gene identifiers in this format
               ... this may need to be considered in future revisions
    
        Note: this was originally written to be used with datasets from the
        2018 Duo paper; however, its utility has evolved beyond this
        and hence the name should be changed in future revisions.

        :param path: path to csv file
        :param log_trans: whether to take the log transform of the cell counts
        :param log1p: whether to take the log(1+x) transform of the cell counts
        :param split_head: indicates whether the header entries are 'id - type' (True) or 'type' (False)
        """
        
        # Read the data in as np.float32
        self.data = np.transpose(np.genfromtxt(path,delimiter=',',skip_header=1,dtype=np.float32))

        # take the log transform if necessary
        if log_trans:
            self.data = np.log(self.data)
        elif log1p:
            self.data = np.log(1+self.data)

        # capture the labels from the header
        with open(path,'r') as fh:
            head = fh.readline().rstrip('\n').replace('"','')
            if split_head:
                self.labels = [(x.split('-'))[1].lstrip(' ') for x in head.split(',')]
            else:
                self.labels = head.split(',')

        # note for the following code block, there is
        # probably a cleaner way of doing this using
        # sklearn.preprocessing.LabelEncoder
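        # a sketch of that alternative (left as a comment so the existing
        # behaviour is unchanged):
        #     self.tags = list(LabelEncoder().fit_transform(self.labels))
        # note that LabelEncoder numbers labels in sorted order rather than
        # order of first appearance, so the resulting ids may differ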
        
        # assign each label a unique numerical id
        label_dict = dict()
        count = 0
        for label in self.labels:
            if label not in label_dict:
                label_dict[label] = count
                count += 1

        # assign each cell its corresponding numerical id
        self.tags = [label_dict[l] for l in self.labels]

        # get the number of dimensions
        self.dims = len(self.data[0])
        

    # necessary for Dataset class
    def __getitem__(self, index):
        return self.data[index]

    # necessary for Dataset class
    def __len__(self):
        return len(self.labels)


class PCAReducedDuo(DuoBenchmark):
    """
    Same as DuoBenchmark but performs a preliminary PCA on the data.

    Usage is discouraged as it has not been updated to reflect recent
    changes to the DuoBenchmark class.
    """
    def __init__(self, path, n_components=2, log_trans=False, log1p=False):
        super(PCAReducedDuo, self).__init__(path, log_trans=log_trans, log1p=log1p)
        self.old_data = self.data
        self.data = PCA(n_components=n_components).fit_transform(self.old_data)
        self.dims = len(self.data[0])

class FromPickle(Dataset):
    """
    Load a Dataset from a pickled object.

    At this stage, however, labels will not be available for the Dataset.
    It is currently being used in scripts to compare embeddings; labels
    can be taken from the full dataset.
    """
    def __init__(self, path):
        with open(path, 'rb') as fh:
            self.data = pickle.load(fh).astype(np.float32)
        self.dims = len(self.data[0])

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return len(self.data)
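
# Illustrative round-trip sketch (added for documentation; 'embedding.pickle'
# is a hypothetical path): FromPickle expects the pickle to contain an
# array-like object with an .astype method, e.g. a plain numpy array.
def _frompickle_example(path='embedding.pickle'):
    """Sketch: write a toy embedding to disk and load it back as a Dataset."""
    toy = np.random.rand(100, 2)            # e.g. a 2-d embedding of 100 cells
    with open(path, 'wb') as fh:
        pickle.dump(toy, fh)
    ds = FromPickle(path)
    return ds.dims, len(ds)                 # -> (2, 100)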

class FromPickledPanda(Dataset):
    """
    Load a dataset from a pickled pandas DataFrame.

    The DataFrame is expected to contain a 'cell_type' column; all other
    columns are treated as features.
    """
    def __init__(self, path):
        with open(path, 'rb') as fh:
            pd_df = pickle.load(fh)
        self.labels = [str(x) for x in pd_df['cell_type']]
        pd_df = pd_df.drop('cell_type', axis=1)
        self.data = pd_df.values
        self.dims = len(self.data[0])
        self.tags = LabelEncoder().fit_transform(self.labels)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return len(self.tags)
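
# Illustrative sketch (added for documentation; 'cells.pickle' is a
# hypothetical path): FromPickledPanda expects a pickled pandas DataFrame
# with a 'cell_type' column alongside the numeric expression columns.
def _frompickledpanda_example(path='cells.pickle'):
    """Sketch: build a toy DataFrame, pickle it, and load it back."""
    import pandas as pd                      # pandas is only needed for this sketch
    df = pd.DataFrame(np.random.rand(4, 3), columns=['g1', 'g2', 'g3'])
    df['cell_type'] = ['a', 'a', 'b', 'b']
    df.to_pickle(path)                       # pandas' own pickling is compatible with pickle.load
    ds = FromPickledPanda(path)
    return ds.tags                           # -> array([0, 0, 1, 1])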

class E18MouseData(Dataset):
    def __init__(self,
                 path: str,
                 log1p: bool = False,
                 nproc: int = 1,
                 selection: Optional[list] = None,
                 silent: bool = False
                 ) -> None:
        """
        PyTorch Dataset wrapper to handle the GSE93421 hdf5 dataset

        :param path: path to hdf5 file for e18 Mouse Data
        :param log1p: whether to take the log(1+x) transform of the counts
        :param nproc: number of processes to use in constructing vectors
        :param selection: list of cells to load data for
        :param silent: whether print statements should print
        """
        hdf5 = h5py.File(path, 'r', driver='core')

        self.dims = len(hdf5['mm10']['genes'])

        # allow a customizable selection of cells
        if selection is not None:
            self._len = len(selection)
        else:
            self._len = len(hdf5['mm10']['indptr'])
            selection = range(0, self._len)
        self.selection = selection

        # get a list that can be shared between processes
        selected_cells = sm.empty(self._len, dtype=np.int64)
        for i in range(0, self._len):
            selected_cells[i] = self.selection[i]

        #self.cells = sm.full((self._len,self.dims),0,dtype=np.int16)

        # Load all of the important information into memory

        #############
        # Data
        #############
        if not silent:
            print("Reading data ...")
        start = time()
        ds = hdf5['mm10']['data']
        data = sm.empty(len(ds), dtype=ds.dtype)
        ds.read_direct(data)
        data_dtype = ds.dtype
        end = time()
        if not silent:
            print("\t" + str(end - start) + "s ...")

        #############
        # Indices
        #############
        if not silent:
            print("Reading indices ...")
        start = time()
        ds = hdf5['mm10']['indices']
        indx = sm.empty(len(ds), dtype=ds.dtype)
        ds.read_direct(indx)
        end = time()
        if not silent:
            print("\t" + str(end - start) + "s ...")

        #############
        # Indptr
        #############
        if not silent:
            print("Reading indptr ...")
        start = time()
        ds = hdf5['mm10']['indptr']
        iptr = sm.empty(len(ds), dtype=ds.dtype)
        ds.read_direct(iptr)
        end = time()
        if not silent:
            print("\t" + str(end - start) + "s ...")

        hdf5.close()
        del ds

        ###########################
        # Create empty cell vectors
        ###########################

        # build the vector for each cell
        if not silent:
            print("Creating 0 vectors ...")
        start = time()
        self.data = sm.full((self._len, self.dims), 0, dtype=data_dtype)
        #self.cells = sm.full((self._len,self.dims),0,dtype=float)
        end = time()
        if not silent:
            print("\t" + str(end - start) + "s ...")

        ###########################
        # Multi-core loading ...
        ###########################
        if not silent:
            print("Building Tensor List ...")
        start = time()
        with sm.MapReduce(np=nproc) as pool:
            pool.map(_build_tensor,
                     list(zip([self.data] * nproc,
                              [iptr] * nproc,
                              [indx] * nproc,
                              [data] * nproc,
                              range(0, nproc),
                              [nproc] * nproc,
                              [selected_cells] * nproc,
                              [log1p] * nproc)))
        end = time()
        if not silent:
            print("\t" + str(end - start) + "s ...")

        # Some explicit cleanup to conserve memory
        # Not sure if necessary, but I don't trust Python
        del iptr
        del indx
        del data
        del selected_cells

    def __getitem__(self, index):
        #return torch.Tensor(self.data[index])
        return np.array(self.data[index])

    def __len__(self):
        return self._len


def _build_tensor(args):
    """
    Helper function to allow parallel loading of tensors
    """
    cells, iptr, indx, data, tid, nproc, selected, log1p = args

    for i in range(tid, len(cells), nproc):
        index = selected[i]
        sidx = iptr[index]

        # find the number of gene entries for
        # this cell
        if index < len(iptr) - 1:
            # find length to next cell
            nentries = iptr[index + 1] - sidx
        else:
            # find length to the end
            nentries = len(data) - sidx

        for j in range(0, nentries):
            if log1p:
                cells[i][indx[sidx + j]] = np.log(1 + (data[sidx + j]))
            else:
                cells[i][indx[sidx + j]] = (data[sidx + j])
            #cells[i][indx[sidx+j]] = float(data[sidx+j])
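
# Illustrative usage sketch (added for documentation; '1M_neurons_filtered.h5'
# is a hypothetical path to the GSE93421 10x hdf5 file): the 'selection'
# argument restricts loading to a subset of cells, which keeps memory use down.
def _e18mousedata_example(path='1M_neurons_filtered.h5'):
    """Sketch: load the first 1000 cells with 4 worker processes."""
    ds = E18MouseData(path, log1p=True, nproc=4, selection=list(range(1000)))
    print(ds.dims, len(ds))        # number of genes, 1000
    return ds[0]                   # dense expression vector for the first selected cell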

def scale_dataset(ds):
    """
    Scale each feature to be between 0 and 1.

    Note that this overwrites the original data. In the future, this
    function should be changed to retain the scaling information.

    :param ds: dataset to be scaled
    """
    for i in range(0, ds.dims):
        fmax = ds.data[0][i]
        for j in range(1, len(ds)):
            curr = ds.data[j][i]
            if curr > fmax:
                fmax = curr
        if fmax > 0:
            for j in range(0, len(ds)):
                ds.data[j][i] /= fmax
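
# Illustrative sketch (added for documentation; 'counts.csv' is a hypothetical
# path): scale_dataset works in place on any of the Dataset classes above,
# dividing each feature by its (positive) maximum.
def _scale_dataset_example(path='counts.csv'):
    """Sketch: load a Duo-style CSV and rescale its features to [0, 1]."""
    ds = DuoBenchmark(path)
    scale_dataset(ds)                       # modifies ds.data in place
    assert ds.data.max() <= 1.0             # nonnegative counts are now at most 1
    return ds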

if __name__ == '__main__':
    pass