22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257 | class MzmlWriter:
"""
A class to write peak data to mzML file, typically called after running simulation.
"""
def __init__(self, analysis_name, scans):
"""
Initialises the mzML writer class.
Args:
analysis_name: Name of the analysis.
scans: dict where key is scan level, value is a list of Scans object for that level.
"""
self.analysis_name = analysis_name
self.scans = scans
def write_mzML(self, out_file):
"""
Write mzMl to output file
Args:
out_file: path to mzML file
Returns: None
"""
# if directory doesn't exist, create it
out_dir = os.path.dirname(out_file)
create_if_not_exist(out_dir)
# start writing mzML here
with PsimsMzMLWriter(open(out_file, "wb")) as writer:
# add default controlled vocabularies
writer.controlled_vocabularies()
# write other fields like sample list, software list, etc.
self._write_info(writer)
# open the run
with writer.run(id=self.analysis_name):
self._write_spectra(writer, self.scans)
# open chromatogram list sections
with writer.chromatogram_list(count=1):
tic_rts, tic_intensities = self._get_tic_chromatogram(self.scans)
writer.write_chromatogram(
tic_rts,
tic_intensities,
id="tic",
chromatogram_type="total ion current chromatogram",
time_unit="second",
)
writer.close()
def _write_info(self, out):
"""
Write various information to output stream
Args:
out: the output stream from psims
Returns: None
"""
# check file contains what kind of spectra
has_ms1_spectrum = 1 in self.scans
has_msn_spectrum = 1 in self.scans and len(self.scans) > 1
file_contents = ["centroid spectrum"]
if has_ms1_spectrum:
file_contents.append("MS1 spectrum")
if has_msn_spectrum:
file_contents.append("MSn spectrum")
out.file_description(file_contents=file_contents, source_files=[])
out.sample_list(samples=[])
out.software_list(software_list={"id": "VMS", "version": "1.0.0"})
out.scan_settings_list(scan_settings=[])
out.instrument_configuration_list(
instrument_configurations={"id": "VMS", "component_list": []}
)
out.data_processing_list({"id": "VMS"})
def sort_filter(self, all_scans, min_scan_id):
"""
Filter scans according to certain criteria. Currently it filters by
the minimum scan ID, as a workaround to IAPI which produces unwanted scans at
low scan IDs.
Args:
all_scans: the list of scans to filter
min_scan_id: the minimum scan ID to filter
Returns: the list of filtered scans
"""
new_scans = []
for s in all_scans:
if s.scan_id >= min_scan_id:
if s.num_peaks == 0: # scans can't be empty, but we can't just remove either...
s.mzs = np.array([70.0])
s.intensities = np.array([1.0])
s.num_peaks = 1
new_scans.append(s)
new_scans.sort(key=lambda s: s.rt)
return new_scans
all_scans = sorted(all_scans, key=lambda x: x.rt)
all_scans = [x for x in all_scans if x.num_peaks > 0]
all_scans = list(filter(lambda x: x.scan_id >= min_scan_id, all_scans))
# FIXME: why do we need to do this???!!
# add a single peak to empty scans
# empty = [x for x in all_scans if x.num_peaks == 0]
# for scan in empty:
# scan.mzs = np.array([100.0])
# scan.intensities = np.array([1.0])
# scan.num_peaks = 1
return all_scans
def _write_spectra(self, writer, scans, min_scan_id=INITIAL_SCAN_ID):
"""
Helper method to actually write a collection of spectra
Args:
writer: the output stream from psims
scans: a list of scans
min_scan_id: the minimum scan ID to write
Returns: None
"""
# NOTE: we only support writing up to ms2 scans for now
assert len(scans) <= 3
# get all scans across different ms_levels and sort them by scan_id
all_scans = []
for ms_level in scans:
all_scans.extend(scans[ms_level])
all_scans = self.sort_filter(all_scans, min_scan_id)
spectrum_count = len(all_scans)
# write scans
with writer.spectrum_list(count=spectrum_count):
for scan in all_scans:
self._write_scan(writer, scan)
def _write_scan(self, out, scan):
"""
Helper method to write a single scan
Args:
out: the psims output stream
scan: the scan to write
Returns: None
"""
assert scan.num_peaks > 0
label = "MS1 Spectrum" if scan.ms_level == 1 else "MSn Spectrum"
precursor_information = None
if scan.ms_level == 2:
collision_energy = scan.scan_params.get(ScanParameters.COLLISION_ENERGY)
activation_type = scan.scan_params.get(ScanParameters.ACTIVATION_TYPE)
precursor_information = []
for precursor in scan.scan_params.get(ScanParameters.PRECURSOR_MZ):
precursor_information.append(
{
"mz": precursor.precursor_mz,
"intensity": precursor.precursor_intensity,
"charge": precursor.precursor_charge,
"spectrum_reference": precursor.precursor_scan_id,
"activation": [activation_type, {"collision energy": collision_energy}],
}
)
lowest_observed_mz = min(scan.mzs)
highest_observed_mz = max(scan.mzs)
# bp_pos = np.argmax(scan.intensities)
# bp_intensity = scan.intensities[bp_pos]
# bp_mz = scan.mzs[bp_pos]
scan_id = scan.scan_id
try:
first_mz = scan.scan_params.get(ScanParameters.FIRST_MASS)
last_mz = scan.scan_params.get(ScanParameters.LAST_MASS)
# if it's a method scan (not a custom scan), there's no scan_params
# to get first_mz and last_mz
except AttributeError:
first_mz, last_mz = DEFAULT_MS1_SCAN_WINDOW
polarity = scan.scan_params.get(ScanParameters.POLARITY)
if polarity == POSITIVE:
int_polarity = 1
elif polarity == NEGATIVE:
int_polarity = -1
else:
int_polarity = 1
logger.warning("Unknown polarity in mzml writer: {}".format(polarity))
out.write_spectrum(
scan.mzs,
scan.intensities,
id=scan_id,
polarity=int_polarity,
centroided=True,
scan_start_time=scan.rt / 60.0,
scan_window_list=[(first_mz, last_mz)],
params=[
{label: ""},
{"ms level": scan.ms_level},
{"total ion current": np.sum(scan.intensities)},
{"lowest observed m/z": lowest_observed_mz},
{"highest observed m/z": highest_observed_mz},
# {'base peak m/z', bp_mz},
# {'base peak intensity', bp_intensity}
],
precursor_information=precursor_information,
)
def _get_tic_chromatogram(self, scans):
"""
Helper method to write total ion chromatogram information
Args:
scans: the list of scans
Returns: a tuple of time array and intensity arrays for the TIC
"""
time_array = []
intensity_array = []
for ms1_scan in scans[1]:
time_array.append(ms1_scan.rt)
intensity_array.append(np.sum(ms1_scan.intensities))
time_array = np.array(time_array)
intensity_array = np.array(intensity_array)
return time_array, intensity_array
|