Coverage for langbrainscore/dataset/dataset.py: 23%

95 statements  

coverage.py v6.4, created at 2022-06-07 21:22 +0000

# stdlib imports
import typing
from collections import abc
from pathlib import Path

# third-party imports
import numpy as np
import xarray as xr
from joblib import Parallel, delayed
from tqdm import tqdm

# package imports
from langbrainscore.interface import _Dataset
from langbrainscore.utils.logging import log
from langbrainscore.utils.xarray import collapse_multidim_coord


class Dataset(_Dataset):
    @property
    def contents(self) -> xr.DataArray:
        """
        Accesses the internal xarray object. Use with caution.
        """
        return self._xr_obj

    @property
    def stimuli(self) -> xr.DataArray:
        """
        Getter method that returns the stimuli and their associated metadata.

        Returns:
            xr.DataArray: xarray object containing the stimuli from the dataset
                and associated metadata
        """
        return self.contents.stimulus

    @property
    def dims(self) -> tuple:
        """
        Getter method that returns the dimensions of the internal xarray object.

        Returns:
            tuple[str]: dimensions of the internal xarray object
        """
        return self.contents.dims

    def to_netcdf(self, filename):
        """
        Writes the internal xarray.DataArray object to a netCDF file identified
        by `filename`. If the file already exists, overwrites it.
        """
        # expand and resolve the path up front so that the existence check and
        # the write target refer to the same file (e.g., when `filename`
        # contains `~`)
        path = Path(filename).expanduser().resolve()
        if path.exists():
            log(f"{filename} already exists. overwriting.", type="WARN")
        self._xr_obj.to_netcdf(path)

    @classmethod
    def load_netcdf(cls, filename):
        """
        Loads a netCDF file at `filename` that contains a pre-packaged xarray
        instance.
        """
        return cls(xr.load_dataarray(filename))
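
    # A minimal round-trip sketch (assumes a populated `Dataset` instance `ds`;
    # the filename below is hypothetical):
    #
    #   ds.to_netcdf("my_dataset.nc")
    #   ds2 = Dataset.load_netcdf("my_dataset.nc")
    #   assert (ds2.contents == ds.contents).all()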

    @classmethod
    def from_file_or_url(
        cls,
        file_path_or_url: typing.Union[str, Path],
        data_column: str,
        sampleid_index: str,
        neuroid_index: str,
        stimuli_index: str,
        timeid_index: str = None,
        subject_index: str = None,
        sampleid_metadata: typing.Union[
            typing.Iterable[str], typing.Mapping[str, str]
        ] = None,
        neuroid_metadata: typing.Union[
            typing.Iterable[str], typing.Mapping[str, str]
        ] = None,
        timeid_metadata: typing.Union[
            typing.Iterable[str], typing.Mapping[str, str]
        ] = None,
        multidim_metadata: typing.Iterable[
            typing.Mapping[str, typing.Iterable[str]]
        ] = None,
        sort_by: typing.Iterable[str] = (),
        sep=",",
        parallel: int = -2,
    ) -> _Dataset:
        """Creates a Dataset object holding an `xr.DataArray` instance from a CSV
        (or gzipped parquet) file readable by pandas. Constructs the `xr.DataArray`
        using specified columns as dimensions and attaches metadata along those
        dimensions in the form of coordinates. Minimally requires `sampleid_index`
        and `neuroid_index` to be provided.

        Note: Each row of the supplied file must hold a single data point
        corresponding to a unique combination of `sampleid`, `neuroid`, and
        `timeid` (unique dimension values). I.e., each neuroid (which could be
        a voxel, an ROI, a reaction-time value, etc.) must be on its own line
        for a given stimulus trial at a given time.
        If `timeid_index` or `subject_index` is not provided:
        - a singleton timeid dimension is created with the value 0 for each sample.
        - a singleton subject dimension is created with the value "subject0" spanning the entire dataset.
        For help on what these terms mean, please visit the
        [xarray glossary page](https://xarray.pydata.org/en/stable/user-guide/terminology.html)

        Args:
            file_path_or_url (typing.Union[str, Path]): a path or URL to a CSV (or parquet) file
            data_column (str): title of the column that holds the datapoints per unit of measurement
                (e.g., BOLD contrast effect size, reaction time, voltage amplitude, etc.)
            sampleid_index (str): title of the column used to construct an index for sampleids;
                should be unique for each stimulus in the dataset.
            neuroid_index (str): title of the column used to construct an index for neuroids;
                should be unique for each point of measurement within a subject, e.g., voxel1, voxel2, ...
                neuroids in the packaged dataset are transformed to be a product of `subject_index`
                and `neuroid_index`.
            stimuli_index (str): title of the column that holds the stimuli shown to participants
            timeid_index (str, optional): title of the column that holds timepoints of stimulus
                presentation. if not provided, a singleton timepoint 0 is assigned to each
                datapoint. Defaults to None.
            subject_index (str, optional): title of the column specifying subject IDs. Defaults to None.
            sampleid_metadata (typing.Union[typing.Iterable[str], typing.Mapping[str, str]], optional):
                names of columns (or, optionally, a mapping of existing column names to new coordinate
                names) that supply metadata along the sampleid dimension. Defaults to None.
            neuroid_metadata (typing.Union[typing.Iterable[str], typing.Mapping[str, str]], optional):
                see `sampleid_metadata`. Defaults to None.
            timeid_metadata (typing.Union[typing.Iterable[str], typing.Mapping[str, str]], optional):
                see `sampleid_metadata`. Defaults to None.
            multidim_metadata (typing.Iterable[typing.Mapping[str, typing.Iterable[str]]], optional):
                metadata that spans more than one dimension, e.g., chunks of a stimulus that unfold
                over time. currently `NotImplemented`. Defaults to None.
            sort_by (typing.Iterable[str], optional): sort data by these columns while repackaging it;
                data is sorted by `sampleid_index`, `neuroid_index`, and `timeid_index` in addition
                to this value. Defaults to ().
            sep (str, optional): separator used to read a value-delimited file; passed on to pandas.
                Defaults to ",".
            parallel (int, optional): number of parallel jobs handed to `joblib.Parallel` while
                reassembling the data per sampleid; -2 uses all but one CPU, and a falsy value
                disables parallelism. Defaults to -2.

        Raises:
            ValueError: if the file at `file_path_or_url` cannot be read as parquet or CSV, or if
                metadata values that should be constant within a dimension view are not identical.

        Returns:
            _Dataset: a subclass of the `_Dataset` interface with the packaged xarray.DataArray as a member.
        """
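        # Illustrative input layout (all column names below are hypothetical);
        # each row holds one measurement for a unique
        # (sampleid, neuroid, timeid) combination:
        #
        #   sentence_id,voxel_id,subject,effect_size,sentence
        #   0,v1,subj01,0.12,"The dog chased the ball."
        #   0,v2,subj01,0.53,"The dog chased the ball."
        #   1,v1,subj01,-0.08,"The cat slept."
        #
        # packaged with data_column="effect_size", sampleid_index="sentence_id",
        # neuroid_index="voxel_id", stimuli_index="sentence", subject_index="subject".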

        T = typing.TypeVar("T")

        def collapse_same_value(arr: typing.Iterable[T]) -> T:
            """
            Makes sure each element in an iterable is identical (using __eq__)
            to every other element by value and returns (any) one of the
            elements. If a non-identical element (!=) is found, raises
            ValueError. If the iterable is empty, logs a message and returns
            np.nan.
            """
            try:
                first_thing = next(iter(arr))
            except StopIteration:
                log(f"failed to obtain value from {arr}", verbosity_check=True)
                return np.nan
            for each_thing in arr:
                if first_thing != each_thing:
                    raise ValueError(f"{first_thing} != {each_thing}")
            return first_thing
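        # Behavior sketch (illustrative values):
        #   collapse_same_value([3, 3, 3])  -> 3
        #   collapse_same_value([1, 2])     -> raises ValueError("1 != 2")
        #   collapse_same_value([])         -> np.nan (after logging)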

        # local import: pandas is only needed by this constructor
        import pandas as pd

        if str(file_path_or_url).endswith(".parquet.gzip"):
            try:
                df = pd.read_parquet(file_path_or_url)
            except Exception as invalid_file:
                raise ValueError("invalid parquet file / filename") from invalid_file
        else:
            try:
                df = pd.read_csv(file_path_or_url, sep=sep)
            except Exception as invalid_file:
                raise ValueError("invalid csv file / filename") from invalid_file

        if timeid_index is None:
            timeid_index = "timeid"
            # create a singleton timeid; we don't need to inflate the data,
            # since each datapoint will just correspond to timeid == 0 per sample
            df[timeid_index] = [0] * len(df)
        if subject_index is None:
            subject_index = "subject"
            # create a singleton subject ID; we don't need to inflate the data,
            # since each datapoint will just correspond to "subject0"
            df[subject_index] = ["subject0"] * len(df)
        if not parallel:
            parallel = 1

        subjects = list(set(df[subject_index]))
        # NB: when the same stimulus is shown multiple times, this adds entries
        # with the same sampleid that then have to be differentiated on the
        # basis of metadata only; see https://i.imgur.com/4V2DsIo.png
        sampleids = list(set(df[sampleid_index]))
        neuroids = list(set(df[neuroid_index]))
        timeids = list(set(df[timeid_index]))

        # normalize metadata arguments into mappings of
        # {existing column name -> new coordinate name};
        # `or ()` guards against the default value of None, which is not iterable
        if not isinstance(sampleid_metadata, abc.Mapping):
            sampleid_metadata = {k: k for k in sampleid_metadata or ()}
        if not isinstance(neuroid_metadata, abc.Mapping):
            neuroid_metadata = {k: k for k in neuroid_metadata or ()}
        if not isinstance(timeid_metadata, abc.Mapping):
            timeid_metadata = {k: k for k in timeid_metadata or ()}
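        # e.g., passing neuroid_metadata=["roi"] (a hypothetical column name)
        # becomes {"roi": "roi"}, reusing the column name as the coordinate name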

        df = df.sort_values(
            [*sort_by, subject_index, sampleid_index, neuroid_index, timeid_index]
        )
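        # sorting fixes the row order within every (sampleid, neuroid, timeid)
        # view below, so the per-view neuroid coordinates match across views
        # and the later concatenations align without introducing NaNs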

        def get_sampleid_xr(sampleid):
            sampleid_view = df[df[sampleid_index] == sampleid]

            neuroid_xrs = []
            for neuroid in neuroids:
                neuroid_view = sampleid_view[sampleid_view[neuroid_index] == neuroid]

                timeid_xrs = []
                for timeid in timeids:
                    timeid_view = neuroid_view[neuroid_view[timeid_index] == timeid]
                    data = timeid_view[data_column].values
                    timeid_xr = xr.DataArray(
                        data.reshape(1, len(timeid_view[subject_index]), 1),
                        dims=("sampleid", "neuroid", "timeid"),
                        coords={
                            "sampleid": np.repeat(sampleid, 1),
                            # label each neuroid as a product of the subject and
                            # neuroid identifiers, e.g., "subject01_voxel1"
                            "neuroid": [
                                f"{a}_{b}"
                                for a, b in zip(
                                    timeid_view[subject_index],
                                    timeid_view[neuroid_index],
                                )
                            ],
                            "timeid": np.repeat(timeid, 1),
                            "subject": ("neuroid", timeid_view[subject_index]),
                            "stimulus": (
                                "sampleid",
                                [collapse_same_value(timeid_view[stimuli_index])],
                            ),
                            # sampleid- and timeid-level metadata must be constant
                            # within this view, so collapse each column to a
                            # single value
                            **{
                                metadata_names[column]: (
                                    dimension,
                                    [collapse_same_value(timeid_view[column])],
                                )
                                for dimension, metadata_names in (
                                    ("sampleid", sampleid_metadata),
                                    ("timeid", timeid_metadata),
                                )
                                for column in metadata_names
                            },
                            # neuroid-level metadata varies per row, so keep one
                            # value per neuroid
                            **{
                                neuroid_metadata[column]: (
                                    "neuroid",
                                    timeid_view[column],
                                )
                                for column in neuroid_metadata
                            },
                        },
                    )
                    timeid_xrs += [timeid_xr]

                neuroid_xr = xr.concat(timeid_xrs, dim="timeid")
                neuroid_xrs += [neuroid_xr]

            sampleid_xr = xr.concat(neuroid_xrs, dim="neuroid")
            return sampleid_xr
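        # each sampleid_xr has shape (1, n_subjects * n_neuroids, n_timeids),
        # assuming every subject contributes one row per (neuroid, timeid)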

        sampleid_xrs = Parallel(n_jobs=parallel)(
            delayed(get_sampleid_xr)(sampleid)
            for sampleid in tqdm(sampleids, desc="reassembling data per sampleid")
        )

        unified_xr = xr.concat(sampleid_xrs, dim="sampleid")

        # metadata coordinates pick up extra dimensions during concatenation;
        # collapse each back down to its intended single dimension
        for dimension, metadata_names in (
            ("sampleid", {**sampleid_metadata, "stimulus": "stimulus"}),
            ("timeid", timeid_metadata),
            ("neuroid", {**neuroid_metadata, "subject": "subject"}),
        ):
            for column in metadata_names:
                try:
                    unified_xr = collapse_multidim_coord(
                        unified_xr, metadata_names[column], dimension
                    )
                except ValueError as e:
                    log(
                        f"dimension: {dimension}, column: {column}, "
                        f"shape: {unified_xr[metadata_names[column]].shape}, error: {e}",
                        type="ERR",
                    )

        # NOTE: we use `cls` rather than `Dataset` so that any subclass
        # instantiates the subclass rather than the parent
        return cls(unified_xr)
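

# A minimal end-to-end usage sketch (the file and column names below are
# hypothetical; the input must follow the one-row-per-measurement layout
# documented in `from_file_or_url`):
if __name__ == "__main__":
    ds = Dataset.from_file_or_url(
        "brain_data.csv",
        data_column="effect_size",
        sampleid_index="sentence_id",
        neuroid_index="voxel_id",
        stimuli_index="sentence",
        subject_index="subject",
        neuroid_metadata=["roi"],
    )
    log(f"packaged dataset with dims {ds.dims}")
    ds.to_netcdf("brain_data.nc")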